[jira] [Commented] (SPARK-25557) ORC predicate pushdown for nested fields

2019-05-30 Thread Ivan Vergiliev (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-25557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851920#comment-16851920 ]

Ivan Vergiliev commented on SPARK-25557:


It seems like this was partially addressed by 
https://github.com/apache/spark/pull/23943, and the remaining work now is to 
make it work for the v2 source - is that correct?

> ORC predicate pushdown for nested fields
> 
>
> Key: SPARK-25557
> URL: https://issues.apache.org/jira/browse/SPARK-25557
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Assignee: Dongjoon Hyun
>Priority: Major
>






[jira] [Created] (SPARK-27105) Prevent exponential complexity in ORC `createFilter`

2019-03-08 Thread Ivan Vergiliev (JIRA)
Ivan Vergiliev created SPARK-27105:
--

 Summary: Prevent exponential complexity in ORC `createFilter`  
 Key: SPARK-27105
 URL: https://issues.apache.org/jira/browse/SPARK-27105
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Ivan Vergiliev


`OrcFilters.createFilter` currently has complexity that's exponential in the 
height of the filter tree. There are multiple places in Spark that try to 
prevent the generation of skewed trees so as to not trigger this behaviour, for 
example:
- `org.apache.spark.sql.catalyst.parser.AstBuilder.visitLogicalBinary` combines 
a number of binary logical expressions into a balanced tree.
- https://github.com/apache/spark/pull/22313 introduced a change to 
`OrcFilters` to create a balanced tree instead of a skewed tree.
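
For illustration, here's a minimal sketch of the balancing idea behind both items above (my own example, not the actual AstBuilder code): instead of folding predicates into a left-deep chain, combine them pairwise so the resulting tree has height O(log n).

{code:scala}
import org.apache.spark.sql.Column

// Combine predicates pairwise so the tree height is logarithmic in the number of
// predicates, rather than linear as with a plain left fold.
def balancedOr(predicates: Seq[Column]): Column = {
  require(predicates.nonEmpty, "need at least one predicate")
  if (predicates.length == 1) {
    predicates.head
  } else {
    val (left, right) = predicates.splitAt(predicates.length / 2)
    balancedOr(left) || balancedOr(right)
  }
}
{code}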

However, the underlying exponential behaviour can still be triggered by code 
paths that don't go through any of the tree balancing methods. For example, if 
one generates a tree of `Column`s directly in user code, there's nothing in 
Spark that automatically balances that tree and, hence, skewed trees hit the 
exponential behaviour. We have hit this in production with jobs mysteriously 
taking hours on the Spark driver with no worker activity, with as few as ~30 OR 
filters.
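
As an illustration of the kind of user code that triggers this (the column name and the number of terms are made up), a plain left fold over `Column`s produces exactly such a skewed tree:

{code:scala}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// reduceLeft builds a left-deep (skewed) tree:
// ((((id = 0) OR id = 1) OR id = 2) OR ...)
// With a few dozen terms, pushing this filter down to ORC hits the exponential path.
val skewedFilter: Column = (0 until 30).map(i => col("id") === i).reduceLeft(_ || _)

// Reading an ORC-backed DataFrame with df.filter(skewedFilter) would then hit the
// exponential filter conversion on the driver.
{code}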

I have a fix locally that makes the underlying logic have linear complexity 
instead of exponential complexity. With this fix, the code can handle thousands 
of filters in milliseconds. I'll send a PR with the fix soon.
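
To make the complexity argument concrete, here's a toy sketch of the recursion pattern that causes the blow-up, next to a once-per-subtree variant that is linear. The `Expr` ADT and the string output are stand-ins of my own; the real `OrcFilters` logic (and the actual fix) is more involved.

{code:scala}
// Toy stand-ins for Catalyst expressions and the ORC SearchArgument being built.
sealed trait Expr
case class Leaf(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Exponential pattern: an And node first trial-converts both children to check that
// they are convertible, then converts them again for real. Each child is therefore
// recursed into twice, which is 2^height work on a skewed tree.
def buildNaive(e: Expr): Option[String] = e match {
  case Leaf(n) => Some(n)
  case And(l, r) =>
    if (buildNaive(l).isDefined && buildNaive(r).isDefined) {           // trial conversion
      for (a <- buildNaive(l); b <- buildNaive(r)) yield s"($a AND $b)" // real conversion
    } else {
      None
    }
}

// Linear alternative: convert each subtree exactly once and reuse the result.
def buildLinear(e: Expr): Option[String] = e match {
  case Leaf(n)   => Some(n)
  case And(l, r) => for (a <- buildLinear(l); b <- buildLinear(r)) yield s"($a AND $b)"
}
{code}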





[jira] [Created] (SPARK-26859) Reading ORC files with explicit schema can result in wrong data

2019-02-12 Thread Ivan Vergiliev (JIRA)
Ivan Vergiliev created SPARK-26859:
--

 Summary: Reading ORC files with explicit schema can result in 
wrong data
 Key: SPARK-26859
 URL: https://issues.apache.org/jira/browse/SPARK-26859
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Ivan Vergiliev


There is a bug in the ORC deserialization code that, when triggered, results in 
completely wrong data being read. I've marked this as a Blocker per the guidelines 
at https://spark.apache.org/contributing.html, since it's a data correctness issue.

The bug is triggered when all of the following conditions are met:
- the non-vectorized ORC reader is being used;
- a schema is explicitly specified when reading the ORC file;
- the provided schema has columns that are not present in the ORC file, and these 
columns are in the middle of the schema;
- the ORC file being read contains null values in the columns after the ones 
added by the schema.

When all of these are met:
- the internal state of the ORC deserializer gets messed up, and, as a result,
- the null values from the ORC file end up being set on the wrong columns, not the 
ones they're actually in, and
- the old values from the null columns don't get cleared and carry over from the 
previous record.

Here's a concrete example. Let's consider the following DataFrame:

{code:scala}
// Requires `import spark.implicits._` (or the SQLContext implicits) for .toDF.
val rdd = sparkContext.parallelize(Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null)))
val df = rdd.toDF("col1", "col2", "col3")
{code}

and the following schema:

{code}
col1 int, col4 int, col2 int, col3 string
{code}

Notice the `col4 int` added in the middle that doesn't exist in the dataframe.

Saving this dataframe to ORC and then reading it back with the specified schema 
should result in reading the same values, with nulls for `col4`. Instead, we 
get the following back:


{noformat}
[1,null,2,abc]
[4,null,5,def]
[8,null,null,def]
{noformat}

Notice how the `def` from the second record doesn't get properly cleared and 
ends up in the third record as well; also, instead of `col2 = 9` in the last 
record as expected, we get the null that should have been set on `col3`.
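
For completeness, here's a self-contained reproduction sketch. The output path is made up, and the two ORC configs are my attempt at forcing the native, non-vectorized reader (the first condition above); adjust as needed:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.orc.impl", "native")                  // use the native ORC reader
  .config("spark.sql.orc.enableVectorizedReader", "false") // force the non-vectorized path
  .getOrCreate()
import spark.implicits._

val path = "/tmp/spark-26859-repro"
Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null: String))
  .toDF("col1", "col2", "col3")
  .write.mode("overwrite").orc(path)

// Read back with an explicit schema that inserts col4 in the middle.
spark.read
  .schema("col1 int, col4 int, col2 int, col3 string")
  .orc(path)
  .collect()
  .foreach(println)
// Expected: [1,null,2,abc], [4,null,5,def], [8,null,9,null]
// Observed with the bug: [1,null,2,abc], [4,null,5,def], [8,null,null,def]
{code}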

*Impact*
When this issue is triggered, it results in completely wrong results being read 
from the ORC file. The set of conditions under which it gets triggered is 
somewhat narrow, so the set of affected users is probably limited. There may 
also be affected users who haven't realized it yet because the conditions are 
so obscure.

*Bug details*
The issue is caused by calling `setNullAt` with a wrong index in 
`OrcDeserializer.scala:deserialize()`. I have a fix that I'll send out for 
review shortly.

*Workaround*
This bug is currently only triggered when new columns are added to the middle 
of the schema. This means that it can be worked around by only adding new 
columns at the end.
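
For example (reusing the hypothetical path from the sketch above), appending the new column instead of inserting it reads back correctly:

{code:scala}
// col4 appended at the end of the schema instead of inserted in the middle.
spark.read
  .schema("col1 int, col2 int, col3 string, col4 int")
  .orc("/tmp/spark-26859-repro")
  .show()
{code}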





[jira] [Commented] (SPARK-2984) FileNotFoundException on _temporary directory

2015-09-15 Thread Ivan Vergiliev (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745073#comment-14745073 ]

Ivan Vergiliev commented on SPARK-2984:
---

I'm also seeing this - Spark 1.3.1, Spark Standalone, speculation enabled. Any 
ideas on what we can do to avoid this (except for disabling speculation)?

Stack trace:

{code}
[2015-09-15 08:25:18,090] WARN  .jobserver.JobManagerActor [] [akka://JobServer/user/context-supervisor/default-context] - Exception from job 419b8ce6-8284-46a6-bd5e-55585c844105:
java.io.FileNotFoundException: File hdfs://FOLDER/_temporary/0/task_201509150825_115810_r_33 does not exist.
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1020)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:903)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopFile(JavaPairRDD.scala:798)
    at com.leanplum.spark.SessionsJob.runJob(SessionsJob.java:104)
    at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

Thanks,
Ivan

> FileNotFoundException on _temporary directory
> -
>
> Key: SPARK-2984
> URL: https://issues.apache.org/jira/browse/SPARK-2984
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Andrew Ash
>Assignee: Josh Rosen
>Priority: Critical
> Fix For: 1.3.0
>
>
> We've seen several stacktraces and threads on the user mailing list where 
> people are having issues with a {{FileNotFoundException}} stemming from an 
> HDFS path containing {{_temporary}}.
> I ([~aash]) think this may be related to {{spark.speculation}}.  I think the 
> error condition might manifest in this circumstance:
> 1) task T starts on an executor E1
> 2) it takes a long time, so task T' is started on another executor E2
> 3) T finishes in E1 so moves its data from {{_temporary}} to the final 
> destination and deletes the {{_temporary}} directory during cleanup
> 4) T' finishes in E2 and attempts to move its data from {{_temporary}}, but 
> those files no longer exist!  exception
> Some samples:
> {noformat}
> 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job 140774430 ms.0
> java.io.FileNotFoundException: File hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07 does not exist.
>     at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
>     at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>     at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>     at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>     at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
> 

[jira] [Created] (SPARK-5269) BlockManager.dataDeserialize always creates a new serializer instance

2015-01-15 Thread Ivan Vergiliev (JIRA)
Ivan Vergiliev created SPARK-5269:
-

 Summary: BlockManager.dataDeserialize always creates a new 
serializer instance
 Key: SPARK-5269
 URL: https://issues.apache.org/jira/browse/SPARK-5269
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Ivan Vergiliev


BlockManager.dataDeserialize always creates a new instance of the serializer, 
which is pretty slow in some cases. I'm using Kryo serialization and have a 
custom registrator, and its register method is showing up as taking about 15% 
of the execution time in my profiles. This started happening after I increased 
the number of keys in a job with a shuffle phase by a factor of 40.

One solution I can think of is to create a ThreadLocal SerializerInstance for 
the defaultSerializer, and only create a new one if a custom serializer is 
passed in. AFAICT a custom serializer is passed only from DiskStore.getValues, 
and that, in turn, depends on the serializer passed to ExternalSorter. I don't 
know how often this is used, but I think this can still be a good solution for 
the standard use case.
Oh, and also: ExternalSorter already has a SerializerInstance, so if the 
getValues method is called from a single thread, maybe we could pass that in 
directly?
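
To make the suggestion concrete, here's a rough sketch of the ThreadLocal idea. The wrapper class and method names are made up for illustration; this is not the actual BlockManager code:

{code:scala}
import org.apache.spark.serializer.{Serializer, SerializerInstance}

// Cache one SerializerInstance per thread for the default serializer, so the cost of
// newInstance() (e.g. running the Kryo registrator) is paid once per thread rather
// than on every dataDeserialize call. Custom serializers still get a fresh instance.
class CachedDefaultSerializer(defaultSerializer: Serializer) {
  private val cached = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance = defaultSerializer.newInstance()
  }

  def instanceFor(serializer: Serializer): SerializerInstance =
    if (serializer eq defaultSerializer) cached.get() else serializer.newInstance()
}
{code}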

I'd be happy to try a patch but would probably need a confirmation from someone 
that this approach would indeed work (or an idea for another).





[jira] [Created] (SPARK-4743) Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey

2014-12-04 Thread Ivan Vergiliev (JIRA)
Ivan Vergiliev created SPARK-4743:
-

 Summary: Use SparkEnv.serializer instead of closureSerializer in 
aggregateByKey and foldByKey
 Key: SPARK-4743
 URL: https://issues.apache.org/jira/browse/SPARK-4743
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Ivan Vergiliev


aggregateByKey and foldByKey in PairRDDFunctions both use the closure 
serializer to serialize and deserialize the initial value. This means that the 
Java serializer is always used, which can be very expensive if there's a large 
number of groups. Calling combineByKey manually and using the normal serializer 
instead of the closure one improved the performance on the dataset I'm testing 
with by about 30-35%.
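
To illustrate the proposed change (this mirrors the general approach rather than the exact PairRDDFunctions code; the helper name is mine), the initial value can be serialized once with SparkEnv.get.serializer and deserialized per task:

{code:scala}
import java.nio.ByteBuffer
import scala.reflect.ClassTag
import org.apache.spark.SparkEnv

// Serialize the zero value with the data serializer (Kryo when configured) instead of
// the closure serializer (always Java serialization), and hand each task a factory
// that deserializes its own copy.
def zeroValueFactory[U: ClassTag](zeroValue: U): () => U = {
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit())
  zeroBuffer.get(zeroArray)
  // The returned closure captures only the byte array, so it ships cheaply to executors.
  () => SparkEnv.get.serializer.newInstance().deserialize[U](ByteBuffer.wrap(zeroArray))
}
{code}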

I'm not familiar enough with the codebase to be certain that replacing the 
serializer here is OK, but it works correctly in my tests, and it's only 
serializing a single value of type U, which should be serializable by the 
default serializer since it can be the output of a job. Let me know if I'm 
missing anything.


