[jira] [Updated] (SPARK-23207) Shuffle+Repartition on an DataFrame could lead to incorrect answers
[ https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-23207:
-
Affects Version/s: 1.6.0 2.0.0 2.1.0 2.2.0

> Shuffle+Repartition on an DataFrame could lead to incorrect answers
> ---
>
> Key: SPARK-23207
> URL: https://issues.apache.org/jira/browse/SPARK-23207
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0
> Reporter: Jiang Xingbo
> Assignee: Jiang Xingbo
> Priority: Blocker
> Labels: correctness
> Fix For: 2.3.0
>
>
> Currently, shuffle repartition uses RoundRobinPartitioning, and the generated
> result is nondeterministic because the order of the input rows is not
> deterministic.
> The bug can be triggered when a repartition call follows a shuffle (which
> leads to nondeterministic row ordering), as in the pattern below:
> upstream stage -> repartition stage -> result stage
> (-> indicates a shuffle)
> When one of the executor processes goes down, some tasks in the repartition
> stage are retried and generate an inconsistent ordering, and some tasks in
> the result stage are then retried and produce different data.
> The following code returns 931532 instead of 1000000:
> {code}
> import scala.sys.process._
> import org.apache.spark.TaskContext
>
> // Run in spark-shell, where `spark` is the active SparkSession.
> val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
>   x
> }.repartition(200).map { x =>
>   // On the first attempt of partitions 0 and 1, kill the java processes on
>   // the executor's host to force task retries.
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
>     throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> res.distinct().count()
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
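The failure mode above can be modeled without Spark. This is a minimal sketch under stated assumptions: `RoundRobinSketch` and its methods are hypothetical names, and each row is assigned to output partition `(start + i) % numOut` by its arrival index `i`. The sketch shows that a retried attempt which sees the same rows in a different order sends them to different output partitions. Sorting each input partition before the round-robin assignment is one way to make the assignment depend only on which rows arrived. The sketch does not claim to reproduce the actual 2.3.0 patch.

```scala
// Toy model, no Spark required: round-robin assignment of one input
// partition's rows to `numOut` output partitions.
object RoundRobinSketch {
  // Row i (in arrival order) goes to output partition (start + i) % numOut.
  def roundRobin[T](rows: Seq[T], numOut: Int, start: Int = 0): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => (start + i) % numOut }
      .map { case (p, indexed) => p -> indexed.map(_._1) }

  // Sorting first makes the assignment depend only on which rows arrived,
  // not on the order they arrived in (assumes the rows have an Ordering).
  def sortedRoundRobin[T: Ordering](rows: Seq[T], numOut: Int, start: Int = 0): Map[Int, Seq[T]] =
    roundRobin(rows.sorted, numOut, start)
}
```

Consider a first attempt that sees rows `1, 2, 3, 4` and a retry that sees `2, 1, 4, 3`. The first attempt puts `1, 3` in output partition 0 and `2, 4` in partition 1. The retry puts `2, 4` in partition 0. If only partition 0 is recomputed from the retry, rows 2 and 4 appear twice and rows 1 and 3 are lost. That is the same kind of undercount the reproducer shows.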
[jira] [Commented] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE
[ https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587020#comment-16587020 ]

Jiang Xingbo commented on SPARK-25114:
--

The PR has been merged to master and 2.3.

> RecordBinaryComparator may return wrong result when subtraction between two
> words is divisible by Integer.MAX_VALUE
> ---
>
> Key: SPARK-25114
> URL: https://issues.apache.org/jira/browse/SPARK-25114
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: Jiang Xingbo
> Priority: Blocker
> Labels: correctness
>
> It is possible for two objects to be unequal and yet be considered equal by
> RecordBinaryComparator if the difference between their long values is a
> multiple of Int.MaxValue (for example, exactly Int.MaxValue).
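The issue text does not include the comparator source. Its title suggests that the result is derived from the difference of two long words and folded into an `Int`. The sketch below is an illustrative reconstruction of that general pitfall, not Spark's code; `LongWordCompareSketch` and both methods are hypothetical. It contrasts a subtraction-plus-modulo comparison with a direct comparison.

```scala
// Illustrative only: the issue text does not include the comparator source.
object LongWordCompareSketch {
  // Pitfall: fold a long difference into an Int with `%`. Any difference that
  // is a multiple of Int.MaxValue becomes 0, i.e. "equal".
  def buggyCompare(v1: Long, v2: Long): Int =
    if (v1 != v2) ((v1 - v2) % Int.MaxValue).toInt else 0

  // Safer: compare the values directly; no subtraction, so no wrap-around.
  def safeCompare(v1: Long, v2: Long): Int = java.lang.Long.compare(v1, v2)
}
```

Comparing directly also avoids a second problem: `v1 - v2` itself overflows when the two values are far apart, for example when they have opposite signs near `Long.MinValue`/`Long.MaxValue`.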
[jira] [Created] (SPARK-25161) Fix several bugs in failure handling of barrier execution mode
Jiang Xingbo created SPARK-25161:

Summary: Fix several bugs in failure handling of barrier execution mode
Key: SPARK-25161
URL: https://issues.apache.org/jira/browse/SPARK-25161
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo

Fix several bugs in failure handling of barrier execution mode:
* Mark the TaskSet for a barrier stage as zombie when a task attempt fails;
* Multiple barrier task failures from a single barrier stage should not trigger multiple stage retries;
* A barrier task failure from a previous failed stage attempt should not trigger a stage retry;
* Fail the job when a task from a barrier ResultStage fails.
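The four rules can be read as a small decision procedure. This is a hypothetical, heavily simplified sketch: `BarrierFailureSketch`, `BarrierStage` and the `Action` values are invented for illustration, and real DAGScheduler/TaskSetManager state is much richer. A single "current attempt" counter covers both the duplicate-failure rule and the stale-attempt rule.

```scala
// Hypothetical, heavily simplified model of the four rules listed above.
object BarrierFailureSketch {
  sealed trait Action
  case object Ignore extends Action
  case object RetryStage extends Action
  case object FailJob extends Action

  final class BarrierStage(val isResultStage: Boolean) {
    private var currentAttempt = 0
    private var zombies = Set.empty[Int]

    def isZombie(attempt: Int): Boolean = zombies.contains(attempt)

    def onTaskFailure(stageAttempt: Int): Action =
      if (stageAttempt < currentAttempt) {
        Ignore // failure from an earlier (already retried) attempt: no new retry
      } else {
        zombies += stageAttempt // stop launching tasks from this TaskSet
        if (isResultStage) FailJob // result stage: fail the job instead of retrying
        else {
          currentAttempt = stageAttempt + 1 // exactly one retry per failed attempt
          RetryStage
        }
      }
  }
}
```

The first failure of attempt 0 in a shuffle-map barrier stage triggers one retry. A second failure reported by attempt 0 is then treated as stale and ignored.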
[jira] [Commented] (SPARK-24941) Add RDDBarrier.coalesce() function
[ https://issues.apache.org/jira/browse/SPARK-24941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579930#comment-16579930 ]

Jiang Xingbo commented on SPARK-24941:
--

Should we add something like `spark.default.parallelism`? Rather than a fixed number, it could be a fraction, meaning that any barrier stage may launch fewer than fraction * totalCores tasks.

> Add RDDBarrier.coalesce() function
> --
>
> Key: SPARK-24941
> URL: https://issues.apache.org/jira/browse/SPARK-24941
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Jiang Xingbo
> Priority: Major
>
> https://github.com/apache/spark/pull/21758#discussion_r204917245
> The number of partitions from the input data can be unexpectedly large, e.g.
> if you do
> {code}
> sc.textFile(...).barrier().mapPartitions()
> {code}
> The number of input partitions is based on the HDFS input splits. We should
> provide a way in RDDBarrier to let users specify the number of tasks in a
> barrier stage, maybe something like RDDBarrier.coalesce(numPartitions: Int).
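The fraction-based bound in the comment could work as follows. This is a sketch under an assumption: the cap is read as `floor(fraction * totalCores)`, with a minimum of one task. The object, method names and parameters are hypothetical, and no config key for the fraction exists in the source.

```scala
// Hypothetical cap; neither the fraction nor a config key for it exists in the source.
object BarrierParallelismSketch {
  def maxBarrierTasks(totalCores: Int, fraction: Double): Int = {
    require(totalCores > 0, "totalCores must be positive")
    require(fraction > 0.0 && fraction <= 1.0, "fraction must be in (0, 1]")
    math.max(1, math.floor(fraction * totalCores).toInt)
  }

  // Number of barrier tasks to launch: the input partition count, capped.
  def tasksToLaunch(numInputPartitions: Int, totalCores: Int, fraction: Double): Int =
    math.min(numInputPartitions, maxBarrierTasks(totalCores, fraction))
}
```

With 64 cores and a fraction of 0.5, an input with 5000 HDFS splits would be capped at 32 barrier tasks, while a 10-partition input would keep its 10 tasks.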
[jira] [Commented] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE
[ https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579879#comment-16579879 ]

Jiang Xingbo commented on SPARK-25114:
--

I created https://github.com/apache/spark/pull/22101 for this.
[jira] [Updated] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE
[ https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-25114:
-
Labels: correctness (was: )
[jira] [Updated] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE
[ https://issues.apache.org/jira/browse/SPARK-25114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-25114:
-
Priority: Blocker (was: Major)
[jira] [Created] (SPARK-25114) RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE
Jiang Xingbo created SPARK-25114:

Summary: RecordBinaryComparator may return wrong result when subtraction between two words is divisible by Integer.MAX_VALUE
Key: SPARK-25114
URL: https://issues.apache.org/jira/browse/SPARK-25114
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.3.0
Reporter: Jiang Xingbo

It is possible for two objects to be unequal and yet be considered equal by RecordBinaryComparator if the difference between their long values is a multiple of Int.MaxValue (for example, exactly Int.MaxValue).
[jira] [Created] (SPARK-25095) Python support for BarrierTaskContext
Jiang Xingbo created SPARK-25095:

Summary: Python support for BarrierTaskContext
Key: SPARK-25095
URL: https://issues.apache.org/jira/browse/SPARK-25095
Project: Spark
Issue Type: Task
Components: PySpark
Affects Versions: 2.4.0
Reporter: Jiang Xingbo

Enable calling `BarrierTaskContext.barrier()` from the Python side.
[jira] [Commented] (SPARK-23207) Shuffle+Repartition on an DataFrame could lead to incorrect answers
[ https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16574903#comment-16574903 ]

Jiang Xingbo commented on SPARK-23207:
--

This affects 2.2 and earlier versions. We didn't backport the patch because it can cause a huge performance regression in the `repartition()` operation, and the chance of hitting this correctness bug is small. cc [~smilegator] [~sameerag]
[jira] [Created] (SPARK-25074) Implement maxNumConcurrentTasks() in MesosFineGrainedSchedulerBackend
Jiang Xingbo created SPARK-25074:

Summary: Implement maxNumConcurrentTasks() in MesosFineGrainedSchedulerBackend
Key: SPARK-25074
URL: https://issues.apache.org/jira/browse/SPARK-25074
Project: Spark
Issue Type: Task
Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo

We added a new method `maxNumConcurrentTasks()` to `SchedulerBackend` to get the maximum number of tasks that can currently be launched concurrently. However, the method is not implemented in `MesosFineGrainedSchedulerBackend`, so submitting a job that contains a barrier stage always fails fast with the `MesosFineGrainedSchedulerBackend` resource manager.
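One plausible way such a slot count can be computed and used is sketched below. `maxNumConcurrentTasks()` is the method name from the issue. The `Executor` case class, the `cpusPerTask` parameter, the per-executor `cores / cpusPerTask` formula and `checkBarrierStage` are assumptions made for this sketch; it is not Spark's implementation.

```scala
// Hypothetical model; only the name maxNumConcurrentTasks comes from the issue.
object ConcurrentSlotsSketch {
  final case class Executor(id: String, totalCores: Int)

  // Slots per executor = cores / cpus-per-task, summed over all executors.
  def maxNumConcurrentTasks(executors: Seq[Executor], cpusPerTask: Int): Int = {
    require(cpusPerTask > 0, "cpusPerTask must be positive")
    executors.map(_.totalCores / cpusPerTask).sum
  }

  // Fail fast when a barrier stage needs more slots than can run at once.
  def checkBarrierStage(numTasks: Int, executors: Seq[Executor], cpusPerTask: Int): Either[String, Unit] = {
    val slots = maxNumConcurrentTasks(executors, cpusPerTask)
    if (numTasks > slots) Left(s"barrier stage needs $numTasks slots, only $slots available")
    else Right(())
  }
}
```

A backend that cannot report any slots behaves like the empty-executor case here: every barrier stage fails the check. That matches the fail-fast symptom the issue describes.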
[jira] [Created] (SPARK-25045) Make `RDDBarrier.mapParititions` similar to `RDD.mapPartitions`
Jiang Xingbo created SPARK-25045:

Summary: Make `RDDBarrier.mapParititions` similar to `RDD.mapPartitions`
Key: SPARK-25045
URL: https://issues.apache.org/jira/browse/SPARK-25045
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo

The signature of the function passed to `RDDBarrier.mapPartitions()` is different from that of `RDD.mapPartitions`: the latter doesn't take a TaskContext. We should make the function signatures the same to avoid confusion and misuse.
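A common way to drop the context parameter is to have the function fetch the context from a thread-local, in the spirit of `TaskContext.get()`. The toy below contrasts the two signatures. `BarrierContextSketch`, `ToyTaskContext` and both `mapPartitions` variants are hypothetical; partitions are modeled as plain `Seq`s.

```scala
// Toy contrast of the two function signatures; nothing here is Spark API.
object BarrierContextSketch {
  final case class ToyTaskContext(partitionId: Int)

  // Thread-local holder, in the spirit of TaskContext.get() (assumption).
  private val current = new ThreadLocal[ToyTaskContext]
  def get(): ToyTaskContext = current.get()

  // Old style: the user function receives the context as a second argument.
  def mapPartitionsWithContext[T, S](parts: Seq[Seq[T]])(
      f: (Iterator[T], ToyTaskContext) => Iterator[S]): Seq[Seq[S]] =
    parts.zipWithIndex.map { case (p, i) => f(p.iterator, ToyTaskContext(i)).toList }

  // RDD.mapPartitions style: same signature as for a regular RDD; the
  // function calls get() itself while the context is installed.
  def mapPartitions[T, S](parts: Seq[Seq[T]])(f: Iterator[T] => Iterator[S]): Seq[Seq[S]] =
    parts.zipWithIndex.map { case (p, i) =>
      current.set(ToyTaskContext(i))
      // Force the lazy iterator inside the try, while the context is still set.
      try f(p.iterator).toList
      finally current.remove()
    }
}
```

For example, `mapPartitions(Seq(Seq(1, 2), Seq(3)))(it => it.map(x => x * 10 + BarrierContextSketch.get().partitionId))` yields `Seq(Seq(10, 20), Seq(31))`. Note that the iterator has to be consumed while the thread-local is set. Otherwise `get()` would return `null`.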
[jira] [Created] (SPARK-25030) SparkSubmit will not return result if the mainClass submitted creates a Timer()
Jiang Xingbo created SPARK-25030:

Summary: SparkSubmit will not return result if the mainClass submitted creates a Timer()
Key: SPARK-25030
URL: https://issues.apache.org/jira/browse/SPARK-25030
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.3.1
Reporter: Jiang Xingbo

Creating a Timer() in the mainClass submitted to SparkSubmit prevents SparkSubmit from returning the result. The issue is very easy to reproduce.
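The issue does not state the root cause. One JVM fact that is relevant and easy to check is that `java.util.Timer`'s no-argument constructor is documented to use a non-daemon thread. A live non-daemon thread keeps the JVM from exiting until the Timer is cancelled. Whether this is what blocks SparkSubmit here is an assumption. The sketch (hypothetical `TimerThreadSketch`) only checks the daemon flag of a Timer's thread.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object TimerThreadSketch {
  // Schedules one task and reports whether the Timer's thread is a daemon.
  def timerThreadIsDaemon(isDaemon: Boolean): Boolean = {
    val timer = new Timer("sketch-timer", isDaemon)
    val ran = new CountDownLatch(1)
    val daemon = new AtomicBoolean(false)
    timer.schedule(new TimerTask {
      override def run(): Unit = {
        daemon.set(Thread.currentThread().isDaemon)
        ran.countDown()
      }
    }, 0L)
    ran.await(5, TimeUnit.SECONDS)
    // A live non-daemon thread keeps the JVM from exiting; cancel() ends it.
    timer.cancel()
    daemon.get()
  }
}
```

In a user main class, `new Timer(true)` or calling `timer.cancel()` before `main` returns avoids leaving a non-daemon timer thread behind. The issue does not establish whether that is sufficient for SparkSubmit.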
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569202#comment-16569202 ]

Jiang Xingbo commented on SPARK-24375:
--

[~mridulm80] You are right that we currently cannot identify which barrier a call refers to until the barrier() function is actually executed. We've thought hard about this and don't think we can do it unless we force users to explicitly pass a number to each barrier() call, which is not a good idea because it adds more burden to managing the code. The current decision is that we don't distinguish barrier() calls from the same task. Users are responsible for ensuring that the same number of barrier() calls happens in all possible code branches; otherwise the job may hang or fail with a SparkException after a timeout. We've added the following message to the description of `BarrierTaskContext.barrier()`; I hope it is useful:
{quote}
 * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
 * possible code branches. Otherwise, you may get the job hanging or a SparkException after
 * timeout.
{quote}

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
> Issue Type: Story
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Xiangrui Meng
> Assignee: Jiang Xingbo
> Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP
> discussion. It doesn't need to be a complete design before the vote, but it
> should at least cover both Scala/Java and PySpark.
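The caution about equal numbers of barrier() calls can be illustrated with the JDK's `java.util.concurrent.CyclicBarrier` as an analogy. Spark's barrier is coordinated by the driver, not by this class. In the hypothetical `BarrierCountSketch`, each thread plays a task that calls the barrier a given number of times. If one task makes fewer calls, the others wait until the timeout and the barrier breaks.

```scala
import java.util.concurrent.{BrokenBarrierException, CyclicBarrier, Executors, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean

object BarrierCountSketch {
  // Task i calls barrier() callsPerTask(i) times. Returns true if any call
  // timed out or saw a broken barrier, i.e. the "hang / exception" outcome.
  def anyTaskStuck(callsPerTask: Seq[Int], timeoutMs: Long): Boolean = {
    val barrier = new CyclicBarrier(callsPerTask.size)
    val pool = Executors.newFixedThreadPool(callsPerTask.size)
    val stuck = new AtomicBoolean(false)
    callsPerTask.foreach { n =>
      pool.execute(new Runnable {
        def run(): Unit =
          try (1 to n).foreach(_ => barrier.await(timeoutMs, TimeUnit.MILLISECONDS))
          catch { case _: TimeoutException | _: BrokenBarrierException => stuck.set(true) }
      })
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    stuck.get()
  }
}
```

With `Seq(2, 2, 2)` every task passes both barriers. With `Seq(2, 2, 1)` the third task skips the second barrier, so the other two time out or observe a broken barrier. This is the analogue of the hang or SparkException described in the caution.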
[jira] [Created] (SPARK-25017) Add test suite for ContextBarrierState
Jiang Xingbo created SPARK-25017:

Summary: Add test suite for ContextBarrierState
Key: SPARK-25017
URL: https://issues.apache.org/jira/browse/SPARK-25017
Project: Spark
Issue Type: Test
Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo

We should be able to add unit tests for ContextBarrierState using a mocked RpcCallContext. Currently it is only covered by end-to-end tests in `BarrierTaskContextSuite`.
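A unit test with a mocked reply context could take the following shape. Everything here is a hypothetical stand-in. `ReplyContext` is modeled loosely on the `reply` side of RpcCallContext, which has more members in Spark. `SimpleBarrierState` is a simplified barrier state, not ContextBarrierState. The test double only records what it was sent, so the test can assert on replies without any RPC machinery.

```scala
// Hypothetical stand-ins; not Spark's ContextBarrierState or RpcCallContext.
object BarrierStateTestSketch {
  // The only part of an RPC reply context this simplified state needs.
  trait ReplyContext { def reply(msg: Any): Unit }

  // Test double: records every reply instead of sending it over the network.
  final class RecordingContext extends ReplyContext {
    var replies: List[Any] = Nil
    def reply(msg: Any): Unit = replies = msg :: replies
  }

  // Simplified barrier state: once all numTasks requests arrive, reply to
  // every waiting requester and advance the epoch.
  final class SimpleBarrierState(numTasks: Int) {
    private var waiting = List.empty[ReplyContext]
    private var epoch = 0
    def currentEpoch: Int = epoch

    def handleRequest(ctx: ReplyContext): Unit = {
      waiting = ctx :: waiting
      if (waiting.size == numTasks) {
        waiting.foreach(_.reply(()))
        waiting = Nil
        epoch += 1
      }
    }
  }
}
```

A test sends requests from all but one fake context and asserts that nothing has been replied yet. It then sends the last request and asserts that every context received exactly one reply and that the epoch advanced.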
[jira] [Commented] (SPARK-24884) Implement regexp_extract_all
[ https://issues.apache.org/jira/browse/SPARK-24884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566752#comment-16566752 ]

Jiang Xingbo commented on SPARK-24884:
--

You don't need to be assigned; just prepare and submit a PR, and include the JIRA number (SPARK-24884) in the title.

> Implement regexp_extract_all
>
>
>
> Key: SPARK-24884
> URL: https://issues.apache.org/jira/browse/SPARK-24884
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.1
> Reporter: Nick Nicolini
> Priority: Major
>
> I've recently hit many cases of regexp parsing where we need to match on
> something that is always arbitrary in length; for example, a text block that
> looks something like:
> {code:java}
> AAA:WORDS|
> BBB:TEXT|
> MSG:ASDF|
> MSG:QWER|
> ...
> MSG:ZXCV|
> {code}
> where I need to pull out all values between "MSG:" and "|", which can occur
> between 1 and n times in each instance. I cannot reliably use the existing
> {{regexp_extract}} method since the number of occurrences is always
> arbitrary, and while I can write a UDF to handle this, it'd be great if this
> were supported natively in Spark.
> Perhaps we can implement something like {{regexp_extract_all}}, as
> [Presto|https://prestodb.io/docs/current/functions/regexp.html] and
> [Pig|https://pig.apache.org/docs/latest/api/org/apache/pig/builtin/REGEX_EXTRACT_ALL.html]
> have?
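Until a native function exists, the extraction the reporter describes can be done with plain Scala regex matching. This could then be wrapped in a UDF (for example via `org.apache.spark.sql.functions.udf`). The sketch below is a plain-Scala helper, not a Spark function; `RegexpExtractAllSketch.extractAll` is a hypothetical name. It returns every match of one capture group, in order.

```scala
// Plain Scala helper (hypothetical name), not a built-in Spark function.
object RegexpExtractAllSketch {
  // Returns the text of capture group `group` for every match, in order.
  def extractAll(text: String, pattern: String, group: Int = 1): List[String] =
    pattern.r.findAllMatchIn(text).map(_.group(group)).toList
}
```

On the reporter's example, the pattern `MSG:([^|]*)\|` returns `List("ASDF", "QWER", "ZXCV")` for the three `MSG:` entries, however many there are.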
[jira] [Commented] (SPARK-24817) Implement BarrierTaskContext.barrier()
[ https://issues.apache.org/jira/browse/SPARK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566278#comment-16566278 ]

Jiang Xingbo commented on SPARK-24817:
--

Actually, the current implementation of the _barrier_ function doesn't require communication between executors: all executors just talk to a _BarrierCoordinator_ in the driver. But to support launching ML workloads, we do need to let executors communicate with each other directly; IIUC that will be investigated under SPARK-24724. Maybe [~mengxr] can provide more context here.

> Implement BarrierTaskContext.barrier()
> --
>
> Key: SPARK-24817
> URL: https://issues.apache.org/jira/browse/SPARK-24817
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Jiang Xingbo
> Priority: Major
>
> Implement BarrierTaskContext.barrier() to support a global sync between all
> the tasks in a barrier stage. The global sync should finish immediately once
> all tasks in the same barrier stage reach the same barrier.
[jira] [Resolved] (SPARK-24582) Design: Barrier execution mode
[ https://issues.apache.org/jira/browse/SPARK-24582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo resolved SPARK-24582.
--
Resolution: Fixed

> Design: Barrier execution mode
> --
>
> Key: SPARK-24582
> URL: https://issues.apache.org/jira/browse/SPARK-24582
> Project: Spark
> Issue Type: Story
> Components: ML, Spark Core
> Affects Versions: 3.0.0
> Reporter: Xiangrui Meng
> Assignee: Jiang Xingbo
> Priority: Major
>
> [~jiangxb1987] and [~cloud_fan] outlined a design sketch in SPARK-24375,
> which covers some basic scenarios. This story is for a formal design of the
> barrier execution mode.
[jira] [Resolved] (SPARK-24581) Design: BarrierTaskContext.barrier()
[ https://issues.apache.org/jira/browse/SPARK-24581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo resolved SPARK-24581.
--
Resolution: Fixed

> Design: BarrierTaskContext.barrier()
>
>
>
> Key: SPARK-24581
> URL: https://issues.apache.org/jira/browse/SPARK-24581
> Project: Spark
> Issue Type: Story
> Components: ML, Spark Core
> Affects Versions: 3.0.0
> Reporter: Xiangrui Meng
> Assignee: Jiang Xingbo
> Priority: Major
>
> We need to provide a communication barrier function to users to help
> coordinate tasks within a barrier stage. This is very similar to the
> MPI_Barrier function in MPI. This story is for its design.
>
> Requirements:
> * Low latency. The tasks should be unblocked soon after all tasks have
> reached this barrier. Latency is more important than CPU cycles here.
> * Support an unlimited timeout with proper logging. DL tasks might take a
> very long time to converge, so we should support an unlimited timeout with
> proper logging, so that users know why a task is waiting.
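The "unlimited timeout with proper logging" requirement can be met by waiting in bounded slices and logging after each one. The sketch below uses a `java.util.concurrent.CountDownLatch` as a stand-in for "all tasks have arrived". `PatientWaitSketch`, the `logEvery` interval and the `log` callback are hypothetical. Because `await` parks the thread, the loop costs almost no CPU, and it returns as soon as the latch opens, which keeps latency low.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object PatientWaitSketch {
  // Waits with no overall timeout, but logs every `logEvery` ms so users can
  // see why a task is blocked. Returns as soon as the latch reaches zero.
  def awaitWithLogging(latch: CountDownLatch, logEvery: Long, log: String => Unit): Unit = {
    var waitedMs = 0L
    while (!latch.await(logEvery, TimeUnit.MILLISECONDS)) {
      waitedMs += logEvery
      log(s"still waiting at barrier after $waitedMs ms; ${latch.getCount} task(s) not yet arrived")
    }
  }
}
```

A design note: the log interval bounds only how often the task reports progress. It never aborts the wait, so a long-converging DL task is not killed by this loop.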
[jira] [Created] (SPARK-24954) Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled
Jiang Xingbo created SPARK-24954:

Summary: Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled
Key: SPARK-24954
URL: https://issues.apache.org/jira/browse/SPARK-24954
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jiang Xingbo

Since we explicitly listed "Support running barrier stage with dynamic resource allocation" as a Non-Goal in the design doc, we should fail fast on job submission when running a barrier stage with dynamic resource allocation enabled, to avoid confusing behaviors (see SPARK-24942 for some examples).
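A pre-submission check could look roughly like the sketch below. `spark.dynamicAllocation.enabled` is Spark's real switch for dynamic resource allocation (off by default). The config is modeled as a plain `Map`. `BarrierSubmitCheckSketch`, the exception type and the message are hypothetical, and the real check may consider other conditions.

```scala
// Hypothetical pre-submit check; the config is modeled as a plain Map.
object BarrierSubmitCheckSketch {
  private val DraKey = "spark.dynamicAllocation.enabled"

  def checkBarrierWithDra(conf: Map[String, String], hasBarrierStage: Boolean): Unit = {
    // Treat a missing key as disabled, matching Spark's default.
    val draEnabled = conf.get(DraKey).exists(_.trim.equalsIgnoreCase("true"))
    if (hasBarrierStage && draEnabled)
      throw new IllegalStateException(
        s"Barrier execution mode does not support dynamic resource allocation; set $DraKey=false")
  }
}
```

Failing at submission time, before any executors are requested, avoids the acquire/release/re-acquire cycles described in SPARK-24942.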
[jira] [Updated] (SPARK-24942) Improve cluster resource management with jobs containing barrier stage
[ https://issues.apache.org/jira/browse/SPARK-24942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-24942:
-
Target Version/s: 3.0.0

> Improve cluster resource management with jobs containing barrier stage
> --
>
> Key: SPARK-24942
> URL: https://issues.apache.org/jira/browse/SPARK-24942
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Jiang Xingbo
> Priority: Major
>
> https://github.com/apache/spark/pull/21758#discussion_r205652317
> We should improve cluster resource management to address the following issues:
> - With dynamic resource allocation enabled, we may acquire some executors
> (but not enough to launch all the tasks in a barrier stage), later release
> them because the executor idle timeout expires, and then acquire them again.
> - There can be a deadlock between two concurrent applications. Each
> application may acquire some resources, but not enough to launch all the
> tasks in a barrier stage. After hitting the idle timeout and releasing them,
> they may acquire resources again, but just keep trading resources with each
> other.
[jira] [Updated] (SPARK-24941) Add RDDBarrier.coalesce() function
[ https://issues.apache.org/jira/browse/SPARK-24941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-24941:
-
Target Version/s: 3.0.0
[jira] [Created] (SPARK-24942) Improve cluster resource management with jobs containing barrier stage
Jiang Xingbo created SPARK-24942:

Summary: Improve cluster resource management with jobs containing barrier stage
Key: SPARK-24942
URL: https://issues.apache.org/jira/browse/SPARK-24942
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo

https://github.com/apache/spark/pull/21758#discussion_r205652317
We should improve cluster resource management to address the following issues:
- With dynamic resource allocation enabled, we may acquire some executors (but not enough to launch all the tasks in a barrier stage), later release them because the executor idle timeout expires, and then acquire them again.
- There can be a deadlock between two concurrent applications. Each application may acquire some resources, but not enough to launch all the tasks in a barrier stage. After hitting the idle timeout and releasing them, they may acquire resources again, but just keep trading resources with each other.
[jira] [Created] (SPARK-24941) Add RDDBarrier.coalesce() function
Jiang Xingbo created SPARK-24941:

Summary: Add RDDBarrier.coalesce() function
Key: SPARK-24941
URL: https://issues.apache.org/jira/browse/SPARK-24941
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo

https://github.com/apache/spark/pull/21758#discussion_r204917245
The number of partitions from the input data can be unexpectedly large, e.g. if you do
{code}
sc.textFile(...).barrier().mapPartitions()
{code}
The number of input partitions is based on the HDFS input splits. We should provide a way in RDDBarrier to let users specify the number of tasks in a barrier stage, maybe something like RDDBarrier.coalesce(numPartitions: Int).
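A coalesce that avoids a shuffle groups several input partitions into each output partition. This is a minimal sketch of that grouping under an assumption: contiguous, nearly equal index ranges are used. `BarrierCoalesceSketch.groupPartitions` is hypothetical. It ignores data locality, which a real implementation such as RDD.coalesce's partition coalescer would likely take into account.

```scala
// Hypothetical grouping of input partition indices into output partitions.
object BarrierCoalesceSketch {
  // Split indices 0 until numInput into at most numOut contiguous ranges
  // whose sizes differ by at most one.
  def groupPartitions(numInput: Int, numOut: Int): Seq[Range] = {
    require(numInput >= 0 && numOut > 0, "need numInput >= 0 and numOut > 0")
    val k = math.min(numOut, math.max(numInput, 1))
    (0 until k).map { i =>
      (i.toLong * numInput / k).toInt until ((i + 1).toLong * numInput / k).toInt
    }
  }
}
```

For example, 10 input splits coalesced to 3 barrier tasks give index ranges of sizes 3, 3 and 4. Asking for more tasks than inputs simply yields one task per input partition.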
[jira] [Commented] (SPARK-24581) Design: BarrierTaskContext.barrier()
[ https://issues.apache.org/jira/browse/SPARK-24581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554409#comment-16554409 ]

Jiang Xingbo commented on SPARK-24581:
--

Design doc: https://docs.google.com/document/d/1r07-vU5JTH6s1jJ6azkmK0K5it6jwpfO6b_K3mJmxR4/edit?usp=sharing
[jira] [Resolved] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death
[ https://issues.apache.org/jira/browse/SPARK-24340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo resolved SPARK-24340.
--
Resolution: Fixed

> Clean up non-shuffle disk block manager files following executor death
> --
>
> Key: SPARK-24340
> URL: https://issues.apache.org/jira/browse/SPARK-24340
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Jiang Xingbo
> Priority: Major
>
> Currently we only clean up local folders when an application is removed, and
> we don't clean up non-shuffle files, such as temp shuffle blocks, cached
> RDD/broadcast blocks, spill files, etc. This can cause disk space leaks when
> executors periodically die and are replaced.
> To avoid this source of disk space leaks, we can clean up executor disk store
> files, except for shuffle index and data files, when an executor finishes.
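The cleanup rule "delete everything except shuffle index and data files" reduces to a filename filter. The naming pattern `shuffle_<shuffleId>_<mapId>_<reduceId>.index|.data` and the other block names below are modeled on Spark's block naming but are assumptions for this sketch. `DiskCleanupSketch` and its methods are hypothetical.

```scala
// Hypothetical filter; the file-name patterns are assumptions for this sketch.
object DiskCleanupSketch {
  // Shuffle index/data files, e.g. "shuffle_0_5_0.index" or "shuffle_0_5_0.data".
  private val ShuffleFile = """shuffle_\d+_\d+_\d+\.(index|data)""".r

  def isShuffleIndexOrData(name: String): Boolean =
    ShuffleFile.pattern.matcher(name).matches()

  // Everything else left by a dead executor (temp shuffle blocks, cached
  // RDD/broadcast blocks, spill files) is safe to delete.
  def filesToDelete(names: Seq[String]): Seq[String] =
    names.filterNot(isShuffleIndexOrData)
}
```

Shuffle index and data files are kept because, presumably, they may still be served to other executors after this executor has died. Everything else belonged only to the dead executor.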
[jira] [Commented] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death
[ https://issues.apache.org/jira/browse/SPARK-24340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551714#comment-16551714 ]

Jiang Xingbo commented on SPARK-24340:
--

Thanks~
[jira] [Created] (SPARK-24877) Ignore the task completion event from a zombie barrier task
Jiang Xingbo created SPARK-24877:

Summary: Ignore the task completion event from a zombie barrier task
Key: SPARK-24877
URL: https://issues.apache.org/jira/browse/SPARK-24877
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.0.0
Reporter: Jiang Xingbo

Currently, we abort the barrier stage if a zombie barrier task can't be killed, to prevent data correctness issues. We can improve this behavior by letting the zombie barrier task keep running while preventing it from interacting with other barrier tasks (possibly from a different stage attempt), and by ignoring the task completion event from a zombie barrier task.
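Ignoring completion events from zombie tasks can be modeled as accepting an event only if it comes from the latest attempt of its stage. This is a hypothetical sketch. `ZombieCompletionSketch`, `TaskEnd` and `CompletionTracker` are invented names, and Spark's scheduler tracks far more state.

```scala
// Hypothetical model: only the latest attempt of a stage may complete partitions.
object ZombieCompletionSketch {
  final case class TaskEnd(stageId: Int, stageAttempt: Int, partition: Int)

  final class CompletionTracker {
    private var latestAttempt = Map.empty[Int, Int] // stageId -> latest attempt
    private var done = Set.empty[(Int, Int)]         // (stageId, partition)

    def startAttempt(stageId: Int, attempt: Int): Unit =
      latestAttempt = latestAttempt + (stageId -> attempt)

    // Returns true if accepted, false if it came from a zombie (older) attempt.
    def onTaskEnd(e: TaskEnd): Boolean =
      if (latestAttempt.get(e.stageId).contains(e.stageAttempt)) {
        done = done + ((e.stageId, e.partition))
        true
      } else false

    def isDone(stageId: Int, partition: Int): Boolean = done.contains((stageId, partition))
  }
}
```

After attempt 1 of a stage starts, a late completion from attempt 0 for partition 3 is dropped. Only attempt 1's completion marks partition 3 as done, so the zombie task can run to the end without affecting results.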
[jira] [Created] (SPARK-24874) Allow hybrid of both barrier tasks and regular tasks in a stage
Jiang Xingbo created SPARK-24874: Summary: Allow hybrid of both barrier tasks and regular tasks in a stage Key: SPARK-24874 URL: https://issues.apache.org/jira/browse/SPARK-24874 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Jiang Xingbo Currently we only allow barrier tasks in a barrier stage. However, consider the following query: {code}
import org.apache.spark.{SparkConf, SparkContext}

// The master URL is expected to be supplied by spark-submit.
val conf = new SparkConf().setAppName("hybrid-barrier-example")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(1 to 100, 10)
// rdd2's 20 partitions must run as barrier tasks.
val rdd2 = sc.parallelize(1 to 1000, 20).barrier().mapPartitions(it => it)
val rdd = rdd1.union(rdd2).mapPartitions(t => t)
{code} Currently, running `rdd.collect()` requires 30 free slots (10 partitions from rdd1 plus 20 from rdd2), because every task in the stage is treated as a barrier task. However, the tasks that read rdd1's partitions don't need to be launched together, so we could launch them as regular tasks. If we can do that, we only need 20 free slots (one per partition of rdd2) to run `rdd.collect()`.
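The slot arithmetic above can be modeled in a few lines. This is a hypothetical helper, not Spark code. It only assumes, as the issue does, that each task needs one free slot and that in the hybrid scheme only the barrier partitions must start together.

```python
def required_slots(partitions, barrier_rdds, hybrid):
    """Free slots needed to launch a stage built from the union of several RDDs.

    partitions: mapping from RDD name to its number of partitions.
    barrier_rdds: names of RDDs whose partitions must run as barrier tasks.
    hybrid: if True, regular partitions may be launched later as ordinary tasks.
    """
    if hybrid:
        return sum(n for name, n in partitions.items() if name in barrier_rdds)
    # Current behavior: every task of a barrier stage must start at the same time.
    return sum(partitions.values())


parts = {"rdd1": 10, "rdd2": 20}
print(required_slots(parts, {"rdd2"}, hybrid=False))  # 30
print(required_slots(parts, {"rdd2"}, hybrid=True))   # 20
```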
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16548784#comment-16548784 ] Jiang Xingbo commented on SPARK-24375: -- {quote}Is the 'barrier' logic pluggable? Instead of only being a global sync point. {quote} The barrier() function is quite similar to the [MPI_Barrier|https://www.mpich.org/static/docs/v3.2.1/www/www3/MPI_Barrier.html] function in MPI; its main purpose is to provide global synchronization between barrier tasks. I'm not sure whether we plan to support pluggable logic for now. Do you have a use case that requires a pluggable barrier()? {quote}Dynamic resource allocation (dra) triggers allocation of additional resources based on pending tasks - hence the comment "We may add a check of total available slots before scheduling tasks from a barrier stage taskset." does not necessarily work in that context. {quote} Supporting barrier stages with dynamic resource allocation is a non-goal here; however, we can improve the integration with DRA in Spark 3.0. {quote}Currently DRA in spark uniformly allocates resources - are we envisioning changes as part of this effort to allocate heterogeneous executor resources based on pending tasks (at least initially for barrier support for gpu's)? {quote} There is another ongoing SPIP, SPARK-24615, to add accelerator-aware task scheduling to Spark; I think we should deal with the above issue within that topic. {quote}In the face of exceptions, some tasks will wait on barrier 2 and others on barrier 1, causing issues.{quote} Silently catching an exception thrown by TaskContext.barrier() is not desired behavior. However, if this does happen, we can detect it because we track an `epoch` on both the driver side and the executor side. More details will go into the design doc for BarrierTaskContext.barrier() (SPARK-24581). {quote}Can you elaborate more on leveraging TaskContext.localProperties?
Is it expected to be sync'ed after 'barrier' returns? What guarantees are we expecting to provide?{quote} We update the localProperties on the driver, and in executors you should be able to fetch the updated values through TaskContext; this should not be coupled with the `barrier()` function. > Design sketch: support barrier scheduling in Apache Spark > - > > Key: SPARK-24375 > URL: https://issues.apache.org/jira/browse/SPARK-24375 > Project: Spark > Issue Type: Story > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > This task is to outline a design sketch for the barrier scheduling SPIP > discussion. It doesn't need to be a complete design before the vote. But it > should at least cover both Scala/Java and PySpark.
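A sketch of how an epoch could expose the mismatch raised in the quoted question. It assumes each task sends the number of barrier() calls it has made so far, and that the driver rejects a request whose epoch differs from the barrier it is currently waiting on. `EpochBarrierCoordinator` is hypothetical; the actual design is deferred to SPARK-24581.

```python
class EpochBarrierCoordinator:
    """Hypothetical driver-side coordinator for the barrier() calls of one stage.

    Each task counts its own barrier() calls (its local epoch) and sends that
    count with every request; the driver tracks the epoch it currently expects.
    """

    def __init__(self, num_tasks):
        self.num_tasks = num_tasks
        self.epoch = 0        # index of the barrier the driver is waiting on
        self.arrived = set()  # tasks that have reached the current barrier

    def request(self, task_id, task_epoch):
        """Return True when this call releases the barrier, False while still waiting."""
        if task_epoch != self.epoch:
            # e.g. a task swallowed an exception at barrier 1 and moved on to barrier 2
            raise RuntimeError(
                f"task {task_id} is at barrier epoch {task_epoch}, "
                f"but the stage is at epoch {self.epoch}")
        self.arrived.add(task_id)
        if len(self.arrived) == self.num_tasks:
            self.arrived.clear()
            self.epoch += 1
            return True
        return False
```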
[jira] [Created] (SPARK-24824) Make Spark task speculation a per-stage config
Jiang Xingbo created SPARK-24824: Summary: Make Spark task speculation a per-stage config Key: SPARK-24824 URL: https://issues.apache.org/jira/browse/SPARK-24824 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Jiang Xingbo Make Spark task speculation a per-stage config, so we can explicitly disable task speculation for a barrier stage.
[jira] [Created] (SPARK-24823) Cancel a job that contains barrier stage(s) if the barrier tasks don't get launched within a configured time
Jiang Xingbo created SPARK-24823: Summary: Cancel a job that contains barrier stage(s) if the barrier tasks don't get launched within a configured time Key: SPARK-24823 URL: https://issues.apache.org/jira/browse/SPARK-24823 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Jiang Xingbo Cancel a job that contains barrier stage(s) if the barrier tasks don't get launched within a configured time.
[jira] [Created] (SPARK-24822) Python support for barrier execution mode
Jiang Xingbo created SPARK-24822: Summary: Python support for barrier execution mode Key: SPARK-24822 URL: https://issues.apache.org/jira/browse/SPARK-24822 Project: Spark Issue Type: New Feature Components: PySpark, Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo Enable launching a job that contains barrier stage(s) from PySpark.
[jira] [Created] (SPARK-24821) Fail fast when submitted job compute on a subset of all the partitions for a barrier stage
Jiang Xingbo created SPARK-24821: Summary: Fail fast when submitted job compute on a subset of all the partitions for a barrier stage Key: SPARK-24821 URL: https://issues.apache.org/jira/browse/SPARK-24821 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo Detect when SparkContext.runJob() launches a barrier stage on only a subset of its partitions; one example is the `first()` operation.
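A minimal sketch of the fail-fast check, assuming the scheduler knows a barrier stage's total partition count and which partitions a job requests. `first()` is a natural trigger because it is built on `take(1)`, which starts by scanning a single partition. The function name and exception type are hypothetical.

```python
def check_barrier_job(num_partitions, requested_partitions, is_barrier_stage):
    """Raise if a barrier stage would run on only some of its partitions."""
    requested = set(requested_partitions)
    missing = sorted(set(range(num_partitions)) - requested)
    if is_barrier_stage and missing:
        raise ValueError(
            f"barrier stage must run all {num_partitions} partitions, "
            f"but {len(missing)} were not requested: {missing}")


check_barrier_job(4, range(4), is_barrier_stage=True)  # a full job such as collect()
check_barrier_job(4, [0], is_barrier_stage=False)      # partial jobs are fine otherwise
```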
[jira] [Created] (SPARK-24820) Fail fast when submitted job contains PartitionPruningRDD in a barrier stage
Jiang Xingbo created SPARK-24820: Summary: Fail fast when submitted job contains PartitionPruningRDD in a barrier stage Key: SPARK-24820 URL: https://issues.apache.org/jira/browse/SPARK-24820 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo Detect when SparkContext.runJob() launches a barrier stage that includes a PartitionPruningRDD.
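One way to detect this, sketched under simplifying assumptions: walk the RDD lineage within the stage by following narrow dependencies and stopping at shuffle boundaries. Mark the stage as a barrier stage if any RDD in it is a barrier RDD, and reject it if the same walk finds a PartitionPruningRDD. The `Node` class is a hypothetical stand-in for Spark's RDD graph, not its real API.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for an RDD in the lineage graph."""
    kind: str                                             # e.g. "PartitionPruningRDD"
    is_barrier: bool = False
    narrow_parents: list = field(default_factory=list)   # parents in the same stage
    shuffle_parents: list = field(default_factory=list)  # parents in earlier stages


def rdds_in_stage(final):
    """Collect the RDDs reachable from `final` through narrow dependencies only."""
    seen, stack, out = set(), [final], []
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue
        seen.add(id(node))
        out.append(node)
        stack.extend(node.narrow_parents)  # never cross a shuffle boundary
    return out


def check_barrier_stage(final):
    stage = rdds_in_stage(final)
    is_barrier_stage = any(n.is_barrier for n in stage)
    if is_barrier_stage and any(n.kind == "PartitionPruningRDD" for n in stage):
        raise ValueError("a barrier stage must not contain a PartitionPruningRDD")


# A pruning RDD upstream of a shuffle lives in a different stage, so this passes.
upstream = Node("PartitionPruningRDD")
shuffled = Node("ShuffledRDD", shuffle_parents=[upstream])
check_barrier_stage(Node("MapPartitionsRDD", is_barrier=True, narrow_parents=[shuffled]))
```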
[jira] [Created] (SPARK-24819) Fail fast when no enough slots to launch the barrier stage on job submitted
Jiang Xingbo created SPARK-24819: Summary: Fail fast when no enough slots to launch the barrier stage on job submitted Key: SPARK-24819 URL: https://issues.apache.org/jira/browse/SPARK-24819 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo When a job is submitted, check each of its barrier stages to see whether it requires more slots (to launch all of its barrier tasks together) than are currently active in the cluster. If the job requires more slots than are available (counting both busy and free slots), fail the job at submission.
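A minimal sketch of the submission-time check. It assumes each executor contributes `cores // cpus_per_task` slots (mirroring how `spark.task.cpus` divides executor cores) and that a barrier stage needs one slot per task. The function names are hypothetical.

```python
def total_slots(executor_cores, cpus_per_task=1):
    """Slots in the cluster, counting busy and free cores alike."""
    return sum(cores // cpus_per_task for cores in executor_cores)


def check_job(barrier_stage_tasks, executor_cores, cpus_per_task=1):
    """Fail fast if any barrier stage needs more slots than the cluster has.

    barrier_stage_tasks: mapping from stage id to its number of barrier tasks.
    """
    slots = total_slots(executor_cores, cpus_per_task)
    for stage_id, num_tasks in barrier_stage_tasks.items():
        if num_tasks > slots:
            raise RuntimeError(
                f"barrier stage {stage_id} needs {num_tasks} slots, "
                f"but the cluster only has {slots}")


# Executors with 4, 4 and 3 cores and 2 CPUs per task give 2 + 2 + 1 = 5 slots.
check_job({1: 5}, [4, 4, 3], cpus_per_task=2)
```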
[jira] [Created] (SPARK-24818) Ensure all the barrier tasks in the same stage are launched together
Jiang Xingbo created SPARK-24818: Summary: Ensure all the barrier tasks in the same stage are launched together Key: SPARK-24818 URL: https://issues.apache.org/jira/browse/SPARK-24818 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo When some executors or hosts are blacklisted, it may happen that only some of the tasks in a barrier stage can be launched. We should detect this case and revert the allocated resource offers.
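An all-or-nothing assignment captures the idea: tentatively place every barrier task, and if any task cannot be placed, throw the tentative placement away so the offers are untouched. This is a simplified, hypothetical model (offers as free-slot counts per executor, first-fit placement), not the TaskSchedulerImpl logic.

```python
def assign_barrier_tasks(num_tasks, offers, blacklisted):
    """Assign every barrier task to a resource offer, or assign none of them.

    offers: dict executor_id -> free slots; updated only on full success.
    blacklisted: executors that must not receive tasks.
    Returns a dict task index -> executor_id, or {} when the launch is reverted.
    """
    remaining = dict(offers)  # tentative copy, so reverting means dropping it
    assignment = {}
    for task in range(num_tasks):
        executor = next((e for e, free in remaining.items()
                         if free > 0 and e not in blacklisted), None)
        if executor is None:
            return {}  # only part of the stage could launch: revert everything
        remaining[executor] -= 1
        assignment[task] = executor
    offers.update(remaining)  # commit only when every task got a slot
    return assignment


offers = {"e1": 2, "e2": 2}
print(assign_barrier_tasks(4, offers, blacklisted={"e2"}))  # {} (reverted)
print(offers)                                               # unchanged
```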
[jira] [Created] (SPARK-24817) Implement BarrierTaskContext.barrier()
Jiang Xingbo created SPARK-24817: Summary: Implement BarrierTaskContext.barrier() Key: SPARK-24817 URL: https://issues.apache.org/jira/browse/SPARK-24817 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo Implement BarrierTaskContext.barrier() to support global synchronization between all the tasks in a barrier stage. The global sync should complete as soon as all tasks in the same barrier stage reach the same barrier.
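The intended semantics can be illustrated locally with Python's `threading.Barrier`, using threads as stand-ins for the tasks of one barrier stage. This only models the behavior; it is not how Spark implements BarrierTaskContext.barrier() across executors.

```python
import threading


def run_tasks(num_tasks):
    """Run num_tasks threads that log a step, wait at a barrier, then log again."""
    barrier = threading.Barrier(num_tasks)
    log = []
    lock = threading.Lock()

    def task(task_id):
        with lock:
            log.append(("before", task_id))
        barrier.wait()  # returns only after all num_tasks threads have arrived
        with lock:
            log.append(("after", task_id))

    threads = [threading.Thread(target=task, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


log = run_tasks(4)
# No task logs "after" until every task has logged "before".
print(log)
```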
[jira] [Updated] (SPARK-24795) Implement barrier execution mode
[ https://issues.apache.org/jira/browse/SPARK-24795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xingbo updated SPARK-24795: - Description: Implement barrier execution mode, as described in SPARK-24582. Include all the API changes and the basic implementation (except for BarrierTaskContext.barrier()). > Implement barrier execution mode > > > Key: SPARK-24795 > URL: https://issues.apache.org/jira/browse/SPARK-24795 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jiang Xingbo >Priority: Major > > Implement barrier execution mode, as described in SPARK-24582. > Include all the API changes and the basic implementation (except for > BarrierTaskContext.barrier()).
[jira] [Created] (SPARK-24795) Implement barrier execution mode
Jiang Xingbo created SPARK-24795: Summary: Implement barrier execution mode Key: SPARK-24795 URL: https://issues.apache.org/jira/browse/SPARK-24795 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo
[jira] [Commented] (SPARK-24582) Design: Barrier execution mode
[ https://issues.apache.org/jira/browse/SPARK-24582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536331#comment-16536331 ] Jiang Xingbo commented on SPARK-24582: -- Design doc: [https://docs.google.com/document/d/1GvcYR6ZFto3dOnjfLjZMtTezX0W5VYN9w1l4-tQXaZk/edit#|https://docs.google.com/document/d/1GvcYR6ZFto3dOnjfLjZMtTezX0W5VYN9w1l4-tQXaZk/edit] > Design: Barrier execution mode > -- > > Key: SPARK-24582 > URL: https://issues.apache.org/jira/browse/SPARK-24582 > Project: Spark > Issue Type: Story > Components: ML, Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > [~jiangxb1987] and [~cloud_fan] outlined a design sketch in SPARK-24375, > which covers some basic scenarios. This story is for a formal design of the > barrier execution mode.
[jira] [Created] (SPARK-24564) Add test suite for RecordBinaryComparator
Jiang Xingbo created SPARK-24564: Summary: Add test suite for RecordBinaryComparator Key: SPARK-24564 URL: https://issues.apache.org/jira/browse/SPARK-24564 Project: Spark Issue Type: Test Components: SQL Affects Versions: 2.4.0 Reporter: Jiang Xingbo
[jira] [Comment Edited] (SPARK-24552) Task attempt numbers are reused when stages are retried
[ https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511695#comment-16511695 ] Jiang Xingbo edited comment on SPARK-24552 at 6/13/18 9:47 PM: --- IIUC, stageAttemptId + taskAttemptNumber should probably identify a unique task attempt, and it carries enough information to know how many failed attempts you had previously. was (Author: jiangxb1987): IIUC stageAttemptId + taskAttemptId shall probably define a unique task attempt, and it carries enough information to know how many failed attempts you had previously. > Task attempt numbers are reused when stages are retried > --- > > Key: SPARK-24552 > URL: https://issues.apache.org/jira/browse/SPARK-24552 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Ryan Blue >Priority: Major > > When stages are retried due to shuffle failures, task attempt numbers are > reused. This causes a correctness bug in the v2 data sources write path. > Data sources (both the original and v2) pass the task attempt to writers so > that writers can use the attempt number to track and clean up data from > failed or speculative attempts. In the v2 docs for DataWriterFactory, the > attempt number's javadoc states that "Implementations can use this attempt > number to distinguish writers of different task attempts." > When two attempts of a stage use the same (partition, attempt) pair, two > tasks can create the same data and attempt to commit. The commit coordinator > prevents both from committing and will abort the attempt that finishes last. > When using the (partition, attempt) pair to track data, the aborted task may > delete data associated with the (partition, attempt) pair. If that happens, > the data for the task that committed is deleted as well, which is a > correctness bug. > For a concrete example, I have a data source that creates files in place > named with {{part---.}}.
Because these > files are written in place, both tasks create the same file and the one that > is aborted deletes the file, leading to data corruption when the file is > added to the table.
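The collision can be shown with plain tuples. This is an illustrative model, not Spark's API: each task attempt is written as (stageAttemptId, partition, taskAttemptNumber), following the comment's suggestion, and a stage retry restarts attempt numbers at 0.

```python
def has_collisions(keys):
    """True if any key identifies more than one task attempt."""
    return len(keys) != len(set(keys))


# (stageAttemptId, partition, taskAttemptNumber)
attempts = [
    (0, 7, 0),  # first attempt of partition 7
    (0, 7, 1),  # its retry or speculative copy in the same stage attempt
    (1, 7, 0),  # partition 7 again after the whole stage is retried
]

pair_keys = [(partition, number) for _, partition, number in attempts]
print(has_collisions(pair_keys))  # True: (7, 0) is reused across stage attempts
print(has_collisions(attempts))   # False once stageAttemptId is part of the key
```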
[jira] [Commented] (SPARK-24552) Task attempt numbers are reused when stages are retried
[ https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511695#comment-16511695 ] Jiang Xingbo commented on SPARK-24552: -- IIUC stageAttemptId + taskAttemptId shall probably define a unique task attempt, and it carries enough information to know how many failed attempts you had previously.
[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again
[ https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508832#comment-16508832 ] Jiang Xingbo commented on SPARK-24387: -- {quote}So I think there's a race condition that the backend may make offers before killing the executor. And since this is the only executor left, it's offered to the TaskScheduler and the retried task is scheduled to it.{quote} IIUC, removing an executor due to a heartbeat timeout is treated as SlaveLost, which should register a task failure for each task running on that executor and therefore blacklist those tasks from running on that executor again. So why can the executor be offered to the retried task again? > Heartbeat-timeout executor is added back and used again > --- > > Key: SPARK-24387 > URL: https://issues.apache.org/jira/browse/SPARK-24387 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Rui Li >Priority: Major > > In our job, when there's only one task and one executor running, the > executor's heartbeat is lost and the driver decides to remove it. However, the > executor is added again and the task's retry attempt is scheduled to that > executor, almost immediately after the executor is marked as lost.
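The reasoning in the comment can be modeled as a per-task executor exclusion list. This is a hypothetical sketch under the comment's premise (an executor loss counts as a failure of every task running on it); whether Spark actually excludes the executor also depends on its blacklist configuration.

```python
class TaskExecutorBlacklist:
    """Hypothetical record of which executors each task has failed on."""

    def __init__(self):
        self.failed_on = {}  # task index -> set of executor ids

    def on_executor_lost(self, executor_id, running_tasks):
        # Losing an executor counts as a failure for every task running on it.
        for task in running_tasks:
            self.failed_on.setdefault(task, set()).add(executor_id)

    def can_run(self, task, executor_id):
        """Would an offer from executor_id be acceptable for this task?"""
        return executor_id not in self.failed_on.get(task, set())


bl = TaskExecutorBlacklist()
bl.on_executor_lost("exec-1", running_tasks=[0])
print(bl.can_run(0, "exec-1"))  # the retried task should not go back to exec-1
```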
[jira] [Commented] (SPARK-24492) Endless attempted task when TaskCommitDenied exception writing to S3A
[ https://issues.apache.org/jira/browse/SPARK-24492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506232#comment-16506232 ] Jiang Xingbo commented on SPARK-24492: -- It is possible that one task attempt has acquired permission to commit its output but does not finish the commit for a while. In the meantime, another attempt of the same task (e.g. a speculative run) may also ask to commit and fail with a TaskCommitDenied exception. In this case, the current behavior of retrying without counting the failure toward the task failure count could make the task retry indefinitely until it gets permission to commit, which can waste a lot of resources if the task is short. Instead, I propose not retrying the task attempt on a TaskCommitDenied exception, since that means another attempt is already committing the same task, and we can wait until it finishes (if the commit succeeds, nothing is left to do; if it fails, we can still retry). > Endless attempted task when TaskCommitDenied exception writing to S3A > - > > Key: SPARK-24492 > URL: https://issues.apache.org/jira/browse/SPARK-24492 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Yu-Jhe Li >Priority: Critical > Attachments: retry_stage.png, 螢幕快照 2018-05-16 上午11.10.46.png, 螢幕快照 > 2018-05-16 上午11.10.57.png > > > Hi, when we run a Spark application under spark-2.2.0 on AWS spot instances and > output files to S3, some tasks retry endlessly and all of them fail with a > TaskCommitDenied exception. This happened when we ran the Spark application on > some instances with network issues (it runs well on healthy spot instances). > Sorry, I can't find an easy way to reproduce this issue; here's all I can > provide. > The Spark UI shows (in attachments) that one task of stage 112 failed due to a > FetchFailedException (a network issue) and Spark attempted to retry stage > 112 (retry 1).
But in stage 112 (retry 1), all tasks failed due to a TaskCommitDenied exception and kept retrying (they never succeeded and caused lots > of S3 requests). > On the other side, the driver log shows: > # task 123.0 in stage 112.0 failed due to FetchFailedException (a network > issue caused a corrupted file) > # a warning message from OutputCommitCoordinator > # task 92.0 in stage 112.1 failed when writing rows > # the failed tasks kept retrying but never succeeded > {noformat} > 2018-05-16 02:38:055 WARN TaskSetManager:66 - Lost task 123.0 in stage 112.0 > (TID 42909, 10.47.20.17, executor 64): FetchFailed(BlockManagerId(137, > 10.235.164.113, 60758, None), shuffleId=39, mapId=59, reduceId=123, message= > org.apache.spark.shuffle.FetchFailedException: Stream is corrupted > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191) > at > org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80) > at > org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109) > at >
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) >
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503647#comment-16503647 ] Jiang Xingbo commented on SPARK-24375: -- The major problem is that tasks in the same stage of an MPI workload may rely on intermediate results from sibling tasks running in parallel to compute the final result. Thus, when a task fails, the other tasks in the same stage may produce incorrect results or even hang, so it seems straightforward to just retry the whole stage on any task failure. > Design sketch: support barrier scheduling in Apache Spark > - > > Key: SPARK-24375 > URL: https://issues.apache.org/jira/browse/SPARK-24375 > Project: Spark > Issue Type: Story > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > This task is to outline a design sketch for the barrier scheduling SPIP > discussion. It doesn't need to be a complete design before the vote. But it > should at least cover both Scala/Java and PySpark.
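A sequential sketch of the whole-stage retry policy described in the comment. Real barrier tasks run in parallel; here they are plain functions called in order, and `run_barrier_stage` and `flaky` are hypothetical names. The point is that a single task failure discards every result of that stage attempt.

```python
def run_barrier_stage(task_fns, max_stage_attempts=3):
    """Run all tasks of a stage; if any fails, drop every result and rerun the stage."""
    for stage_attempt in range(max_stage_attempts):
        results = []
        try:
            for partition, fn in enumerate(task_fns):
                results.append(fn(stage_attempt, partition))
        except Exception:
            continue  # sibling results may depend on the failed task: retry them all
        return stage_attempt, results
    raise RuntimeError("barrier stage failed on every attempt")


def flaky(stage_attempt, partition):
    # Partition 1 fails during the first stage attempt only.
    if stage_attempt == 0 and partition == 1:
        raise OSError("executor lost")
    return partition * 10


print(run_barrier_stage([flaky, flaky, flaky]))  # (1, [0, 10, 20])
```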
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489764#comment-16489764 ] Jiang Xingbo commented on SPARK-24375: -- We propose adding new RDDBarrier and BarrierTaskContext APIs to support barrier scheduling in Spark; this also requires modifying how job scheduling works to accommodate the new feature. *Barrier Stage*: A barrier stage doesn't launch any of its tasks until the available slots (free CPU cores that can be used to launch pending tasks) are enough to launch all the tasks at the same time, and it always retries the whole stage when any task fails. One way to identify whether a stage is a barrier stage is to trace the RDDs that the stage runs on: if the stage contains an RDDBarrier, or at least one of its ancestor RDDs is an RDDBarrier, then the stage is a barrier stage. The tracing should stop at ShuffledRDD(s). *Schedule Barrier Tasks*: Currently the TaskScheduler schedules pending tasks on available slots on a best-effort basis, so normally the tasks of a stage are not all launched at the same time. We may add a check of total available slots before scheduling tasks from a barrier stage taskset. It is still possible that only some tasks of a barrier stage taskset get launched due to task locality issues, so we have to check again before launching to ensure that all tasks in the same barrier stage are launched at the same time. If we consider scheduling several jobs at the same time (both barrier and regular jobs), barrier tasks may be blocked by regular tasks (when the available slots are always fewer than a barrier stage taskset requires), or one barrier stage taskset may block another (when a barrier stage taskset that requires fewer slots tends to be scheduled earlier).
Currently we don’t have a perfect solution for all these scenarios, but at least we may avoid the worst case that a huge barrier stage taskset being blocked forever on a busy cluster, using a time-based weight approach(conceptionally, a taskset that have been pending for a longer time will be assigned greater priority weight to be scheduled). *Task Barrier*: Barrier tasks shall allow users to insert sync in the middle of task execution, this can be achieved by introducing a glocal barrier operation in TaskContext, which makes the current task wait until all tasks in the same stage hit this barrier. *Task Failure*: To ensure correctness, a barrier stage always retry the whole stage when any task(s) fail. Thus, it’s quite straightforward that we shall require kill all the running tasks of a failed stage, and that also guarantees at most one taskset shall be running for each single stage(no zombie tasks). *Speculative Task*: Since we require launch all tasks in a barrier stage at the same time, there is no need to launch a speculative task for a barrier stage taskset. *Share TaskInfo*: To share informations between tasks in a barrier stage, we may update them in `TaskContext.localProperties`. *Python Support*: Expose RDDBarrier and BarrierTaskContext to pyspark. [~cloud_fan] maybe you want to give additional information I didn't cover above? (esp. PySpark) > Design sketch: support barrier scheduling in Apache Spark > - > > Key: SPARK-24375 > URL: https://issues.apache.org/jira/browse/SPARK-24375 > Project: Spark > Issue Type: Story > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > This task is to outline a design sketch for the barrier scheduling SPIP > discussion. It doesn't need to be a complete design before the vote. But it > should at least cover both Scala/Java and PySpark. 
[jira] [Created] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death
Jiang Xingbo created SPARK-24340: Summary: Clean up non-shuffle disk block manager files following executor death Key: SPARK-24340 URL: https://issues.apache.org/jira/browse/SPARK-24340 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo Currently we only clean up local folders when an application is removed, and we don't clean up non-shuffle files such as temporary shuffle blocks, cached RDD/broadcast blocks, spill files, etc., which can cause disk space leaks when executors periodically die and are replaced. To avoid this source of disk space leaks, we can clean up an executor's disk store files, except for shuffle index and data files, when the executor finishes.
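A minimal sketch of the selection step, assuming Spark's usual block file names (shuffle outputs named `shuffle_<shuffleId>_<mapId>_<reduceId>.data` or `.index`; other prefixes such as `temp_shuffle_`, `rdd_`, `broadcast_` and `temp_local_` for the rest). The helper is hypothetical and only decides which files to remove; it does not touch the disk.

```python
import re

# Assumed naming for shuffle outputs that must survive the executor.
SHUFFLE_OUTPUT = re.compile(r"shuffle_\d+_\d+_\d+\.(data|index)")


def files_to_delete(block_files):
    """Pick an exited executor's local block files to remove.

    Shuffle data and index files are kept (presumably so they can still be
    served, e.g. by an external shuffle service); everything else, such as
    temporary shuffle files, cached RDD/broadcast blocks and spills, is removed.
    """
    return [name for name in block_files if not SHUFFLE_OUTPUT.fullmatch(name)]


files = [
    "shuffle_0_3_0.data",
    "shuffle_0_3_0.index",
    "temp_shuffle_1f2e",
    "rdd_4_7",
    "broadcast_2",
    "temp_local_9a8b",
]
print(files_to_delete(files))
# ['temp_shuffle_1f2e', 'rdd_4_7', 'broadcast_2', 'temp_local_9a8b']
```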
[jira] [Created] (SPARK-23881) Flaky test: JobCancellationSuite."interruptible iterator of shuffle reader"
Jiang Xingbo created SPARK-23881: Summary: Flaky test: JobCancellationSuite."interruptible iterator of shuffle reader" Key: SPARK-23881 URL: https://issues.apache.org/jira/browse/SPARK-23881 Project: Spark Issue Type: Test Components: Spark Core Affects Versions: 2.4.0 Reporter: Jiang Xingbo The test JobCancellationSuite."interruptible iterator of shuffle reader" has been flaky: *branch-2.3* * [https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/lastCompletedBuild/testReport/org.apache.spark/JobCancellationSuite/interruptible_iterator_of_shuffle_reader/] *master* * [https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4301/testReport/junit/org.apache.spark/JobCancellationSuite/interruptible_iterator_of_shuffle_reader/]
[jira] [Commented] (SPARK-23525) ALTER TABLE CHANGE COLUMN doesn't work for external hive table
[ https://issues.apache.org/jira/browse/SPARK-23525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378546#comment-16378546 ]

Jiang Xingbo commented on SPARK-23525:
--

I'm working on a fix for it and will try to backport the fix to 2.2.

> ALTER TABLE CHANGE COLUMN doesn't work for external hive table
> --
>
> Key: SPARK-23525
> URL: https://issues.apache.org/jira/browse/SPARK-23525
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Pavlo Skliar
> Priority: Major
>
> {code:python}
> print(spark.sql("""
> SHOW CREATE TABLE test.trends
> """).collect()[0].createtab_stmt)
>
> # OUTPUT
> CREATE EXTERNAL TABLE `test`.`trends`(`id` string COMMENT '', `metric` string COMMENT '', `amount` bigint COMMENT '')
> COMMENT ''
> PARTITIONED BY (`date` string COMMENT '')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
> STORED AS
>   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
>   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION 's3://x/x/'
> TBLPROPERTIES (
>   'transient_lastDdlTime' = '1519729384',
>   'last_modified_time' = '1519645652',
>   'last_modified_by' = 'pavlo',
>   'last_castor_run_ts' = '1513561658.0'
> )
>
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
>
> # OUTPUT
> [Row(col_name='id', data_type='string', comment=''),
>  Row(col_name='metric', data_type='string', comment=''),
>  Row(col_name='amount', data_type='bigint', comment=''),
>  Row(col_name='date', data_type='string', comment=''),
>  Row(col_name='# Partition Information', data_type='', comment=''),
>  Row(col_name='# col_name', data_type='data_type', comment='comment'),
>  Row(col_name='date', data_type='string', comment='')]
>
> spark.sql("""alter table test.trends change column id id string comment 'unique identifier'""")
>
> spark.sql("""
> DESCRIBE test.trends
> """).collect()
>
> # OUTPUT
> [Row(col_name='id', data_type='string', comment=''),
>  Row(col_name='metric', data_type='string', comment=''),
>  Row(col_name='amount', data_type='bigint', comment=''),
>  Row(col_name='date', data_type='string', comment=''),
>  Row(col_name='# Partition Information', data_type='', comment=''),
>  Row(col_name='# col_name', data_type='data_type', comment='comment'),
>  Row(col_name='date', data_type='string', comment='')]
> {code}
> The strange thing is that I successfully assigned a comment to the id field from Hive, and it's visible in the Hue UI, but it's still not visible from Spark, and no Spark request has any effect on the comments.
[jira] [Commented] (SPARK-23525) ALTER TABLE CHANGE COLUMN doesn't work for external hive table
[ https://issues.apache.org/jira/browse/SPARK-23525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378533#comment-16378533 ]

Jiang Xingbo commented on SPARK-23525:
--

Thank you for reporting this. I believe the bug is caused by:
https://github.com/apache/spark/blob/8077bb04f350fd35df83ef896135c0672dc3f7b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L613
[jira] [Updated] (SPARK-23525) ALTER TABLE CHANGE COLUMN doesn't work for external hive table
[ https://issues.apache.org/jira/browse/SPARK-23525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-23525:
-
Affects Version/s: 2.3.0
Priority: Major (was: Minor)
Summary: ALTER TABLE CHANGE COLUMN doesn't work for external hive table (was: Update column comment doesn't work from spark)
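The reporter's manual check can be turned into an assertion that fails while the bug is present. A minimal sketch, assuming DESCRIBE rows with `col_name`, `data_type` and `comment` fields; `column_comments` is a hypothetical helper (not a Spark API), a `namedtuple` stands in for `pyspark.sql.Row`, and the rows are copied from the report. In a live session the same function could be fed `spark.sql("DESCRIBE test.trends").collect()`.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with the three columns DESCRIBE returns.
DescribeRow = namedtuple("DescribeRow", ["col_name", "data_type", "comment"])


def column_comments(describe_rows):
    """Map column name -> comment, ignoring the partition-information section."""
    comments = {}
    for row in describe_rows:
        if row.col_name.startswith("#"):
            # '# Partition Information' starts a section that repeats the
            # partition columns, so stop before it.
            break
        comments[row.col_name] = row.comment
    return comments


# DESCRIBE output from the report, captured after the ALTER TABLE statement.
rows_after_alter = [
    DescribeRow("id", "string", ""),
    DescribeRow("metric", "string", ""),
    DescribeRow("amount", "bigint", ""),
    DescribeRow("date", "string", ""),
    DescribeRow("# Partition Information", "", ""),
    DescribeRow("# col_name", "data_type", "comment"),
    DescribeRow("date", "string", ""),
]

comments = column_comments(rows_after_alter)
bug_present = comments["id"] != "unique identifier"
print(bug_present)  # True for the output in the report
```

Once a fix lands, the same check against a fresh DESCRIBE should report `'unique identifier'` for `id`.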
[jira] [Comment Edited] (SPARK-23139) Read eventLog file with mixed encodings
[ https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356507#comment-16356507 ]

Jiang Xingbo edited comment on SPARK-23139 at 2/8/18 5:25 AM:
--

{quote}EventLog may contain mixed encodings such as custom exception message{quote}
Could you please elaborate on how this happened?

was (Author: jiangxb1987):
```
EventLog may contain mixed encodings such as custom exception message
```
Could you please elaborate on how this happened?

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: DENG FEI
> Priority: Major
>
> EventLog may contain mixed encodings, such as a custom exception message, which causes replay to fail.
>
> java.nio.charset.MalformedInputException: Input length = 1
>     at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>     at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>     at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>     at java.io.InputStreamReader.read(InputStreamReader.java:184)
>     at java.io.BufferedReader.fill(BufferedReader.java:161)
>     at java.io.BufferedReader.readLine(BufferedReader.java:324)
>     at java.io.BufferedReader.readLine(BufferedReader.java:389)
>     at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>     at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>     at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>     at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>     at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>     at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>     at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings
[ https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356507#comment-16356507 ]

Jiang Xingbo commented on SPARK-23139:
--

```
EventLog may contain mixed encodings such as custom exception message
```
Could you please elaborate on how this happened?
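To illustrate how a single non-UTF-8 message can break replay, and one possible mitigation (an assumption, not what the ticket settled on): decode the log leniently, substituting U+FFFD for malformed bytes instead of failing on the first bad line. The sketch uses Python's `errors="replace"`; on the JVM the analogous setting is `CodingErrorAction.REPLACE`, which `scala.io.Codec` accepts through `onMalformedInput`. The function name and the sample log lines are hypothetical.

```python
import io


def read_event_log_lines(raw: bytes, encoding: str = "utf-8"):
    """Decode an event log line by line, replacing undecodable bytes with
    U+FFFD so one bad line does not abort the whole replay."""
    reader = io.TextIOWrapper(io.BytesIO(raw), encoding=encoding, errors="replace")
    return [line.rstrip("\n") for line in reader]


# Hypothetical log: the second event carries an exception message that was
# written in Latin-1 rather than UTF-8.
raw_log = (
    b'{"Event":"SparkListenerLogStart"}\n'
    + '{"Event":"Failure","Message":"café"}\n'.encode("latin-1")
)

try:
    raw_log.decode("utf-8")  # strict decoding, like the failing replay
    strict_ok = True
except UnicodeDecodeError:
    strict_ok = False

lines = read_event_log_lines(raw_log)
print(strict_ok, len(lines))  # False 2
```

Replacement loses the original bytes of the affected message, but the rest of the application history can still be replayed.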
[jira] [Created] (SPARK-23330) Spark UI SQL executions page throws NPE
Jiang Xingbo created SPARK-23330:

Summary: Spark UI SQL executions page throws NPE
Key: SPARK-23330
URL: https://issues.apache.org/jira/browse/SPARK-23330
Project: Spark
Issue Type: Bug
Components: Web UI
Affects Versions: 2.3.0
Reporter: Jiang Xingbo

The Spark UI SQL executions page throws the following error and the page crashes:
```
HTTP ERROR 500

Problem accessing /SQL/. Reason:

    Server Error
Caused by:
java.lang.NullPointerException
    at scala.collection.immutable.StringOps$.length$extension(StringOps.scala:47)
    at scala.collection.immutable.StringOps.length(StringOps.scala:47)
    at scala.collection.IndexedSeqOptimized$class.isEmpty(IndexedSeqOptimized.scala:27)
    at scala.collection.immutable.StringOps.isEmpty(StringOps.scala:29)
    at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
    at scala.collection.immutable.StringOps.nonEmpty(StringOps.scala:29)
    at org.apache.spark.sql.execution.ui.ExecutionTable.descriptionCell(AllExecutionsPage.scala:182)
    at org.apache.spark.sql.execution.ui.ExecutionTable.row(AllExecutionsPage.scala:155)
    at org.apache.spark.sql.execution.ui.ExecutionTable$$anonfun$8.apply(AllExecutionsPage.scala:204)
    at org.apache.spark.sql.execution.ui.ExecutionTable$$anonfun$8.apply(AllExecutionsPage.scala:204)
    at org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:339)
    at org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:339)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.ui.UIUtils$.listingTable(UIUtils.scala:339)
    at org.apache.spark.sql.execution.ui.ExecutionTable.toNodeSeq(AllExecutionsPage.scala:203)
    at org.apache.spark.sql.execution.ui.AllExecutionsPage.render(AllExecutionsPage.scala:67)
    at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
    at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
    at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:534)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:108)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:748)
```
It seems the bug was introduced by https://github.com/apache/spark/pull/19681/files#diff-a74d84702d8d47d5269e96740a55a3caR63
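The trace shows `StringOps.nonEmpty` being called on a null description inside `ExecutionTable.descriptionCell`. The usual remedy is a null-safe emptiness check; in Scala that is commonly written `Option(description).exists(_.nonEmpty)`. The sketch below models the idea in Python, with `None` standing in for a JVM null; all function names and the fallback text are hypothetical, and this is not the actual patch.

```python
from typing import Optional


def naive_has_description(description: Optional[str]) -> bool:
    # Mirrors calling description.nonEmpty on a possibly-null string:
    # len(None) raises TypeError, the Python analogue of the NPE.
    return len(description) > 0


def has_description(description: Optional[str]) -> bool:
    """Null-safe check: a missing description counts as empty."""
    return description is not None and len(description) > 0


def description_cell(description: Optional[str], fallback: str) -> str:
    """Render the description if there is one, otherwise the fallback text."""
    return description if has_description(description) else fallback


print(description_cell(None, "(no description)"))  # (no description)
```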
[jira] [Created] (SPARK-23243) Shuffle+Repartition on an RDD could lead to incorrect answers
Jiang Xingbo created SPARK-23243:

Summary: Shuffle+Repartition on an RDD could lead to incorrect answers
Key: SPARK-23243
URL: https://issues.apache.org/jira/browse/SPARK-23243
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0, 2.1.0, 2.0.0, 1.6.0, 2.3.0
Reporter: Jiang Xingbo

RDD repartition also distributes data in a round-robin fashion, so it can produce incorrect answers on RDD workloads in the same way as described in https://issues.apache.org/jira/browse/SPARK-23207

The approach that fixes DataFrame.repartition() doesn't apply to the RDD repartition issue, as discussed in https://github.com/apache/spark/pull/20393#issuecomment-360912451

This task tracks alternative solutions for this issue.
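The failure mode can be reproduced with a toy model in plain Python (not Spark code; `round_robin` and `by_hash` are hypothetical). A map task assigns records to reducers in arrival order; if the task is retried and its input arrives in a different order, a reducer that already consumed the first attempt's output and a reducer that reads the retry's output disagree about where each record went. Placement by hash depends only on the record itself, which is why, for pair RDDs, `partitionBy` with a `HashPartitioner` is one order-insensitive alternative, at the cost of no longer guaranteeing balanced partition sizes.

```python
def round_robin(records, num_partitions, start=0):
    """Toy round-robin: records go to partitions in arrival order, starting
    at a fixed offset (assumed here to be the same on every attempt)."""
    out = {p: [] for p in range(num_partitions)}
    for i, rec in enumerate(records):
        out[(start + i) % num_partitions].append(rec)
    return out


def by_hash(records, num_partitions):
    """Toy hash placement: the target depends only on the record's value."""
    out = {p: [] for p in range(num_partitions)}
    for rec in records:
        out[hash(rec) % num_partitions].append(rec)
    return out


first_attempt = [1, 2, 3, 4, 5, 6]
retry_attempt = [2, 1, 3, 4, 6, 5]  # same records, different fetch order

# Reducer 0 already consumed the first attempt's output; reducer 1 is rerun
# and reads the retried map output.
rr = round_robin(first_attempt, 2)[0] + round_robin(retry_attempt, 2)[1]
hp = by_hash(first_attempt, 2)[0] + by_hash(retry_attempt, 2)[1]

print(sorted(rr))  # [1, 1, 3, 4, 5, 5]: 2 and 6 lost, 1 and 5 duplicated
print(sorted(hp))  # [1, 2, 3, 4, 5, 6]
```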
[jira] [Updated] (SPARK-23207) Shuffle+Repartition on an DataFrame could lead to incorrect answers
[ https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xingbo updated SPARK-23207:
-
Summary: Shuffle+Repartition on an DataFrame could lead to incorrect answers (was: Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss)

> Shuffle+Repartition on an DataFrame could lead to incorrect answers
> ---
>
> Key: SPARK-23207
> URL: https://issues.apache.org/jira/browse/SPARK-23207
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Jiang Xingbo
> Assignee: Jiang Xingbo
> Priority: Blocker
[jira] [Commented] (SPARK-23207) Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss
[ https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338419#comment-16338419 ]

Jiang Xingbo commented on SPARK-23207:
--

I'm working on this.
[jira] [Created] (SPARK-23207) Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss
Jiang Xingbo created SPARK-23207:

Summary: Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss
Key: SPARK-23207
URL: https://issues.apache.org/jira/browse/SPARK-23207
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo

Currently, shuffle repartition uses RoundRobinPartitioning, so the generated result is nondeterministic: the order of the input rows is not deterministic.

The bug can be triggered when a repartition call follows a shuffle (which leads to non-deterministic row ordering), as in the pattern below:

upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)

When one of the executor processes goes down, some tasks of the repartition stage are retried and produce a different row ordering, and some tasks of the result stage are then retried and read different data.

The following code returns 931532 instead of 1000000:
{code}
import scala.sys.process._

import org.apache.spark.TaskContext

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  // On the first attempt of partitions 0 and 1, kill the Java processes on
  // the executor's host to force task and stage retries.
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
{code}
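In the repro, the distinct count comes out 68,468 short of 1,000,000. One way to remove the order dependence is to sort each task's input rows before the round-robin assignment, so the row-to-partition mapping depends only on which rows the task received. A minimal sketch in Python, assuming rows are totally ordered and each task's input set (not its order) is stable across retries; the function names are hypothetical and this is a toy model, not Spark's implementation.

```python
def round_robin(rows, num_partitions, start=0):
    """Assign rows to output partitions in arrival order."""
    out = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        out[(start + i) % num_partitions].append(row)
    return out


def sorted_round_robin(rows, num_partitions, start=0):
    """Sort first, so the assignment no longer depends on arrival order."""
    return round_robin(sorted(rows), num_partitions, start)


first_attempt = [3, 1, 2, 0]  # order produced by the upstream shuffle
retry_attempt = [0, 1, 2, 3]  # same rows, fetched in a different order

print(round_robin(first_attempt, 2) == round_robin(retry_attempt, 2))                # False
print(sorted_round_robin(first_attempt, 2) == sorted_round_robin(retry_attempt, 2))  # True
```

The price is an extra sort in every repartition task; it also only helps when retries see the same rows, not when the upstream values themselves change.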
[jira] [Created] (SPARK-23188) Make vectorized columar reader batch size configurable
Jiang Xingbo created SPARK-23188:

Summary: Make vectorized columar reader batch size configurable
Key: SPARK-23188
URL: https://issues.apache.org/jira/browse/SPARK-23188
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo
[jira] [Commented] (SPARK-22360) Add unit test for Window Specifications
[ https://issues.apache.org/jira/browse/SPARK-22360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332773#comment-16332773 ]

Jiang Xingbo commented on SPARK-22360:
--

Created https://issues.apache.org/jira/browse/SPARK-23160
[jira] [Created] (SPARK-23160) Add more window sql tests
Jiang Xingbo created SPARK-23160:

Summary: Add more window sql tests
Key: SPARK-23160
URL: https://issues.apache.org/jira/browse/SPARK-23160
Project: Spark
Issue Type: Sub-task
Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo

We should also cover the window SQL interface, for example in `sql/core/src/test/resources/sql-tests/inputs/window.sql`. It would also be interesting to see whether we generate results consistent with other major databases for the window tests.
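For the cross-database comparison, one lightweight option is to run the same window query on a second engine and diff its rows against Spark's output. A minimal sketch using Python's built-in `sqlite3` module as that second engine (an assumption: the ticket does not name databases, and SQLite supports window functions only from version 3.25). The table `t`, its columns and the data are hypothetical.

```python
import sqlite3

# Hypothetical fixture in the spirit of window.sql.
ROWS = [("a", 1, 10), ("a", 2, 20), ("a", 3, 20), ("b", 1, 5), ("b", 2, 7)]

QUERY = """
SELECT cate, id, val,
       rank() OVER (PARTITION BY cate ORDER BY val) AS rnk,
       sum(val) OVER (PARTITION BY cate ORDER BY id
                      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running
FROM t
ORDER BY cate, id
"""


def reference_results(rows, query):
    """Run a window query on an in-memory SQLite database (needs SQLite >= 3.25)."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE t (cate TEXT, id INTEGER, val INTEGER)")
        conn.executemany("INSERT INTO t VALUES (?, ?, ?)", rows)
        return conn.execute(query).fetchall()
    finally:
        conn.close()


for row in reference_results(ROWS, QUERY):
    print(row)
```

The rows printed here would be compared with the output of the same query in Spark; ties in `rank()` (the two `val = 20` rows) are a useful spot to check for disagreement.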
[jira] [Commented] (SPARK-22360) Add unit test for Window Specifications
[ https://issues.apache.org/jira/browse/SPARK-22360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332762#comment-16332762 ]

Jiang Xingbo commented on SPARK-22360:
--

Sorry for the late response. It's great that we can cover the DataFrame test cases; I really think we should have them soon. Besides, we should also cover the window SQL interface, for example in `sql/core/src/test/resources/sql-tests/inputs/window.sql`. It would also be interesting to see whether we generate results consistent with other major databases for the window tests. [~smilegator] WDYT?
[jira] [Commented] (SPARK-22297) Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts conf"
[ https://issues.apache.org/jira/browse/SPARK-22297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307501#comment-16307501 ]

Jiang Xingbo commented on SPARK-22297:
--

How often do we run into this? Personally, I can't reproduce this test failure in my local environment.

> Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts conf"
> -
>
> Key: SPARK-22297
> URL: https://issues.apache.org/jira/browse/SPARK-22297
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Tests
> Affects Versions: 2.3.0
> Reporter: Marcelo Vanzin
> Priority: Minor
>
> Ran into this locally; the test code seems to use timeouts, which generally end up in flakiness like this.
> {noformat}
> [info] - SPARK-20640: Shuffle registration timeout and maxAttempts conf are working *** FAILED *** (1 second, 203 milliseconds)
> [info] "Unable to register with external shuffle server due to : java.util.concurrent.TimeoutException: Timeout waiting for task." did not contain "test_spark_20640_try_again" (BlockManagerSuite.scala:1370)
> [info] org.scalatest.exceptions.TestFailedException:
> [info] at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
> [info] at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
> [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
> [info] at org.apache.spark.storage.BlockManagerSuite$$anonfun$14.apply$mcV$sp(BlockManagerSuite.scala:1370)
> [info] at org.apache.spark.storage.BlockManagerSuite$$anonfun$14.apply(BlockManagerSuite.scala:1323)
> [info] at org.apache.spark.storage.BlockManagerSuite$$anonfun$14.apply(BlockManagerSuite.scala:1323)
> {noformat}
[jira] [Commented] (SPARK-22359) Improve the test coverage of window functions
[ https://issues.apache.org/jira/browse/SPARK-22359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289363#comment-16289363 ]

Jiang Xingbo commented on SPARK-22359:
--

[~smurakozi] Please feel free to submit a PR for this.

> Improve the test coverage of window functions
> -
>
> Key: SPARK-22359
> URL: https://issues.apache.org/jira/browse/SPARK-22359
> Project: Spark
> Issue Type: Test
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Jiang Xingbo
>
> There are already quite a few integration tests using window functions, but the unit test coverage for window functions is not ideal.
> We'd like to test the following aspects:
> * Specifications
> ** different partition clauses (none, one, multiple)
> ** different order clauses (none, one, multiple, asc/desc, nulls first/last)
> * Frames and their combinations
> ** OffsetWindowFunctionFrame
> ** UnboundedWindowFunctionFrame
> ** SlidingWindowFunctionFrame
> ** UnboundedPrecedingWindowFunctionFrame
> ** UnboundedFollowingWindowFunctionFrame
> * Aggregate function types
> ** Declarative
> ** Imperative
> ** UDAF
> * Spilling
> ** Cover the conditions under which WindowExec should spill at least once
[jira] [Commented] (SPARK-22757) Init-container in the driver/executor pods for downloading remote dependencies
[ https://issues.apache.org/jira/browse/SPARK-22757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288856#comment-16288856 ]

Jiang Xingbo commented on SPARK-22757:
--

Is this also targeted for the 2.3 release?

> Init-container in the driver/executor pods for downloading remote dependencies
> --
>
> Key: SPARK-22757
> URL: https://issues.apache.org/jira/browse/SPARK-22757
> Project: Spark
> Issue Type: Sub-task
> Components: Kubernetes
> Affects Versions: 2.3.0
> Reporter: Yinan Li
[jira] [Commented] (SPARK-22680) SparkSQL scan all partitions when the specified partitions are not exists in parquet formatted table
[ https://issues.apache.org/jira/browse/SPARK-22680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281235#comment-16281235 ]

Jiang Xingbo commented on SPARK-22680:
--

Could you also post the result of EXPLAIN? Thanks!

> SparkSQL scan all partitions when the specified partitions are not exists in parquet formatted table
>
> Key: SPARK-22680
> URL: https://issues.apache.org/jira/browse/SPARK-22680
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.2, 2.2.0
> Environment: spark2.0.2 spark2.2.0
> Reporter: Xiaochen Ouyang
>
> 1. spark-sql --master local[2]
> 2. create external table test (id int, name string) partitioned by (country string, province string, day string, hour int) stored as parquet location '/warehouse/test';
> 3. Produce data into table test.
> 4. select count(1) from test where country = '185' and province = '021' and day = '2017-11-12' and hour = 10;
> If no partition matching these four filter conditions exists in HDFS or in the metastore (MySQL), this query scans all partitions of table test.
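The behaviour the reporter expects can be stated as a small model: equality filters on partition columns select entries from the partition listing, and when nothing matches, nothing should be read. A toy Python sketch with hypothetical names and made-up partition values; in Spark the pruning happens during planning, and the EXPLAIN output requested above would show whether the partition filters reached the scan.

```python
from typing import Dict, List

Partition = Dict[str, str]


def prune_partitions(partitions: List[Partition], predicates: Dict[str, str]) -> List[Partition]:
    """Keep only partitions whose values satisfy every equality predicate."""
    return [p for p in partitions
            if all(p.get(col) == value for col, value in predicates.items())]


# Hypothetical metastore listing for table `test` (partition values as strings).
existing = [
    {"country": "086", "province": "010", "day": "2017-11-11", "hour": "9"},
    {"country": "086", "province": "021", "day": "2017-11-12", "hour": "10"},
]

# Filters from step 4 of the report; no partition matches them.
query_filters = {"country": "185", "province": "021", "day": "2017-11-12", "hour": "10"}

to_scan = prune_partitions(existing, query_filters)
print(len(to_scan))  # 0: the expected scan is empty, not a full-table scan
```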
[jira] [Created] (SPARK-22363) Add unit test for Window spilling
Jiang Xingbo created SPARK-22363: Summary: Add unit test for Window spilling Key: SPARK-22363 URL: https://issues.apache.org/jira/browse/SPARK-22363 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo Cover the scenarios in which WindowExec should spill at least once.
[jira] [Created] (SPARK-22362) Add unit test for Window Aggregate Functions
Jiang Xingbo created SPARK-22362: Summary: Add unit test for Window Aggregate Functions Key: SPARK-22362 URL: https://issues.apache.org/jira/browse/SPARK-22362 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo * Declarative * Imperative * UDAF
[jira] [Created] (SPARK-22361) Add unit test for Window Frames
Jiang Xingbo created SPARK-22361: Summary: Add unit test for Window Frames Key: SPARK-22361 URL: https://issues.apache.org/jira/browse/SPARK-22361 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo * OffsetWindowFunctionFrame * UnboundedWindowFunctionFrame * SlidingWindowFunctionFrame * UnboundedPrecedingWindowFunctionFrame * UnboundedFollowingWindowFunctionFrame
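Unit tests for these frames need expected values that do not depend on the operator under test. A minimal sketch of naive "oracle" functions over one already-sorted window partition, using SUM as the aggregate and LAG for the offset case. The helper names are hypothetical, and the mapping to the frame classes listed above (sliding: bounded on both sides; unbounded preceding: a growing prefix; unbounded following: a shrinking suffix; unbounded: the whole partition; offset: a single row at a fixed distance) is an assumption, not Spark's code.

```scala
// Naive frame oracles over a single, already-sorted window partition.
// Each returns one result per input row; written for clarity, not speed.

// ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING (sliding frame).
def slidingSum(values: IndexedSeq[Long], preceding: Int, following: Int): IndexedSeq[Long] =
  values.indices.map { i =>
    val lo = math.max(0, i - preceding)
    val hi = math.min(values.length - 1, i + following)
    (lo to hi).map(j => values(j)).sum
  }

// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (growing prefix, i.e. a running total).
def unboundedPrecedingSum(values: IndexedSeq[Long]): IndexedSeq[Long] =
  values.scanLeft(0L)(_ + _).tail

// ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING (shrinking suffix).
def unboundedFollowingSum(values: IndexedSeq[Long]): IndexedSeq[Long] =
  values.scanRight(0L)(_ + _).init

// ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING (whole partition).
def unboundedSum(values: IndexedSeq[Long]): IndexedSeq[Long] =
  IndexedSeq.fill(values.length)(values.sum)

// LAG(value, offset): the value `offset` rows earlier, or None outside the partition.
def lag(values: IndexedSeq[Long], offset: Int): IndexedSeq[Option[Long]] =
  values.indices.map(i => if (i - offset >= 0) Some(values(i - offset)) else None)
```

For the partition 1, 2, 3, 4, `slidingSum(v, 1, 1)` yields 3, 6, 9, 7 and `unboundedFollowingSum(v)` yields 10, 9, 7, 4; comparing such values against the query output keeps each frame type's test independent of WindowExec itself.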
[jira] [Created] (SPARK-22360) Add unit test for Window Specifications
Jiang Xingbo created SPARK-22360: Summary: Add unit test for Window Specifications Key: SPARK-22360 URL: https://issues.apache.org/jira/browse/SPARK-22360 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo * different partition clauses (none, one, multiple) * different order clauses (none, one, multiple, asc/desc, nulls first/last)
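The specification cases above form a small cartesian product. A minimal sketch of a test-matrix builder that enumerates them and renders the body of an `OVER (...)` clause, assuming hypothetical columns `a`, `b` (partitioning) and `c`, `d` (ordering); all type and function names here are illustrative, not part of Spark.

```scala
sealed trait Direction
case object Asc extends Direction
case object Desc extends Direction

sealed trait NullOrdering
case object NullsFirst extends NullOrdering
case object NullsLast extends NullOrdering

final case class SortKey(column: String, direction: Direction, nulls: NullOrdering) {
  def toSql: String = {
    val dir = if (direction == Asc) "ASC" else "DESC"
    val nul = if (nulls == NullsFirst) "NULLS FIRST" else "NULLS LAST"
    s"$column $dir $nul"
  }
}

final case class WindowSpecCase(partitionBy: Seq[String], orderBy: Seq[SortKey]) {
  // Renders the text that goes inside OVER (...).
  def toSql: String = {
    val p = if (partitionBy.isEmpty) Nil else List("PARTITION BY " + partitionBy.mkString(", "))
    val o = if (orderBy.isEmpty) Nil else List("ORDER BY " + orderBy.map(_.toSql).mkString(", "))
    (p ++ o).mkString(" ")
  }
}

// none / one / multiple, for both clauses
val partitionChoices: Seq[Seq[String]] = Seq(Nil, Seq("a"), Seq("a", "b"))
val orderColumnChoices: Seq[Seq[String]] = Seq(Nil, Seq("c"), Seq("c", "d"))

// Every asc/desc x nulls first/last variant of a single sort column.
def keyVariants(col: String): Seq[SortKey] =
  for (d <- Seq(Asc, Desc); n <- Seq(NullsFirst, NullsLast)) yield SortKey(col, d, n)

// Cartesian product of per-column variants: 1, 4 and 16 orderings for 0, 1 and 2 columns.
def orderingsFor(cols: Seq[String]): Seq[Seq[SortKey]] =
  cols.foldLeft(Seq(Seq.empty[SortKey])) { (acc, col) =>
    for (prefix <- acc; k <- keyVariants(col)) yield prefix :+ k
  }

val cases: Seq[WindowSpecCase] =
  for (p <- partitionChoices; cols <- orderColumnChoices; o <- orderingsFor(cols))
    yield WindowSpecCase(p, o)
```

With these choices the matrix has 3 partition variants times 21 ordering variants, i.e. 63 cases; each `toSql` string can be spliced into a test query such as `SELECT ..., SUM(x) OVER (<spec>) FROM t`.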
[jira] [Created] (SPARK-22359) Improve the test coverage of window functions
Jiang Xingbo created SPARK-22359: Summary: Improve the test coverage of window functions Key: SPARK-22359 URL: https://issues.apache.org/jira/browse/SPARK-22359 Project: Spark Issue Type: Test Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo There are already quite a few integration tests using window functions, but the unit test coverage for window functions is not ideal. We'd like to test the following aspects: * Specifications ** different partition clauses (none, one, multiple) ** different order clauses (none, one, multiple, asc/desc, nulls first/last) * Frames and their combinations ** OffsetWindowFunctionFrame ** UnboundedWindowFunctionFrame ** SlidingWindowFunctionFrame ** UnboundedPrecedingWindowFunctionFrame ** UnboundedFollowingWindowFunctionFrame * Aggregate function types ** Declarative ** Imperative ** UDAF * Spilling ** Cover the conditions under which WindowExec should spill at least once
[jira] [Created] (SPARK-22214) Refactor the list hive partitions code
Jiang Xingbo created SPARK-22214: Summary: Refactor the list hive partitions code Key: SPARK-22214 URL: https://issues.apache.org/jira/browse/SPARK-22214 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Jiang Xingbo Refactor the code for listing Hive partitions to make it more extensible.
[jira] [Created] (SPARK-21608) Window rangeBetween() API should allow literal boundary
Jiang Xingbo created SPARK-21608: Summary: Window rangeBetween() API should allow literal boundary Key: SPARK-21608 URL: https://issues.apache.org/jira/browse/SPARK-21608 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Jiang Xingbo The Window rangeBetween() API should allow literal boundaries, which means the window range frame could compute frames over double/date/timestamp values.
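What a literal boundary means for a RANGE frame can be shown without Spark: a row whose ORDER BY value is `v` sees every row whose value `k` satisfies `v + lower <= k <= v + upper`. A minimal sketch with a Double key, assuming ascending order and no nulls; `KeyedRow` and `rangeFrameSums` are hypothetical names, and date or timestamp keys would first have to be mapped to numbers (an assumption not covered here).

```scala
// Hypothetical row type: an ORDER BY key and a value to aggregate.
final case class KeyedRow(orderKey: Double, value: Double)

// SUM(value) OVER (ORDER BY orderKey RANGE BETWEEN lower AND upper), ascending, no nulls.
// Negative `lower` means "preceding", positive `upper` means "following".
def rangeFrameSums(rows: Seq[KeyedRow], lower: Double, upper: Double): Seq[Double] = {
  val sorted = rows.sortWith(_.orderKey < _.orderKey)
  sorted.map { current =>
    val lo = current.orderKey + lower
    val hi = current.orderKey + upper
    sorted.filter(r => r.orderKey >= lo && r.orderKey <= hi).map(_.value).sum
  }
}
```

For keys 1.0, 1.5 and 3.0 with values 10, 20 and 30, `rangeFrameSums(rows, -1.0, 0.0)` gives 10, 30, 30: the row at 1.5 includes the row at 1.0 because it lies within 1.0 preceding, while the row at 3.0 sees only itself.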
[jira] [Created] (SPARK-21496) Support codegen for TakeOrderedAndProjectExec
Jiang Xingbo created SPARK-21496: Summary: Support codegen for TakeOrderedAndProjectExec Key: SPARK-21496 URL: https://issues.apache.org/jira/browse/SPARK-21496 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Jiang Xingbo Priority: Minor The operator `SortExec` supports codegen, but `TakeOrderedAndProjectExec` doesn't. Perhaps we should add codegen support for `TakeOrderedAndProjectExec` as well, but we should benchmark it carefully.
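`TakeOrderedAndProjectExec` covers `ORDER BY ... LIMIT k` followed by a projection. Independently of codegen, those semantics can be sketched in plain Scala with a bounded max-heap, which keeps only k rows instead of sorting the whole input. This is an illustration under that assumption, not Spark's operator or its generated code; a benchmark would compare variants like this against a full sort followed by `take(k)`.

```scala
import scala.collection.mutable

// Keeps the k smallest rows (under `ord`) in a max-heap, evicting the current
// maximum whenever a smaller row arrives, then projects the survivors in order.
def takeOrderedAndProject[A, B](input: Iterator[A], k: Int, project: A => B)(implicit ord: Ordering[A]): List[B] =
  if (k <= 0) Nil
  else {
    val heap = mutable.PriorityQueue.empty[A](ord) // head = largest row kept so far
    input.foreach { row =>
      if (heap.size < k) heap.enqueue(row)
      else if (ord.lt(row, heap.head)) {
        heap.dequeue()
        heap.enqueue(row)
      }
    }
    // Dequeuing yields the largest row first, so prepending builds ascending order.
    var out = List.empty[B]
    while (heap.nonEmpty) out = project(heap.dequeue()) :: out
    out
  }
```

For example, `takeOrderedAndProject(Iterator(5, 1, 4, 2, 3), 3, (x: Int) => x * 10)` returns `List(10, 20, 30)`; passing `Ordering[Int].reverse` explicitly gives a descending top-k.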
[jira] [Commented] (SPARK-21410) In RangePartitioner(partitions: Int, rdd: RDD[]), RangePartitioner.numPartitions is wrong if the number of elements in RDD (rdd.count()) is less than number of partiti
[ https://issues.apache.org/jira/browse/SPARK-21410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088970#comment-16088970 ] Jiang Xingbo commented on SPARK-21410: -- This is not a bug since it doesn't produce any wrong results, but it may be an improvement worth having. > In RangePartitioner(partitions: Int, rdd: RDD[]), > RangePartitioner.numPartitions is wrong if the number of elements in RDD > (rdd.count()) is less than number of partitions (partitions in constructor). > --- > > Key: SPARK-21410 > URL: https://issues.apache.org/jira/browse/SPARK-21410 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.0, 2.1.0, 2.2.0 >Reporter: APeng Zhang >Priority: Minor > > In RangePartitioner(partitions: Int, rdd: RDD[]), > RangePartitioner.numPartitions is wrong if the number of elements in RDD > (rdd.count()) is less than number of partitions (partitions in constructor). > Code1 to reproduce: > {code:java} > import spark.implicits._ > val ds = spark.createDataset(Seq((1, 1))) > println(ds.sort("_1").rdd.getNumPartitions) > // The output of println is 2 > {code} > Code2 to reproduce: > {code:java} > test("Number of elements in RDD is less than number of partitions") { > val rdd = sc.parallelize(1 to 3).map(x => (x, x)) > val partitioner = new RangePartitioner(22, rdd) > assert(partitioner.numPartitions === 3) > } > {code} > This test fails because partitioner.numPartitions is 4.
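Both numbers in this report can be reproduced with a simplified model of how range bounds are chosen. The sketch below is assumed to mirror the bound-selection loop of `RangePartitioner.determineBounds` in Spark 2.x, simplified to uniform weights and no sampling step; `modelBounds` and `modelNumPartitions` are hypothetical names. The partition count is the number of bounds plus one, and the loop stops only when it runs out of distinct keys or reaches `partitions - 1` bounds, so with fewer distinct keys than requested partitions every key becomes a bound.

```scala
import scala.collection.mutable.ArrayBuffer

// Picks up to partitions - 1 increasing bounds from weighted, sampled keys.
def modelBounds[K](candidates: Seq[(K, Double)], partitions: Int)(implicit ord: Ordering[K]): List[K] = {
  val ordered = candidates.sortBy(_._1)
  val step = ordered.map(_._2).sum / partitions
  var cumWeight = 0.0
  var target = step
  var previous: Option[K] = None
  val bounds = ArrayBuffer.empty[K]
  val it = ordered.iterator
  while (it.hasNext && bounds.size < partitions - 1) {
    val (key, weight) = it.next()
    cumWeight += weight
    // Emit a bound once enough weight has accumulated, skipping duplicate keys.
    if (cumWeight >= target && previous.forall(p => ord.gt(key, p))) {
      bounds += key
      target += step
      previous = Some(key)
    }
  }
  bounds.toList
}

// Number of partitions = number of bounds + 1 (every key gets weight 1.0 here).
def modelNumPartitions[K: Ordering](keys: Seq[K], requested: Int): Int =
  modelBounds(keys.map(k => (k, 1.0)), requested).size + 1
```

With keys 1, 2, 3 and 22 requested partitions the model picks bounds 1, 2, 3 and therefore 4 partitions; with a single key and (assuming the default of 200 shuffle partitions for the `ds.sort` case) 200 requested partitions it yields 2, matching both outputs quoted above.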
[jira] [Updated] (SPARK-21410) In RangePartitioner(partitions: Int, rdd: RDD[]), RangePartitioner.numPartitions is wrong if the number of elements in RDD (rdd.count()) is less than number of partition
[ https://issues.apache.org/jira/browse/SPARK-21410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xingbo updated SPARK-21410: - Issue Type: Improvement (was: Bug) > In RangePartitioner(partitions: Int, rdd: RDD[]), > RangePartitioner.numPartitions is wrong if the number of elements in RDD > (rdd.count()) is less than number of partitions (partitions in constructor). > --- > > Key: SPARK-21410 > URL: https://issues.apache.org/jira/browse/SPARK-21410 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.0, 2.1.0, 2.2.0 >Reporter: APeng Zhang >Priority: Minor > > In RangePartitioner(partitions: Int, rdd: RDD[]), > RangePartitioner.numPartitions is wrong if the number of elements in RDD > (rdd.count()) is less than number of partitions (partitions in constructor). > Code1 to reproduce: > {code:java} > import spark.implicits._ > val ds = spark.createDataset(Seq((1, 1))) > println(ds.sort("_1").rdd.getNumPartitions) > // The output of println is 2 > {code} > Code2 to reproduce: > {code:java} > test("Number of elements in RDD is less than number of partitions") { > val rdd = sc.parallelize(1 to 3).map(x => (x, x)) > val partitioner = new RangePartitioner(22, rdd) > assert(partitioner.numPartitions === 3) > } > {code} > This test fails because partitioner.numPartitions is 4.
[jira] [Commented] (SPARK-14151) Propose to refactor and expose Metrics Sink and Source interface
[ https://issues.apache.org/jira/browse/SPARK-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082283#comment-16082283 ] Jiang Xingbo commented on SPARK-14151: -- Since this proposes adding a set of public APIs, it would make more sense to start a SPIP for the topic. [~jerryshao] Would you like to do that? > Propose to refactor and expose Metrics Sink and Source interface > > > Key: SPARK-14151 > URL: https://issues.apache.org/jira/browse/SPARK-14151 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Saisai Shao >Priority: Minor > > MetricsSystem is designed to plug in different sources and sinks: users can > write their own sources and sinks and configure them through metrics.properties, > and MetricsSystem registers them through reflection. But the current Source and > Sink interfaces are private, which means users cannot create their own sources > and sinks unless they use the same package. > So this proposes exposing the Source and Sink interfaces, which will let users build > and maintain their own sources and sinks and alleviate the maintenance overhead of > the Spark codebase.
[jira] [Commented] (SPARK-21349) Make TASK_SIZE_TO_WARN_KB configurable
[ https://issues.apache.org/jira/browse/SPARK-21349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081518#comment-16081518 ] Jiang Xingbo commented on SPARK-21349: -- [~dongjoon] Are you running the tests for Spark SQL, or running some user-defined RDDs directly? This information should help us narrow down the scope of the problem. Thanks! > Make TASK_SIZE_TO_WARN_KB configurable > -- > > Key: SPARK-21349 > URL: https://issues.apache.org/jira/browse/SPARK-21349 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.3, 2.2.0 >Reporter: Dongjoon Hyun >Priority: Minor > > Since Spark 1.1.0, Spark emits a warning when the task size exceeds a threshold > (SPARK-2185). Although this is just a warning message, this issue tries to make > `TASK_SIZE_TO_WARN_KB` into a normal Spark configuration for advanced users. > According to the Jenkins log, we have 123 such warnings even in our unit tests.
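A minimal sketch of what making the threshold configurable could look like, reading it from a plain key-value map instead of a hard-coded constant. The key name `spark.hypothetical.taskSizeToWarnKb` and the helper functions are hypothetical (the ticket does not name a key), and the default of 100 KB is an assumption meant to mirror the existing constant.

```scala
// Hypothetical configuration key; illustrative only.
val TaskSizeToWarnKbKey = "spark.hypothetical.taskSizeToWarnKb"
// Assumed default, mirroring the hard-coded TASK_SIZE_TO_WARN_KB.
val DefaultTaskSizeToWarnKb = 100L

// Threshold in KB: the configured value if present, else the default.
def thresholdKb(conf: Map[String, String]): Long =
  conf.get(TaskSizeToWarnKbKey).map(_.trim.toLong).getOrElse(DefaultTaskSizeToWarnKb)

// Returns a warning message when a serialized task exceeds the threshold.
def taskSizeWarning(conf: Map[String, String], stageId: Int, serializedTaskBytes: Long): Option[String] = {
  val limitKb = thresholdKb(conf)
  if (serializedTaskBytes > limitKb * 1024)
    Some(s"Stage $stageId contains a task of size ${serializedTaskBytes / 1024} KB; " +
      s"the recommended maximum is $limitKb KB.")
  else None
}
```

Keeping the default equal to the current constant means nothing changes unless an advanced user sets the key, which is the behavior the ticket asks for.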
[jira] [Created] (SPARK-21366) Add sql test for window functions
Jiang Xingbo created SPARK-21366: Summary: Add sql test for window functions Key: SPARK-21366 URL: https://issues.apache.org/jira/browse/SPARK-21366 Project: Spark Issue Type: Task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo Priority: Minor
[jira] [Updated] (SPARK-19451) rangeBetween method should accept Long value as boundary
[ https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xingbo updated SPARK-19451: - Summary: rangeBetween method should accept Long value as boundary (was: Long values in Window function) > rangeBetween method should accept Long value as boundary > > > Key: SPARK-19451 > URL: https://issues.apache.org/jira/browse/SPARK-19451 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.1, 2.0.2 >Reporter: Julien Champ > > Hi there, > there seems to be a major limitation in Spark window functions and the > rangeBetween method. > If I have the following code: > {code:title=Example|borderStyle=solid} > val tw = Window.orderBy("date") > .partitionBy("id") > .rangeBetween(from, 0) > {code} > everything seems OK as long as the *from* value is not too large, even though the > rangeBetween() method accepts Long parameters. > But if I set *from* to *-216000L*, it does not work! > It is probably related to this part of the code in the between() method of the > WindowSpec class, which is called by rangeBetween(): > {code:title=between() method|borderStyle=solid} > val boundaryStart = start match { > case 0 => CurrentRow > case Long.MinValue => UnboundedPreceding > case x if x < 0 => ValuePreceding(-start.toInt) > case x if x > 0 => ValueFollowing(start.toInt) > } > {code} > (look at this *.toInt*) > Does anybody know if there's a way to solve / patch this behavior? > Any help will be appreciated. > Thx
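The `.toInt` in the quoted `between()` code narrows a Long to an Int, which silently wraps around for any offset whose magnitude exceeds `Int.MaxValue` (2147483647). A minimal sketch contrasting that narrowing with a variant that keeps the offset as a Long; the boundary types below are hypothetical stand-ins, not Spark's classes, and `Math.toIntExact` is shown as a fail-fast option if an Int is truly required.

```scala
// Hypothetical stand-ins for the boundary types in the quoted snippet (not Spark's classes).
sealed trait FrameBoundary
case object CurrentRow extends FrameBoundary
case object UnboundedPreceding extends FrameBoundary
final case class ValuePreceding(offset: Long) extends FrameBoundary
final case class ValueFollowing(offset: Long) extends FrameBoundary

// Same match as the quoted code, but the offset stays a Long end to end.
def startBoundary(start: Long): FrameBoundary = start match {
  case 0L            => CurrentRow
  case Long.MinValue => UnboundedPreceding
  case x if x < 0    => ValuePreceding(-x)
  case x             => ValueFollowing(x)
}

// What the quoted code computes for a preceding offset: `-start.toInt` narrows first,
// so values outside the Int range wrap around instead of failing.
def truncatedOffset(start: Long): Int = -start.toInt

// Fail-fast alternative when an Int is needed downstream: throws ArithmeticException on overflow.
def checkedOffset(start: Long): Int = Math.toIntExact(-start)
```

For example, a start of `-3000000000L` becomes a preceding offset of `-1294967296` (a negative "preceding" distance) in the quoted code, whereas `startBoundary` keeps it as `ValuePreceding(3000000000L)`.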
[jira] [Created] (SPARK-21260) Remove the unused OutputFakerExec
Jiang Xingbo created SPARK-21260: Summary: Remove the unused OutputFakerExec Key: SPARK-21260 URL: https://issues.apache.org/jira/browse/SPARK-21260 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.1 Reporter: Jiang Xingbo Priority: Minor OutputFakerExec was added long ago and is no longer used anywhere, so we should remove it.
[jira] [Updated] (SPARK-21225) decrease the Mem using for variable 'tasks' in function resourceOffers
[ https://issues.apache.org/jira/browse/SPARK-21225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xingbo updated SPARK-21225: - Issue Type: Bug (was: Improvement) > decrease the Mem using for variable 'tasks' in function resourceOffers > -- > > Key: SPARK-21225 > URL: https://issues.apache.org/jira/browse/SPARK-21225 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.1.1 >Reporter: yangZhiguo >Priority: Minor > Original Estimate: 1h > Remaining Estimate: 1h > > In the function 'resourceOffers', it declares a variable 'tasks' to > store the tasks that have been allocated an executor. It is declared like this: > *{color:#d04437}val tasks = shuffledOffers.map(o => new > ArrayBuffer[TaskDescription](o.cores)){color}* > But I think this code only considers the case of one task per core. > If the user configures "spark.task.cpus" as 2 or 3, it really doesn't need that > much space. I think it can be modified as follows: > {color:#14892c}*val tasks = shuffledOffers.map(o => new > ArrayBuffer[TaskDescription](Math.ceil(o.cores*1.0/CPUS_PER_TASK).toInt))*{color}
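The proposal only changes the initial capacity given to each per-offer `ArrayBuffer`, which the original code sets to `o.cores`. A minimal sketch, assuming each task needs `cpusPerTask` cores from a single offer; `Offer` is a hypothetical stand-in for the scheduler's offer type and `String` stands in for `TaskDescription`. An offer with `c` free cores can run at most `c / cpusPerTask` tasks (integer division), so that is already a tight bound; the proposed `ceil` is never smaller and at most one larger.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a resource offer: an executor and its free cores.
final case class Offer(executorId: String, cores: Int)

// At most floor(cores / cpusPerTask) tasks fit when each needs cpusPerTask cores.
def maxTasks(cores: Int, cpusPerTask: Int): Int = cores / cpusPerTask

// The capacity proposed in the ticket: never smaller than maxTasks, at most one larger.
def proposedCapacity(cores: Int, cpusPerTask: Int): Int =
  math.ceil(cores * 1.0 / cpusPerTask).toInt

val cpusPerTask = 2 // e.g. spark.task.cpus = 2
val offers = Seq(Offer("exec-1", 8), Offer("exec-2", 5))

// One buffer per offer, pre-sized for the tasks it can actually receive.
// ArrayBuffer still grows on demand, so this only changes up-front allocation.
val tasks = offers.map(o => new ArrayBuffer[String](maxTasks(o.cores, cpusPerTask)))
```

Because the capacity is only an allocation hint, either formula leaves scheduling decisions unchanged; the saving is the unused slots per offer, which grows with `spark.task.cpus`.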
[jira] [Commented] (SPARK-18294) Implement commit protocol to support `mapred` package's committer
[ https://issues.apache.org/jira/browse/SPARK-18294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16047229#comment-16047229 ] Jiang Xingbo commented on SPARK-18294: -- This is actually a refactoring of legacy code; it shouldn't affect common use cases because the old code is still valid. Could you expand on why you need this? > Implement commit protocol to support `mapred` package's committer > - > > Key: SPARK-18294 > URL: https://issues.apache.org/jira/browse/SPARK-18294 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Jiang Xingbo > > The current `FileCommitProtocol` is based on the `mapreduce` package; we should > implement a `HadoopMapRedCommitProtocol` that supports the older mapred > package's committer.
[jira] [Created] (SPARK-20989) Fail to start multiple workers on one host if external shuffle service is enabled in standalone mode
Jiang Xingbo created SPARK-20989: Summary: Fail to start multiple workers on one host if external shuffle service is enabled in standalone mode Key: SPARK-20989 URL: https://issues.apache.org/jira/browse/SPARK-20989 Project: Spark Issue Type: Bug Components: Deploy, Spark Core Affects Versions: 2.1.1 Reporter: Jiang Xingbo In standalone mode, if we enable the external shuffle service by setting `spark.shuffle.service.enabled` to true and then try to start multiple workers on one host (by setting `SPARK_WORKER_INSTANCES=3` in spark-env.sh and then running `sbin/start-slaves.sh`), only one worker on each host launches successfully and the rest of the workers fail to launch. The reason is that the port of the external shuffle service is configured by `spark.shuffle.service.port`, so currently we can start no more than one external shuffle service on each host. In our case, each worker tries to start an external shuffle service, and only one of them succeeds.
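The failure comes down to an ordinary port conflict. A minimal sketch with plain JDK sockets: the first "worker" binds a port on the loopback interface, and the other two try the same port, standing in for every worker on a host reading the same `spark.shuffle.service.port`. Only the first bind succeeds and the others fail with `BindException`; this behavior is assumed for Linux-like systems, where a second listener on the same address and port is rejected.

```scala
import java.net.{BindException, InetAddress, ServerSocket}
import scala.util.Try

val loopback = InetAddress.getLoopbackAddress

// Worker 1 starts its shuffle service first. Port 0 lets the OS pick a free port,
// which then plays the role of the single configured shuffle service port.
val worker1Service = new ServerSocket(0, 50, loopback)
val configuredPort = worker1Service.getLocalPort

// Workers 2 and 3 on the same host read the same setting and try the same port.
val otherServices: Seq[Try[ServerSocket]] =
  Seq(2, 3).map(_ => Try(new ServerSocket(configuredPort, 50, loopback)))

val startedCount = 1 + otherServices.count(_.isSuccess)
val bindFailures = otherServices.count(_.failed.toOption.exists(_.isInstanceOf[BindException]))

// Release whatever was opened.
otherServices.foreach(_.foreach(_.close()))
worker1Service.close()
```

Here `startedCount` ends up as 1 and `bindFailures` as 2, mirroring `SPARK_WORKER_INSTANCES=3` where only one worker per host gets its shuffle service up.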
[jira] [Commented] (SPARK-20832) Standalone master should explicitly inform drivers of worker deaths and invalidate external shuffle service outputs
[ https://issues.apache.org/jira/browse/SPARK-20832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16029778#comment-16029778 ] Jiang Xingbo commented on SPARK-20832: -- I'm working on this. > Standalone master should explicitly inform drivers of worker deaths and > invalidate external shuffle service outputs > --- > > Key: SPARK-20832 > URL: https://issues.apache.org/jira/browse/SPARK-20832 > Project: Spark > Issue Type: Bug > Components: Deploy, Scheduler >Affects Versions: 2.0.0 >Reporter: Josh Rosen > > In SPARK-17370 (a patch authored by [~ekhliang] and reviewed by me), we added > logic to the DAGScheduler to mark external shuffle service instances as > unavailable upon task failure when the task failure reason was "SlaveLost" > and this was known to be caused by worker death. If the Spark Master > discovered that a worker was dead, then it would notify any drivers with > executors on those workers to mark those executors as dead. The linked patch > simply piggybacked on this logic to have the executor death notification also > imply worker death and to have worker-death-caused-executor-death imply > shuffle file loss. > However, there are modes of external shuffle service loss which this > mechanism does not detect, leaving the system prone to race conditions. Consider > the following: > * Spark standalone is configured to run an external shuffle service embedded > in the Worker. > * Application has shuffle outputs and executors on Worker A. > * Stage depending on outputs of tasks that ran on Worker A starts. > * All executors on worker A are removed due to dying with exceptions, > scaling-down via the dynamic allocation APIs, but _not_ due to worker death. > Worker A is still healthy at this point. > * At this point the MapOutputTracker still records map output locations on > Worker A's shuffle service. This is expected behavior. > * Worker A dies at an instant where the application has no executors running > on it. 
> * The Master knows that Worker A died but does not inform the driver (which > had no executors on that worker at the time of its death). > * Some task from the running stage attempts to fetch map outputs from Worker > A but these requests time out because Worker A's shuffle service isn't > available. > * Due to other logic in the scheduler, these preventable FetchFailures don't > wind up invalidating the now-invalid unavailable map output locations (this is > a distinct bug / behavior which I'll discuss in a separate JIRA ticket). > * This behavior leads to several unsuccessful stage reattempts and ultimately > to a job failure. > A simple way to address this would be to have the Master explicitly notify > drivers of all Worker deaths, even if those drivers don't currently have > executors. The Spark Standalone scheduler backend can receive the explicit > WorkerLost message and can bubble up the right calls to the task scheduler > and DAGScheduler to invalidate map output locations from the now-dead > external shuffle service. > This relates to SPARK-20115 in the sense that both tickets aim to address > issues where the external shuffle service is unavailable. The key difference > is the mechanism for detection: SPARK-20115 marks the external shuffle > service as unavailable whenever any fetch failure occurs from it, whereas the > proposal here relies on more explicit signals. This JIRA ticket's proposal is > scoped only to Spark Standalone mode. As a compromise, we might be able to > consider "all of a single shuffle's outputs lost on a single external shuffle > service" following a fetch failure (to be discussed in a separate JIRA).
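The proposed fix boils down to one rule: when the master reports a worker death, the driver drops every registered map output served from that worker's shuffle service, even if the application has no executors there, so the scheduler recomputes those outputs instead of repeatedly fetching from a dead service. A toy model of that rule; `ToyMapOutputTracker` and `ShuffleServiceId` are hypothetical names and this is not Spark's `MapOutputTracker` API.

```scala
import scala.collection.mutable

// Hypothetical identifier for the external shuffle service embedded in a standalone Worker.
final case class ShuffleServiceId(host: String, port: Int)

// Toy driver-side registry of where each map output of one shuffle can be fetched.
final class ToyMapOutputTracker(numMaps: Int) {
  private val locations = mutable.Map.empty[Int, ShuffleServiceId]

  def register(mapId: Int, loc: ShuffleServiceId): Unit = locations(mapId) = loc

  // Handler for an explicit "worker lost" notice from the master. It runs whether or not
  // the application still has executors on that worker and drops every output served
  // by the dead worker's shuffle service. Returns the invalidated map ids.
  def onWorkerLost(host: String): List[Int] = {
    val lost = locations.collect { case (mapId, loc) if loc.host == host => mapId }.toList.sorted
    lost.foreach(mapId => locations.remove(mapId))
    lost
  }

  // Map outputs that must be recomputed before a reduce stage can fetch everything.
  def missingMaps: Seq[Int] = (0 until numMaps).filterNot(mapId => locations.contains(mapId))
}

// Scenario from the ticket: outputs on worker A, no executors left there, then worker A dies.
val tracker = new ToyMapOutputTracker(numMaps = 4)
Seq(0 -> "worker-a", 1 -> "worker-a", 2 -> "worker-b", 3 -> "worker-b")
  .foreach { case (mapId, host) => tracker.register(mapId, ShuffleServiceId(host, 7337)) }
val invalidated = tracker.onWorkerLost("worker-a")
```

After the notification, map outputs 0 and 1 are reported as missing, so a real scheduler following this rule would resubmit their map tasks rather than time out on fetches.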
[jira] [Comment Edited] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)
[ https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013199#comment-16013199 ] Jiang Xingbo edited comment on SPARK-20700 at 5/16/17 10:23 PM: In the previous approach we used `aliasMap` to link an `Attribute` to the expression with potentially the form `f(a, b)`, but we only searched the `expressions` and `children.expressions` for this, which is not enough when an `Alias` may lie deep in the logical plan. In that case, we can't generate the valid equivalent constraint classes and thus we fail at preventing the recursive deductions. I'll send a PR to fix this later today. was (Author: jiangxb1987): In the previous approach we used `aliasMap` to link an `Attribute` to the expression with potentially the form `f(a, b)`, but we only searched the `expressions` and `children.expressions` for this, which is not enough when an `Alias` may lie deep in the logical plan. In that case, we can't generate the valid equivalent constraint classes and thus we fail to prevent the recursive deductions. I'll send a PR to fix this later today. 
> InferFiltersFromConstraints stackoverflows for query (v2) > - > > Key: SPARK-20700 > URL: https://issues.apache.org/jira/browse/SPARK-20700 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Josh Rosen >Assignee: Jiang Xingbo > > The following (complicated) query eventually fails with a stack overflow > during optimization: > {code} > CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, > int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES > ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', > TIMESTAMP('2015-01-14 00:00:00.0'), '947'), > ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', > TIMESTAMP('1999-08-15 00:00:00.0'), '437'), > ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', > TIMESTAMP('1991-05-23 00:00:00.0'), '630'), > ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), > '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'), > ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS > STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'), > ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', > CAST(NULL AS TIMESTAMP), '-740'), > ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL > AS TIMESTAMP), CAST(NULL AS STRING)), > ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', > CAST(NULL AS TIMESTAMP), '181'), > ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', > TIMESTAMP('2016-06-30 00:00:00.0'), '487'), > ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS > STRING), CAST(NULL AS TIMESTAMP), '-62'); > CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null); > SELECT > AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS > float_col, > COUNT(t1.smallint_col_2) AS int_col > FROM table_5 t1 > INNER JOIN ( > SELECT > (MIN(-83) OVER (PARTITION BY t2.a ORDER 
BY t2.a, (t1.int_col_4) * > (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) > AS boolean_col, > t2.a, > (t1.int_col_4) * (t1.int_col_4) AS int_col > FROM table_5 t1 > LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4) > WHERE > (t1.smallint_col_2) > (t1.smallint_col_2) > GROUP BY > t2.a, > (t1.int_col_4) * (t1.int_col_4) > HAVING > ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), > SUM(t1.int_col_4)) > ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND > ((t2.a) = (t1.smallint_col_2)); > {code} > (I haven't tried to minimize this failing case yet). > Based on sampled jstacks from the driver, it looks like the query might be > repeatedly inferring filters from constraints and then pruning those filters. > Here's part of the stack at the point where it stackoverflows: > {code} > [... repeats ...] > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tra
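The comment above attributes the failure to an alias lookup that only examined a node and its direct children. A toy sketch of the difference between that shallow lookup and one that walks the whole subtree; the plan types are hypothetical (not Catalyst), expressions are plain strings, and this is not the actual patch, only an illustration of why a deeply nested `Alias` was missed.

```scala
// Toy logical plan (not Catalyst): Project carries aliases as name -> expression text.
sealed trait Plan { def children: Seq[Plan] }
final case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
final case class Project(aliases: Map[String, String], child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Filter(condition: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

def localAliases(p: Plan): Map[String, String] = p match {
  case Project(aliases, _) => aliases
  case _                   => Map.empty
}

// Lookup limited to a node and its direct children, like the previous approach.
def shallowAliasMap(p: Plan): Map[String, String] =
  (p +: p.children).map(localAliases).foldLeft(Map.empty[String, String])(_ ++ _)

// Lookup over the whole subtree, so an alias buried several levels down is still found.
def deepAliasMap(p: Plan): Map[String, String] =
  p.children.map(deepAliasMap).foldLeft(localAliases(p))(_ ++ _)

// The alias for int_col sits two levels below the Join, loosely modeled on the reported query.
val plan = Filter("int_col = int_col_4",
  Join(Scan("table_5"),
    Filter("smallint_col_2 > smallint_col_2",
      Project(Map("int_col" -> "int_col_4 * int_col_4"), Scan("table_5")))))
```

Here `shallowAliasMap(plan)` is empty while `deepAliasMap(plan)` finds `int_col -> int_col_4 * int_col_4`; without that mapping, the attribute and its defining expression end up in different constraint classes.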
[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)
[ https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013199#comment-16013199 ] Jiang Xingbo commented on SPARK-20700: -- In the previous approach we used `aliasMap` to link an `Attribute` to the expression with potentially the form `f(a, b)`, but we only searched the `expressions` and `children.expressions` for this, which is not enough when an `Alias` may lie deep in the logical plan. In that case, we can't generate the valid equivalent constraint classes and thus we fail to prevent the recursive deductions. I'll send a PR to fix this later today. > InferFiltersFromConstraints stackoverflows for query (v2) > - > > Key: SPARK-20700 > URL: https://issues.apache.org/jira/browse/SPARK-20700 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Josh Rosen >Assignee: Jiang Xingbo > > The following (complicated) query eventually fails with a stack overflow > during optimization: > {code} > CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, > int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES > ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', > TIMESTAMP('2015-01-14 00:00:00.0'), '947'), > ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', > TIMESTAMP('1999-08-15 00:00:00.0'), '437'), > ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', > TIMESTAMP('1991-05-23 00:00:00.0'), '630'), > ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), > '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'), > ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS > STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'), > ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', > CAST(NULL AS TIMESTAMP), '-740'), > ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL > AS TIMESTAMP), 
CAST(NULL AS STRING)), > ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', > CAST(NULL AS TIMESTAMP), '181'), > ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', > TIMESTAMP('2016-06-30 00:00:00.0'), '487'), > ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS > STRING), CAST(NULL AS TIMESTAMP), '-62'); > CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null); > SELECT > AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS > float_col, > COUNT(t1.smallint_col_2) AS int_col > FROM table_5 t1 > INNER JOIN ( > SELECT > (MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * > (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) > AS boolean_col, > t2.a, > (t1.int_col_4) * (t1.int_col_4) AS int_col > FROM table_5 t1 > LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4) > WHERE > (t1.smallint_col_2) > (t1.smallint_col_2) > GROUP BY > t2.a, > (t1.int_col_4) * (t1.int_col_4) > HAVING > ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), > SUM(t1.int_col_4)) > ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND > ((t2.a) = (t1.smallint_col_2)); > {code} > (I haven't tried to minimize this failing case yet). > Based on sampled jstacks from the driver, it looks like the query might be > repeatedly inferring filters from constraints and then pruning those filters. > Here's part of the stack at the point where it stackoverflows: > {code} > [... repeats ...] 
> at > org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.immutable.List.flatMap(List.scala:344) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$ca
[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)
[ https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008286#comment-16008286 ] Jiang Xingbo commented on SPARK-20700: -- I've reproduced this case, will dive further into it this weekend. > InferFiltersFromConstraints stackoverflows for query (v2) > - > > Key: SPARK-20700 > URL: https://issues.apache.org/jira/browse/SPARK-20700 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Josh Rosen > > The following (complicated) query eventually fails with a stack overflow > during optimization: > {code} > CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, > int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES > ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', > TIMESTAMP('2015-01-14 00:00:00.0'), '947'), > ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', > TIMESTAMP('1999-08-15 00:00:00.0'), '437'), > ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', > TIMESTAMP('1991-05-23 00:00:00.0'), '630'), > ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), > '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'), > ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS > STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'), > ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', > CAST(NULL AS TIMESTAMP), '-740'), > ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL > AS TIMESTAMP), CAST(NULL AS STRING)), > ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', > CAST(NULL AS TIMESTAMP), '181'), > ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', > TIMESTAMP('2016-06-30 00:00:00.0'), '487'), > ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS > STRING), CAST(NULL AS TIMESTAMP), '-62'); > CREATE VIEW bools(a, b) as values (1, 
true), (1, true), (1, null); > SELECT > AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS > float_col, > COUNT(t1.smallint_col_2) AS int_col > FROM table_5 t1 > INNER JOIN ( > SELECT > (MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * > (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) > AS boolean_col, > t2.a, > (t1.int_col_4) * (t1.int_col_4) AS int_col > FROM table_5 t1 > LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4) > WHERE > (t1.smallint_col_2) > (t1.smallint_col_2) > GROUP BY > t2.a, > (t1.int_col_4) * (t1.int_col_4) > HAVING > ((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), > SUM(t1.int_col_4)) > ) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND > ((t2.a) = (t1.smallint_col_2)); > {code} > (I haven't tried to minimize this failing case yet). > Based on sampled jstacks from the driver, it looks like the query might be > repeatedly inferring filters from constraints and then pruning those filters. > Here's part of the stack at the point where it stackoverflows: > {code} > [... repeats ...] 
> at > org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.immutable.List.flatMap(List.scala:344) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$.org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > org.apache.spark.sql.catalyst.expressions.Canonicalize$$anonfun$org$apache$spark$sql$catalyst$expressions$Canonicalize$$gatherCommutative$1.apply(Canonicalize.scala:50) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.Traversab
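In the trace, the repeating frames are `Canonicalize.gatherCommutative` calling itself through `List.flatMap`. The sketch below is a hypothetical, heavily simplified model of that recursion pattern. It uses a toy `Expr` tree with a commutative `Add` node, not Catalyst's `Expression` classes. Every nested commutative node adds stack frames, so a deep enough expression exhausts the JVM stack. An explicit-stack version gathers the same operands in the same order without deep recursion.

```scala
// Hypothetical mini expression tree; not Catalyst's Expression hierarchy.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object Gather {
  // Recursive flattening in the style of the frames in the trace:
  // each nested Add costs more stack frames (recursion plus flatMap).
  def gatherRecursive(e: Expr): List[Expr] = e match {
    case Add(l, r) => List(l, r).flatMap(gatherRecursive)
    case other     => List(other)
  }

  // Same left-to-right result with an explicit stack, so tree depth is
  // limited by heap memory rather than by the JVM thread stack.
  def gatherIterative(e: Expr): List[Expr] = {
    val out = List.newBuilder[Expr]
    var stack: List[Expr] = List(e)
    while (stack.nonEmpty) {
      stack.head match {
        case Add(l, r) => stack = l :: r :: stack.tail
        case leaf      => out += leaf; stack = stack.tail
      }
    }
    out.result()
  }

  // Builds a left-deep chain Add(Add(Add(x0, x1), x2), ...) with n Add nodes.
  def deepChain(n: Int): Expr =
    (1 to n).foldLeft[Expr](Attr("x0"))((acc, i) => Add(acc, Attr(s"x$i")))
}
```

The partial trace cannot settle whether the real failure comes from one very deep expression or from the optimizer growing expressions on each iteration. The sketch only shows why this recursion shape is sensitive to depth.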
[jira] [Issue Comment Deleted] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)
[ https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xingbo updated SPARK-20700: - Comment: was deleted (was: I couldn't reproduce the failure on current master branch, the test case I use is like the following: {code} test("SPARK-20700: InferFiltersFromConstraints stackoverflows for query") { withTempView("table_5") { withView("bools") { sql( """CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES | ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', TIMESTAMP('2015-01-14 00:00:00.0'), '947'), | ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', TIMESTAMP('1999-08-15 00:00:00.0'), '437'), | ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', TIMESTAMP('1991-05-23 00:00:00.0'), '630'), | ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'), | ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'), | ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', CAST(NULL AS TIMESTAMP), '-740'), | ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL AS TIMESTAMP), CAST(NULL AS STRING)), | ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', CAST(NULL AS TIMESTAMP), '181'), | ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', TIMESTAMP('2016-06-30 00:00:00.0'), '487'), | ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS STRING), CAST(NULL AS TIMESTAMP), '-62') """. 
stripMargin) sql("CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null)") sql( """ SELECT |AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS float_col, |COUNT(t1.smallint_col_2) AS int_col |FROM table_5 t1 |INNER JOIN ( |SELECT |(MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) AS boolean_col, |t2.a, |(t1.int_col_4) * (t1.int_col_4) AS int_col |FROM table_5 t1 |LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4) |WHERE |(t1.smallint_col_2) > (t1.smallint_col_2) |GROUP BY |t2.a, |(t1.int_col_4) * (t1.int_col_4) |HAVING |((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), SUM(t1.int_col_4)) |) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND ((t2.a) = (t1.smallint_col_2)) """.stripMargin) } } } {code})
[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)
[ https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007731#comment-16007731 ] Jiang Xingbo commented on SPARK-20700: -- I couldn't reproduce the failure on the current master branch; the test case I used is the following:
{code}
test("SPARK-20700: InferFiltersFromConstraints stackoverflows for query") {
  withTempView("table_5") {
    withView("bools") {
      sql(
        """CREATE TEMPORARY VIEW table_5(varchar0002_col_1, smallint_col_2, float_col_3, int_col_4, string_col_5, timestamp_col_6, string_col_7) AS VALUES
          | ('68', CAST(NULL AS SMALLINT), CAST(244.90413 AS FLOAT), -137, '571', TIMESTAMP('2015-01-14 00:00:00.0'), '947'),
          | ('82', CAST(213 AS SMALLINT), CAST(53.184647 AS FLOAT), -724, '-278', TIMESTAMP('1999-08-15 00:00:00.0'), '437'),
          | ('-7', CAST(-15 AS SMALLINT), CAST(NULL AS FLOAT), -890, '778', TIMESTAMP('1991-05-23 00:00:00.0'), '630'),
          | ('22', CAST(676 AS SMALLINT), CAST(385.27386 AS FLOAT), CAST(NULL AS INT), '-10', TIMESTAMP('1996-09-29 00:00:00.0'), '641'),
          | ('16', CAST(430 AS SMALLINT), CAST(187.23717 AS FLOAT), 989, CAST(NULL AS STRING), TIMESTAMP('2024-04-21 00:00:00.0'), '-234'),
          | ('83', CAST(760 AS SMALLINT), CAST(-695.45386 AS FLOAT), -970, '330', CAST(NULL AS TIMESTAMP), '-740'),
          | ('68', CAST(-930 AS SMALLINT), CAST(NULL AS FLOAT), -915, '-766', CAST(NULL AS TIMESTAMP), CAST(NULL AS STRING)),
          | ('48', CAST(692 AS SMALLINT), CAST(-220.59615 AS FLOAT), 940, '-514', CAST(NULL AS TIMESTAMP), '181'),
          | ('21', CAST(44 AS SMALLINT), CAST(NULL AS FLOAT), -175, '761', TIMESTAMP('2016-06-30 00:00:00.0'), '487'),
          | ('50', CAST(953 AS SMALLINT), CAST(837.2948 AS FLOAT), 705, CAST(NULL AS STRING), CAST(NULL AS TIMESTAMP), '-62')
        """.stripMargin)
      sql("CREATE VIEW bools(a, b) as values (1, true), (1, true), (1, null)")
      sql(
        """ SELECT
          |AVG(-13) OVER (ORDER BY COUNT(t1.smallint_col_2) DESC ROWS 27 PRECEDING ) AS float_col,
          |COUNT(t1.smallint_col_2) AS int_col
          |FROM table_5 t1
          |INNER JOIN (
          |SELECT
          |(MIN(-83) OVER (PARTITION BY t2.a ORDER BY t2.a, (t1.int_col_4) * (t1.int_col_4) ROWS BETWEEN CURRENT ROW AND 15 FOLLOWING)) NOT IN (-222, 928) AS boolean_col,
          |t2.a,
          |(t1.int_col_4) * (t1.int_col_4) AS int_col
          |FROM table_5 t1
          |LEFT JOIN bools t2 ON (t2.a) = (t1.int_col_4)
          |WHERE
          |(t1.smallint_col_2) > (t1.smallint_col_2)
          |GROUP BY
          |t2.a,
          |(t1.int_col_4) * (t1.int_col_4)
          |HAVING
          |((t1.int_col_4) * (t1.int_col_4)) IN ((t1.int_col_4) * (t1.int_col_4), SUM(t1.int_col_4))
          |) t2 ON (((t2.int_col) = (t1.int_col_4)) AND ((t2.a) = (t1.int_col_4))) AND ((t2.a) = (t1.smallint_col_2))
        """.stripMargin)
    }
  }
}
{code}
[jira] [Commented] (SPARK-20700) InferFiltersFromConstraints stackoverflows for query (v2)
[ https://issues.apache.org/jira/browse/SPARK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006201#comment-16006201 ] Jiang Xingbo commented on SPARK-20700: -- I'm working on this, thank you! [~joshrosen]
[jira] [Commented] (SPARK-20680) Spark-sql do not support for void column datatype of view
[ https://issues.apache.org/jira/browse/SPARK-20680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006169#comment-16006169 ] Jiang Xingbo commented on SPARK-20680: -- [~hvanhovell] Sure, I'll look at this issue. > Spark-sql do not support for void column datatype of view > - > > Key: SPARK-20680 > URL: https://issues.apache.org/jira/browse/SPARK-20680 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0, 2.1.1 >Reporter: Lantao Jin > > Create a HIVE view: > {quote} > hive> create table bad as select 1 x, null z from dual; > {quote} > Because there's no type, Hive gives it the VOID type: > {quote} > hive> describe bad; > OK > x int > z void > {quote} > In Spark 2.0.x, this view can be read normally: > {quote} > spark-sql> describe bad; > x int NULL > z void NULL > Time taken: 4.431 seconds, Fetched 2 row(s) > {quote} > But in Spark 2.1.x, it fails with SparkException: Cannot recognize hive type > string: void > {quote} > spark-sql> describe bad; > 17/05/09 03:12:08 INFO execution.SparkSqlParser: Parsing command: describe bad > 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: int > 17/05/09 03:12:08 INFO parser.CatalystSqlParser: Parsing command: void > 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad] > org.apache.spark.SparkException: Cannot recognize hive type string: void > at > org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$fromHiveColumn(HiveClientImpl.scala:789) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365) > > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:365) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:361) > Caused by: org.apache.spark.sql.catalyst.parser.ParseException: > DataType void() is not supported.(line 1, pos 0) > == SQL == > void > ^^^ > ... 61 more > org.apache.spark.SparkException: Cannot recognize hive type string: void > {quote} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
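Hive records the column type as the string `void`. The 2.1.x trace shows the failure happening while Spark converts Hive columns (`HiveClientImpl.fromHiveColumn`), caused by the parser rejecting `void`. Below is a minimal sketch of a tolerant conversion. It assumes a hypothetical `HiveTypeMapper` with its own tiny type ADT instead of Spark's `DataType` classes. The mapper accepts `void` as a null type, matching the 2.0.x behavior described in the report.

```scala
// Hypothetical type ADT standing in for Spark's DataType hierarchy.
sealed trait SqlType
case object IntType extends SqlType
case object StringType extends SqlType
case object NullType extends SqlType // type of a column that can only hold NULL

object HiveTypeMapper {
  // Maps a Hive column type string to SqlType. Unknown strings are reported
  // as errors, but "void" is accepted and mapped to NullType.
  def fromHiveTypeString(s: String): Either[String, SqlType] =
    s.trim.toLowerCase match {
      case "int"    => Right(IntType)
      case "string" => Right(StringType)
      case "void"   => Right(NullType)
      case other    => Left(s"Cannot recognize hive type string: $other")
    }
}
```

This is only an illustration, not Spark's fix. The ticket leaves open how Spark should represent such columns internally.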
[jira] [Commented] (SPARK-20236) Overwrite a partitioned table should only overwrite related partitions
[ https://issues.apache.org/jira/browse/SPARK-20236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958448#comment-15958448 ] Jiang Xingbo commented on SPARK-20236: -- I'm working on this. > Overwrite a partitioned table should only overwrite related partitions > -- > > Key: SPARK-20236 > URL: https://issues.apache.org/jira/browse/SPARK-20236 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan > > When we overwrite a partitioned table, currently Spark will truncate the > entire table to write new data, or truncate a bunch of partitions according > to the given static partitions. > For example, {{INSERT OVERWRITE tbl ...}} will truncate the entire table, and > {{INSERT OVERWRITE tbl PARTITION (a=1, b)}} will truncate all the partitions > that start with {{a=1}}. > This behavior is reasonable because we know which partitions will be > overwritten before runtime. However, Hive behaves differently: it > only overwrites the related partitions, e.g. {{INSERT OVERWRITE tbl SELECT > 1,2,3}} will only overwrite partition {{a=2, b=3}}, assuming {{tbl}} has only > one data column and is partitioned by {{a}} and {{b}}. > It seems better to follow Hive's behavior.
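A small model can contrast the two behaviors. In this sketch the names are hypothetical, and a partition is just a map from partition column to value. The current behavior truncates every existing partition that matches the static part of the partition spec. The Hive-style behavior replaces only the partitions that the inserted rows actually write to.

```scala
// Hypothetical model: a partition is identified by its column -> value map.
object OverwriteSemantics {
  type Partition = Map[String, String]

  /** Current behavior: every existing partition matching the static spec is truncated. */
  def staticOverwrite(existing: Set[Partition], staticSpec: Partition): Set[Partition] =
    existing.filter(p => staticSpec.forall { case (k, v) => p.get(k).contains(v) })

  /** Hive-style behavior: only existing partitions that receive new rows are replaced. */
  def dynamicOverwrite(existing: Set[Partition], written: Set[Partition]): Set[Partition] =
    existing.intersect(written)
}
```

Take existing partitions `{a=1,b=1}`, `{a=1,b=2}` and `{a=2,b=3}`. With an empty static spec (as in `INSERT OVERWRITE tbl ...`), `staticOverwrite` returns all three, and with `Map("a" -> "1")` it returns the two `a=1` partitions. `dynamicOverwrite` with the single written partition `{a=2,b=3}` returns only that partition.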
[jira] [Created] (SPARK-19960) Move `SparkHadoopWriter` to `internal/io/`
Jiang Xingbo created SPARK-19960: Summary: Move `SparkHadoopWriter` to `internal/io/` Key: SPARK-19960 URL: https://issues.apache.org/jira/browse/SPARK-19960 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 2.2.0 Reporter: Jiang Xingbo We should move `SparkHadoopWriter` to `internal/io/`; this will make it easier to consolidate `SparkHadoopWriter` and `SparkHadoopMapReduceWriter`.
[jira] [Updated] (SPARK-19877) Restrict the nested level of a view
[ https://issues.apache.org/jira/browse/SPARK-19877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xingbo updated SPARK-19877: - Summary: Restrict the nested level of a view (was: Restrict the depth of view reference chains) > Restrict the nested level of a view > --- > > Key: SPARK-19877 > URL: https://issues.apache.org/jira/browse/SPARK-19877 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jiang Xingbo > > We should restrict the depth of view reference chains to avoid a stack > overflow exception during resolution of nested views.
[jira] [Created] (SPARK-19877) Restrict the depth of view reference chains
Jiang Xingbo created SPARK-19877: Summary: Restrict the depth of view reference chains Key: SPARK-19877 URL: https://issues.apache.org/jira/browse/SPARK-19877 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.2.0 Reporter: Jiang Xingbo We should restrict the depth of view reference chains to avoid a stack overflow exception during resolution of nested views.
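Here is a minimal sketch of the idea. It assumes a hypothetical catalog that maps each view to the names it reads from, and it is not Spark's analyzer API. Resolution descends into referenced views and fails with an explicit error once more than `maxDepth` views are nested, instead of recursing until the JVM stack overflows.

```scala
object ViewResolution {
  // Hypothetical catalog: view name -> names it reads from (views or base tables).
  type Catalog = Map[String, Seq[String]]

  final class ViewDepthExceeded(msg: String) extends Exception(msg)

  /** Returns the base tables reachable from `name`; fails if more than `maxDepth` views are nested. */
  def baseTables(catalog: Catalog, name: String, maxDepth: Int, depth: Int = 0): Set[String] =
    catalog.get(name) match {
      case None => Set(name) // not a view: treat it as a base table
      case Some(refs) =>
        if (depth >= maxDepth)
          throw new ViewDepthExceeded(s"View $name exceeds the maximum nested view depth of $maxDepth")
        refs.flatMap(r => baseTables(catalog, r, maxDepth, depth + 1)).toSet
    }
}
```

For a chain `v3 -> v2 -> v1 -> t`, `maxDepth = 3` resolves to `Set("t")`, while `maxDepth = 2` throws `ViewDepthExceeded`.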
[jira] [Commented] (SPARK-19877) Restrict the depth of view reference chains
[ https://issues.apache.org/jira/browse/SPARK-19877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902324#comment-15902324 ] Jiang Xingbo commented on SPARK-19877: -- I'm working on this. > Restrict the depth of view reference chains > --- > > Key: SPARK-19877 > URL: https://issues.apache.org/jira/browse/SPARK-19877 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jiang Xingbo > > We should restrict the depth of view reference chains to avoid a stack > overflow exception during resolution of nested views.
[jira] [Commented] (SPARK-18389) Disallow cyclic view reference
[ https://issues.apache.org/jira/browse/SPARK-18389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891575#comment-15891575 ] Jiang Xingbo commented on SPARK-18389: -- I've just figured out a way to solve this and will try to submit a PR later. > Disallow cyclic view reference > -- > > Key: SPARK-18389 > URL: https://issues.apache.org/jira/browse/SPARK-18389 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin > > The following should not be allowed: > {code} > CREATE VIEW testView AS SELECT id FROM jt > CREATE VIEW testView2 AS SELECT id FROM testView > ALTER VIEW testView AS SELECT * FROM testView2 > {code}
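One possible check is sketched below with a hypothetical dependency map rather than Spark's catalog. Before accepting a new definition for a view, it follows the references of the new definition, directly and transitively, and rejects the definition if they lead back to the view being altered. In the example above, `testView2` reads from `testView`, so redefining `testView` as `SELECT * FROM testView2` would close a cycle.

```scala
object CyclicViewCheck {
  // Hypothetical catalog: view name -> names referenced by its definition.
  type Deps = Map[String, Set[String]]

  /** True if `target` is reachable from any name in `start` by following references. */
  def reaches(deps: Deps, start: Set[String], target: String): Boolean = {
    var visited = Set.empty[String]
    var frontier = start.toList
    while (frontier.nonEmpty) {
      val current = frontier.head
      frontier = frontier.tail
      if (current == target) return true
      if (!visited(current)) {
        visited += current
        frontier = deps.getOrElse(current, Set.empty[String]).toList ++ frontier
      }
    }
    false
  }

  /** Rejects a new definition of `view` that would make it (transitively) reference itself. */
  def checkAlter(deps: Deps, view: String, newRefs: Set[String]): Either[String, Deps] =
    if (reaches(deps, newRefs, view)) Left(s"ALTER VIEW $view would create a cyclic view reference")
    else Right(deps.updated(view, newRefs))
}
```

With `testView -> {jt}` and `testView2 -> {testView}`, `checkAlter(deps, "testView", Set("testView2"))` returns a `Left`. Redefining `testView2` to read only from `jt` returns the updated map.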