[jira] [Commented] (SPARK-21479) Outer join filter pushdown in null supplying table when condition is on one of the joined columns
[ https://issues.apache.org/jira/browse/SPARK-21479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440475#comment-16440475 ] Apache Spark commented on SPARK-21479: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/21083 > Outer join filter pushdown in null supplying table when condition is on one > of the joined columns > - > > Key: SPARK-21479 > URL: https://issues.apache.org/jira/browse/SPARK-21479 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.1.0, 2.1.1, 2.2.0 >Reporter: Abhijit Bhole >Priority: Major > > Here are two different query plans - > {code:java} > df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}]) > df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": > 5, "c" : 8}]) > df1.join(df2, ['a'], 'right_outer').where("b = 2").explain() > == Physical Plan == > *Project [a#16299L, b#16295L, c#16300L] > +- *SortMergeJoin [a#16294L], [a#16299L], Inner >:- *Sort [a#16294L ASC NULLS FIRST], false, 0 >: +- Exchange hashpartitioning(a#16294L, 4) >: +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && > isnotnull(a#16294L)) >:+- Scan ExistingRDD[a#16294L,b#16295L] >+- *Sort [a#16299L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(a#16299L, 4) > +- *Filter isnotnull(a#16299L) > +- Scan ExistingRDD[a#16299L,c#16300L] > df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}]) > df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": > 5, "c" : 8}]) > df1.join(df2, ['a'], 'right_outer').where("a = 1").explain() > == Physical Plan == > *Project [a#16314L, b#16310L, c#16315L] > +- SortMergeJoin [a#16309L], [a#16314L], RightOuter >:- *Sort [a#16309L ASC NULLS FIRST], false, 0 >: +- Exchange hashpartitioning(a#16309L, 4) >: +- Scan ExistingRDD[a#16309L,b#16310L] >+- *Sort [a#16314L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(a#16314L, 4) > +- *Filter (isnotnull(a#16314L) && (a#16314L = 1)) > +- Scan ExistingRDD[a#16314L,c#16315L] > {code} > If condition on b can be pushed down on df1 then why not condition on a? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
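A note on the example above: because the post-join filter is on the join key, df1 rows with a different key can never survive the right outer join plus filter, so the predicate can be applied to the null-supplying side by hand today. The sketch below is only an illustrative workaround built from the reporter's example, not the optimizer change in the linked pull request.
{code:python}
# Hand-written workaround sketch: push the join-key predicate into the
# null-supplying side (df1) before the right outer join.
df1 = spark.createDataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
df2 = spark.createDataFrame([{"a": 1, "c": 5}, {"a": 3, "c": 6}, {"a": 5, "c": 8}])

manual = (df1.where("a = 1")                  # manual pushdown on df1
             .join(df2, ['a'], 'right_outer')
             .where("a = 1"))                 # original post-join filter
manual.explain()                              # the df1 side now scans fewer rows
{code}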
[jira] [Commented] (SPARK-23564) the optimized logical plan about Left anti join should be further optimization
[ https://issues.apache.org/jira/browse/SPARK-23564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440476#comment-16440476 ] Apache Spark commented on SPARK-23564: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/21083 > the optimized logical plan about Left anti join should be further > optimization > --- > > Key: SPARK-23564 > URL: https://issues.apache.org/jira/browse/SPARK-23564 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: KaiXinXIaoLei >Priority: Major > > The Optimized Logical Plan of the query '*select * from tt1 left anti join > tt2 on tt2.i = tt1.i*' is > > {code:java} > == Optimized Logical Plan == > Join LeftAnti, (i#2 = i#0) > :- HiveTableRelation `default`.`tt1`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i#0, s#1] > +- Project [i#2] > +- HiveTableRelation `default`.`tt2`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i#2, s#3] > {code} > > > this plan can be further optimized by adding a 'Filter isnotnull' on the right table, > as follows: > {code:java} > == Optimized Logical Plan == > Join LeftAnti, (i#2 = i#0) > :- HiveTableRelation `default`.`tt1`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i#0, s#1] > +- Project [i#2] > +- Filter isnotnull(i#2) > +- HiveTableRelation `default`.`tt2`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [i#2, s#3] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
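The proposed rewrite can also be checked by hand: tt2 rows with a null join key can never match a tt1 row in a left anti join, so filtering them out early is safe and shrinks the shuffled side. A sketch, assuming the tt1/tt2 Hive tables from the report already exist:
{code:python}
# Manual form of the proposed rewrite: add the null filter on the anti side
# yourself and compare the optimized plans.
plain = spark.sql("SELECT * FROM tt1 LEFT ANTI JOIN tt2 ON tt2.i = tt1.i")
manual = spark.sql("""
    SELECT * FROM tt1
    LEFT ANTI JOIN (SELECT i FROM tt2 WHERE i IS NOT NULL) t
    ON t.i = tt1.i
""")
plain.explain(True)
manual.explain(True)   # the right side now carries Filter isnotnull(i)
{code}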
[jira] [Commented] (SPARK-15703) Make ListenerBus event queue size configurable
[ https://issues.apache.org/jira/browse/SPARK-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440377#comment-16440377 ] rohit verma commented on SPARK-15703: - Hi Guys, I am wondering whether this issue has been resolved or not in the 2.0.x/2.1.x releases, because I am using spark 2.2.0 and facing the same issue. I did not find anything documented regarding this. Thanks. > Make ListenerBus event queue size configurable > -- > > Key: SPARK-15703 > URL: https://issues.apache.org/jira/browse/SPARK-15703 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Web UI >Affects Versions: 2.0.0 >Reporter: Thomas Graves >Assignee: Dhruve Ashar >Priority: Minor > Fix For: 2.0.1, 2.1.0 > > Attachments: Screen Shot 2016-06-01 at 11.21.32 AM.png, Screen Shot > 2016-06-01 at 11.23.48 AM.png, SparkListenerBus .png, > spark-dynamic-executor-allocation.png > > > The Spark UI doesn't seem to be showing all the tasks and metrics. > I ran a job with 10 tasks but Detail stage page says it completed 93029: > Summary Metrics for 93029 Completed Tasks > The Stages for all jobs pages list that only 89519/10 tasks finished but > it's completed. The metrics for shuffled write and input are also incorrect. > I will attach screen shots. > I checked the logs and it does show that all the tasks actually finished. > 16/06/01 16:15:42 INFO TaskSetManager: Finished task 59880.0 in stage 2.0 > (TID 54038) in 265309 ms on 10.213.45.51 (10/10) > 16/06/01 16:15:42 INFO YarnClusterScheduler: Removed TaskSet 2.0, whose tasks > have all completed, from pool -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
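Regarding the question above: the Fix For versions (2.0.1 / 2.1.0) indicate the queue size did become configurable. As far as I recall the property is spark.scheduler.listenerbus.eventqueue.size in the 2.1/2.2 line (later renamed to spark.scheduler.listenerbus.eventqueue.capacity), so please check the docs of your exact release. A sketch of setting it:
{code:python}
from pyspark.sql import SparkSession

# Assumed property name for Spark 2.1/2.2; newer releases use
# spark.scheduler.listenerbus.eventqueue.capacity instead.
spark = (SparkSession.builder
         .appName("listener-bus-queue-demo")
         .config("spark.scheduler.listenerbus.eventqueue.size", "100000")
         .getOrCreate())
{code}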
[jira] [Commented] (SPARK-23206) Additional Memory Tuning Metrics
[ https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440334#comment-16440334 ] Imran Rashid commented on SPARK-23206: -- thanks, shared doc works for me now! > Additional Memory Tuning Metrics > > > Key: SPARK-23206 > URL: https://issues.apache.org/jira/browse/SPARK-23206 > Project: Spark > Issue Type: Umbrella > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > Attachments: ExecutorsTab.png, ExecutorsTab2.png, > MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png > > > At LinkedIn, we have multiple clusters, running thousands of Spark > applications, and these numbers are growing rapidly. We need to ensure that > these Spark applications are well tuned – cluster resources, including > memory, should be used efficiently so that the cluster can support running > more applications concurrently, and applications should run quickly and > reliably. > Currently there is limited visibility into how much memory executors are > using, and users are guessing numbers for executor and driver memory sizing. > These estimates are often much larger than needed, leading to memory wastage. > Examining the metrics for one cluster for a month, the average percentage of > used executor memory (max JVM used memory across executors / > spark.executor.memory) is 35%, leading to an average of 591GB unused memory > per application (number of executors * (spark.executor.memory - max JVM used > memory)). Spark has multiple memory regions (user memory, execution memory, > storage memory, and overhead memory), and to understand how memory is being > used and fine-tune allocation between regions, it would be useful to have > information about how much memory is being used for the different regions. > To improve visibility into memory usage for the driver and executors and > different memory regions, the following additional memory metrics can be > tracked for each executor and driver: > * JVM used memory: the JVM heap size for the executor/driver. > * Execution memory: memory used for computation in shuffles, joins, sorts > and aggregations. > * Storage memory: memory used for caching and propagating internal data across > the cluster. > * Unified memory: sum of execution and storage memory. > The peak values for each memory metric can be tracked for each executor, and > also per stage. This information can be shown in the Spark UI and the REST > APIs. Information for peak JVM used memory can help with determining > appropriate values for spark.executor.memory and spark.driver.memory, and > information about the unified memory region can help with determining > appropriate values for spark.memory.fraction and > spark.memory.storageFraction. Stage memory information can help identify > which stages are most memory intensive, and users can look into the relevant > code to determine if it can be optimized. > The memory metrics can be gathered by adding the current JVM used memory, > execution memory and storage memory to the heartbeat. SparkListeners are > modified to collect the new metrics for the executors, stages and Spark > history log. Only interesting values (peak values per stage per executor) are > recorded in the Spark history log, to minimize the amount of additional > logging. > We have attached our design documentation with this ticket and would like to > receive feedback from the community for this proposal.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
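The sizing arithmetic quoted in the description is easy to reproduce against your own cluster's metrics; the numbers below are placeholders, not the LinkedIn figures:
{code:python}
# Back-of-the-envelope check of the formulas quoted in the description.
executor_memory_gb = 8.0   # spark.executor.memory (placeholder)
max_jvm_used_gb = 2.8      # max JVM used memory across executors (placeholder)
num_executors = 200        # placeholder

used_pct = max_jvm_used_gb / executor_memory_gb
unused_gb = num_executors * (executor_memory_gb - max_jvm_used_gb)
print(f"used executor memory: {used_pct:.0%}, unused per application: {unused_gb:.0f} GB")
{code}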
[jira] [Commented] (SPARK-23206) Additional Memory Tuning Metrics
[ https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440326#comment-16440326 ] Edwina Lu commented on SPARK-23206: --- [~smilegator], please try: https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing > Additional Memory Tuning Metrics > > > Key: SPARK-23206 > URL: https://issues.apache.org/jira/browse/SPARK-23206 > Project: Spark > Issue Type: Umbrella > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > Attachments: ExecutorsTab.png, ExecutorsTab2.png, > MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png > > > At LinkedIn, we have multiple clusters, running thousands of Spark > applications, and these numbers are growing rapidly. We need to ensure that > these Spark applications are well tuned – cluster resources, including > memory, should be used efficiently so that the cluster can support running > more applications concurrently, and applications should run quickly and > reliably. > Currently there is limited visibility into how much memory executors are > using, and users are guessing numbers for executor and driver memory sizing. > These estimates are often much larger than needed, leading to memory wastage. > Examining the metrics for one cluster for a month, the average percentage of > used executor memory (max JVM used memory across executors / > spark.executor.memory) is 35%, leading to an average of 591GB unused memory > per application (number of executors * (spark.executor.memory - max JVM used > memory)). Spark has multiple memory regions (user memory, execution memory, > storage memory, and overhead memory), and to understand how memory is being > used and fine-tune allocation between regions, it would be useful to have > information about how much memory is being used for the different regions. > To improve visibility into memory usage for the driver and executors and > different memory regions, the following additional memory metrics can be > tracked for each executor and driver: > * JVM used memory: the JVM heap size for the executor/driver. > * Execution memory: memory used for computation in shuffles, joins, sorts > and aggregations. > * Storage memory: memory used for caching and propagating internal data across > the cluster. > * Unified memory: sum of execution and storage memory. > The peak values for each memory metric can be tracked for each executor, and > also per stage. This information can be shown in the Spark UI and the REST > APIs. Information for peak JVM used memory can help with determining > appropriate values for spark.executor.memory and spark.driver.memory, and > information about the unified memory region can help with determining > appropriate values for spark.memory.fraction and > spark.memory.storageFraction. Stage memory information can help identify > which stages are most memory intensive, and users can look into the relevant > code to determine if it can be optimized. > The memory metrics can be gathered by adding the current JVM used memory, > execution memory and storage memory to the heartbeat. SparkListeners are > modified to collect the new metrics for the executors, stages and Spark > history log. Only interesting values (peak values per stage per executor) are > recorded in the Spark history log, to minimize the amount of additional > logging. 
> We have attached our design documentation with this ticket and would like to > receive feedback from the community for this proposal. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23206) Additional Memory Tuning Metrics
[ https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438879#comment-16438879 ] Edwina Lu edited comment on SPARK-23206 at 4/17/18 3:15 AM: After discussion with [~irashid] on the PR, we've decided to move ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra logging. The updated design doc: https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing [^SPARK-23206 Design Doc.pdf] was (Author: elu): After discussion with [~irashid] on the PR, we've decided to move ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra logging. The updated design doc: [https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing|https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing] [^SPARK-23206 Design Doc.pdf] > Additional Memory Tuning Metrics > > > Key: SPARK-23206 > URL: https://issues.apache.org/jira/browse/SPARK-23206 > Project: Spark > Issue Type: Umbrella > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > Attachments: ExecutorsTab.png, ExecutorsTab2.png, > MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png > > > At LinkedIn, we have multiple clusters, running thousands of Spark > applications, and these numbers are growing rapidly. We need to ensure that > these Spark applications are well tuned – cluster resources, including > memory, should be used efficiently so that the cluster can support running > more applications concurrently, and applications should run quickly and > reliably. > Currently there is limited visibility into how much memory executors are > using, and users are guessing numbers for executor and driver memory sizing. > These estimates are often much larger than needed, leading to memory wastage. > Examining the metrics for one cluster for a month, the average percentage of > used executor memory (max JVM used memory across executors / > spark.executor.memory) is 35%, leading to an average of 591GB unused memory > per application (number of executors * (spark.executor.memory - max JVM used > memory)). Spark has multiple memory regions (user memory, execution memory, > storage memory, and overhead memory), and to understand how memory is being > used and fine-tune allocation between regions, it would be useful to have > information about how much memory is being used for the different regions. > To improve visibility into memory usage for the driver and executors and > different memory regions, the following additional memory metrics can be be > tracked for each executor and driver: > * JVM used memory: the JVM heap size for the executor/driver. > * Execution memory: memory used for computation in shuffles, joins, sorts > and aggregations. > * Storage memory: memory used caching and propagating internal data across > the cluster. > * Unified memory: sum of execution and storage memory. > The peak values for each memory metric can be tracked for each executor, and > also per stage. This information can be shown in the Spark UI and the REST > APIs. Information for peak JVM used memory can help with determining > appropriate values for spark.executor.memory and spark.driver.memory, and > information about the unified memory region can help with determining > appropriate values for spark.memory.fraction and > spark.memory.storageFraction. 
Stage memory information can help identify > which stages are most memory intensive, and users can look into the relevant > code to determine if it can be optimized. > The memory metrics can be gathered by adding the current JVM used memory, > execution memory and storage memory to the heartbeat. SparkListeners are > modified to collect the new metrics for the executors, stages and Spark > history log. Only interesting values (peak values per stage per executor) are > recorded in the Spark history log, to minimize the amount of additional > logging. > We have attached our design documentation with this ticket and would like to > receive feedback from the community for this proposal. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23206) Additional Memory Tuning Metrics
[ https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438879#comment-16438879 ] Edwina Lu edited comment on SPARK-23206 at 4/17/18 3:14 AM: After discussion with [~irashid] on the PR, we've decided to move ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra logging. The updated design doc: [https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing|https://docs.google.com/document/d/1fIL2XMHPnqs6kaeHr822iTvs08uuYnjP5roSGZfejyA/edit?usp=sharing] [^SPARK-23206 Design Doc.pdf] was (Author: elu): After discussion with [~irashid] on the PR, we've decided to move ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra logging. The updated design doc: [https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing] [^SPARK-23206 Design Doc.pdf] > Additional Memory Tuning Metrics > > > Key: SPARK-23206 > URL: https://issues.apache.org/jira/browse/SPARK-23206 > Project: Spark > Issue Type: Umbrella > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > Attachments: ExecutorsTab.png, ExecutorsTab2.png, > MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png > > > At LinkedIn, we have multiple clusters, running thousands of Spark > applications, and these numbers are growing rapidly. We need to ensure that > these Spark applications are well tuned – cluster resources, including > memory, should be used efficiently so that the cluster can support running > more applications concurrently, and applications should run quickly and > reliably. > Currently there is limited visibility into how much memory executors are > using, and users are guessing numbers for executor and driver memory sizing. > These estimates are often much larger than needed, leading to memory wastage. > Examining the metrics for one cluster for a month, the average percentage of > used executor memory (max JVM used memory across executors / > spark.executor.memory) is 35%, leading to an average of 591GB unused memory > per application (number of executors * (spark.executor.memory - max JVM used > memory)). Spark has multiple memory regions (user memory, execution memory, > storage memory, and overhead memory), and to understand how memory is being > used and fine-tune allocation between regions, it would be useful to have > information about how much memory is being used for the different regions. > To improve visibility into memory usage for the driver and executors and > different memory regions, the following additional memory metrics can be be > tracked for each executor and driver: > * JVM used memory: the JVM heap size for the executor/driver. > * Execution memory: memory used for computation in shuffles, joins, sorts > and aggregations. > * Storage memory: memory used caching and propagating internal data across > the cluster. > * Unified memory: sum of execution and storage memory. > The peak values for each memory metric can be tracked for each executor, and > also per stage. This information can be shown in the Spark UI and the REST > APIs. Information for peak JVM used memory can help with determining > appropriate values for spark.executor.memory and spark.driver.memory, and > information about the unified memory region can help with determining > appropriate values for spark.memory.fraction and > spark.memory.storageFraction. 
Stage memory information can help identify > which stages are most memory intensive, and users can look into the relevant > code to determine if it can be optimized. > The memory metrics can be gathered by adding the current JVM used memory, > execution memory and storage memory to the heartbeat. SparkListeners are > modified to collect the new metrics for the executors, stages and Spark > history log. Only interesting values (peak values per stage per executor) are > recorded in the Spark history log, to minimize the amount of additional > logging. > We have attached our design documentation with this ticket and would like to > receive feedback from the community for this proposal. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440250#comment-16440250 ] liuxian commented on SPARK-23989: - If we disable 'BypassMergeSortShuffleHandle' and 'SerializedShuffleHandle', a lot of unit tests in 'DataFrameAggregateSuite.scala' will fail > When using `SortShuffleWriter`, the data will be overwritten > > > Key: SPARK-23989 > URL: https://issues.apache.org/jira/browse/SPARK-23989 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Critical > > When using `SortShuffleWriter`, we only insert 'AnyRef' into > 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'. > For this function: > override def write(records: Iterator[Product2[K, V]]) > the value of 'records' is `UnsafeRow`, so the value will be overwritten. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
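The failure mode described here is the classic object-reuse pitfall: the iterator passed to write() keeps returning the same mutable UnsafeRow, so buffering the reference without copying makes every buffered entry point at the last row read. The toy sketch below is plain Python, not Spark code, and only illustrates why buffered records must be copied:
{code:python}
# Illustration only: a producer that reuses one mutable record, as an
# UnsafeRow-backed iterator does.
def records():
    row = {}                       # single reused object
    for i in range(3):
        row["value"] = i           # mutated in place on every iteration
        yield row

buffered = [r for r in records()]        # keeps references -> all entries identical
copied = [dict(r) for r in records()]    # defensive copy -> values preserved
print(buffered)   # [{'value': 2}, {'value': 2}, {'value': 2}]
print(copied)     # [{'value': 0}, {'value': 1}, {'value': 2}]
{code}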
[jira] [Assigned] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22239: Assignee: (was: Apache Spark) > User-defined window functions with pandas udf > - > > Key: SPARK-22239 > URL: https://issues.apache.org/jira/browse/SPARK-22239 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.2.0 > Environment: >Reporter: Li Jin >Priority: Major > > Window function is another place we can benefit from vectorized udf and add > another useful function to the pandas_udf suite. > Example usage (preliminary): > {code:java} > w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0) > @pandas_udf(DoubleType()) > def ema(v1): > return v1.ewm(alpha=0.5).mean().iloc[-1] > df.withColumn('v1_ema', ema(df.v1).over(w)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440094#comment-16440094 ] Apache Spark commented on SPARK-22239: -- User 'icexelloss' has created a pull request for this issue: https://github.com/apache/spark/pull/21082 > User-defined window functions with pandas udf > - > > Key: SPARK-22239 > URL: https://issues.apache.org/jira/browse/SPARK-22239 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.2.0 > Environment: >Reporter: Li Jin >Priority: Major > > Window function is another place we can benefit from vectorized udf and add > another useful function to the pandas_udf suite. > Example usage (preliminary): > {code:java} > w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0) > @pandas_udf(DoubleType()) > def ema(v1): > return v1.ewm(alpha=0.5).mean().iloc[-1] > df.withColumn('v1_ema', ema(df.v1).over(w)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22239: Assignee: Apache Spark > User-defined window functions with pandas udf > - > > Key: SPARK-22239 > URL: https://issues.apache.org/jira/browse/SPARK-22239 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.2.0 > Environment: >Reporter: Li Jin >Assignee: Apache Spark >Priority: Major > > Window function is another place we can benefit from vectorized udf and add > another useful function to the pandas_udf suite. > Example usage (preliminary): > {code:java} > w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0) > @pandas_udf(DoubleType()) > def ema(v1): > return v1.ewm(alpha=0.5).mean().iloc[-1] > df.withColumn('v1_ema', ema(df.v1).over(w)) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
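Filling in the preliminary example from the description with the setup it assumes, purely as a sketch of the proposed API shape; the form accepted by released versions may differ (the shipped feature ended up using grouped-aggregate pandas UDFs):
{code:python}
# Sketch of the proposed usage only; decorator and window forms are the
# preliminary ones from the issue description, not a released API.
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

df = spark.createDataFrame(
    [(1, 1, 1.0), (1, 2, 2.0), (2, 1, 3.0)], ["id", "time", "v1"])

w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0)

@pandas_udf(DoubleType())
def ema(v1):
    # exponentially weighted mean of the values inside the window frame
    return v1.ewm(alpha=0.5).mean().iloc[-1]

df.withColumn('v1_ema', ema(df.v1).over(w)).show()
{code}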
[jira] [Commented] (SPARK-19618) Inconsistency wrt max. buckets allowed from Dataframe API vs SQL
[ https://issues.apache.org/jira/browse/SPARK-19618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440085#comment-16440085 ] Fernando Pereira commented on SPARK-19618: -- Opened [SPARK-23997|https://issues.apache.org/jira/browse/SPARK-23997] Thanks > Inconsistency wrt max. buckets allowed from Dataframe API vs SQL > > > Key: SPARK-19618 > URL: https://issues.apache.org/jira/browse/SPARK-19618 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Tejas Patil >Assignee: Tejas Patil >Priority: Major > Fix For: 2.2.0 > > > High number of buckets is allowed while creating a table via SQL query: > {code} > sparkSession.sql(""" > CREATE TABLE bucketed_table(col1 INT) USING parquet > CLUSTERED BY (col1) SORTED BY (col1) INTO 147483647 BUCKETS > """) > sparkSession.sql("DESC FORMATTED bucketed_table").collect.foreach(println) > > [Num Buckets:,147483647,] > [Bucket Columns:,[col1],] > [Sort Columns:,[col1],] > > {code} > Trying the same via dataframe API does not work: > {code} > > df.write.format("orc").bucketBy(147483647, > > "j","k").sortBy("j","k").saveAsTable("bucketed_table") > java.lang.IllegalArgumentException: requirement failed: Bucket number must be > greater than 0 and less than 10. > at scala.Predef$.require(Predef.scala:224) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:293) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:291) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.sql.DataFrameWriter.getBucketSpec(DataFrameWriter.scala:291) > at > org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:429) > at > org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:410) > at > org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:365) > ... 50 elided > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23997) Configurable max number of buckets
Fernando Pereira created SPARK-23997: Summary: Configurable max number of buckets Key: SPARK-23997 URL: https://issues.apache.org/jira/browse/SPARK-23997 Project: Spark Issue Type: Bug Components: Input/Output, SQL Affects Versions: 2.3.0, 2.2.1 Reporter: Fernando Pereira When exporting data as a table the user can choose to split data in buckets by choosing the columns and the number of buckets. Currently there is a hard-coded limit of 99'999 buckets. However, for heavy workloads this limit might be too restrictive, a situation that will eventually become more common as workloads grow. As per the comments in SPARK-19618 this limit could be made configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
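Once such a knob exists, usage would presumably be a single SQL conf guarding the existing validation. The property name below is an assumption chosen to match existing bucketing conf names, not a committed API:
{code:python}
# Hypothetical usage once a configurable limit lands; the property name
# spark.sql.sources.bucketing.maxBuckets is an assumption here.
spark.conf.set("spark.sql.sources.bucketing.maxBuckets", "500000")

(df.write.format("parquet")
   .bucketBy(150000, "key")
   .sortBy("key")
   .saveAsTable("heavily_bucketed_table"))
{code}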
[jira] [Commented] (SPARK-23638) Spark on k8s: spark.kubernetes.initContainer.image has no effect
[ https://issues.apache.org/jira/browse/SPARK-23638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440067#comment-16440067 ] Yinan Li commented on SPARK-23638: -- Can this be closed? > Spark on k8s: spark.kubernetes.initContainer.image has no effect > > > Key: SPARK-23638 > URL: https://issues.apache.org/jira/browse/SPARK-23638 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.3.0 > Environment: K8 server: Ubuntu 16.04 > Submission client: macOS Sierra 10.12.x > Client Version: version.Info\{Major:"1", Minor:"9", GitVersion:"v1.9.3", > GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", > BuildDate:"2018-02-07T12:22:21Z", GoVersion:"go1.9.2", Compiler:"gc", > Platform:"darwin/amd64"} > Server Version: version.Info\{Major:"1", Minor:"8", GitVersion:"v1.8.3", > GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", > BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", > Platform:"linux/amd64"} >Reporter: maheshvra >Priority: Major > > Hi all - I am trying to use initContainer to download remote dependencies. To > begin with, I ran a test with initContainer which basically "echo hello > world". However, when i triggered the pod deployment via spark-submit, I did > not see any trace of initContainer execution in my kubernetes cluster. > > {code:java} > SPARK_DRIVER_MEMORY: 1g > SPARK_DRIVER_CLASS: com.bigdata.App SPARK_DRIVER_ARGS: -c > /opt/spark/work-dir/app/main/environments/int -w > ./../../workflows/workflow_main.json -e prod -n features -v off > SPARK_DRIVER_BIND_ADDRESS: > SPARK_JAVA_OPT_0: -Dspark.submit.deployMode=cluster > SPARK_JAVA_OPT_1: -Dspark.driver.blockManager.port=7079 > SPARK_JAVA_OPT_2: -Dspark.app.name=fg-am00-raw12 > SPARK_JAVA_OPT_3: > -Dspark.kubernetes.container.image=docker.com/cmapp/fg-am00-raw:1.0.0 > SPARK_JAVA_OPT_4: -Dspark.app.id=spark-4fa9a5ce1b1d401fa9c1e413ff030d44 > SPARK_JAVA_OPT_5: > -Dspark.jars=/opt/spark/jars/aws-java-sdk-1.7.4.jar,/opt/spark/jars/hadoop-aws-2.7.3.jar,/opt/spark/jars/guava-14.0.1.jar,/opt/spark/jars/SparkApp.jar,/opt/spark/jars/datacleanup-component-1.0-SNAPSHOT.jar > > SPARK_JAVA_OPT_6: -Dspark.driver.port=7078 > SPARK_JAVA_OPT_7: > -Dspark.kubernetes.initContainer.image=docker.com/cmapp/custombusybox:1.0.0 > SPARK_JAVA_OPT_8: > -Dspark.kubernetes.executor.podNamePrefix=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615 > > SPARK_JAVA_OPT_9: > -Dspark.kubernetes.driver.pod.name=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver > > SPARK_JAVA_OPT_10: > -Dspark.driver.host=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver-svc.experimental.svc > SPARK_JAVA_OPT_11: -Dspark.executor.instances=5 > SPARK_JAVA_OPT_12: > -Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 > SPARK_JAVA_OPT_13: -Dspark.kubernetes.namespace=experimental > SPARK_JAVA_OPT_14: > -Dspark.kubernetes.authenticate.driver.serviceAccountName=experimental-service-account > SPARK_JAVA_OPT_15: -Dspark.master=k8s://https://bigdata > {code} > > Further, I did not see spec.initContainers section in the generated pod. 
> Please see the details below > > {code:java} > > { > "kind": "Pod", > "apiVersion": "v1", > "metadata": { > "name": "fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver", > "namespace": "experimental", > "selfLink": > "/api/v1/namespaces/experimental/pods/fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver", > "uid": "adc5a50a-2342-11e8-87dc-12c5b3954044", > "resourceVersion": "299054", > "creationTimestamp": "2018-03-09T02:36:32Z", > "labels": { > "spark-app-selector": "spark-4fa9a5ce1b1d401fa9c1e413ff030d44", > "spark-role": "driver" > }, > "annotations": { > "spark-app-name": "fg-am00-raw12" > } > }, > "spec": { > "volumes": [ > { > "name": "experimental-service-account-token-msmth", > "secret": { > "secretName": "experimental-service-account-token-msmth", > "defaultMode": 420 > } > } > ], > "containers": [ > { > "name": "spark-kubernetes-driver", > "image": "docker.com/cmapp/fg-am00-raw:1.0.0", > "args": [ > "driver" > ], > "env": [ > { > "name": "SPARK_DRIVER_MEMORY", > "value": "1g" > }, > { > "name": "SPARK_DRIVER_CLASS", > "value": "com.myapp.App" > }, > { > "name": "SPARK_DRIVER_ARGS", > "value": "-c /opt/spark/work-dir/app/main/environments/int -w > ./../../workflows/workflow_main.json -e prod -n features -v off" > }, > { > "name": "SPARK_DRIVER_BIND_ADDRESS", > "valueFrom": { > "fieldRef": { > "apiVersion": "v1", > "fieldPath": "status.podIP" > } > } > }, > { > "name": "SPARK_MOUNTED_CLASSPATH", > "value": > "/opt/spark/jars/aws-java-sdk-1.7.4.jar:/opt/spark/jars/hadoop-aws-2.7.3.jar:/opt/spark/jars/guava-14.0.1.jar:/opt/spark/jars/datacleanup-component-1.0-SNAPSH
[jira] [Resolved] (SPARK-23873) Use accessors in interpreted LambdaVariable
[ https://issues.apache.org/jira/browse/SPARK-23873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell resolved SPARK-23873. --- Resolution: Fixed Assignee: Liang-Chi Hsieh Fix Version/s: 2.4.0 > Use accessors in interpreted LambdaVariable > --- > > Key: SPARK-23873 > URL: https://issues.apache.org/jira/browse/SPARK-23873 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 2.4.0 > > > Currently, interpreted execution of {{LambdaVariable}} just uses > {{InternalRow.get}} to access element. We should use specified accessors if > possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439915#comment-16439915 ] Michel Davit commented on SPARK-23986: -- Thx [~mgaido]. I didn't have time to set up the environment to submit the pull request this weekend :) > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible for the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name > conflict in the generated code: {{agg_expr_11.}} > Appending the 'id' in s"$fullName$id" to generate a unique term name is the source > of the conflict. Maybe simply using an underscore can solve this issue: > s"${fullName}_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
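The collision is easiest to see with plain string arithmetic: appending the numeric id directly to the base name makes two different (name, id) pairs render to the same token, which is exactly what the underscore suggestion avoids. A toy mimic of freshName, in Python for illustration:
{code:python}
# "agg_expr_1" seen a second time gets id 1 appended -> "agg_expr_11",
# which is indistinguishable from a brand-new "agg_expr_11".
def fresh(name, ids):
    if name in ids:
        ids[name] += 1
        return f"{name}{ids[name] - 1}"   # mimics s"$fullName$id"
    ids[name] = 1
    return name

ids = {}
first = [fresh(f"agg_expr_{i}", ids) for i in range(1, 7)]    # 1st doConsume call
second = [fresh(f"agg_expr_{i}", ids) for i in range(1, 13)]  # 2nd doConsume call
print(second.count("agg_expr_11"))  # 2 -> the "Redefinition of parameter" error

# With a separator ("agg_expr_1" + "_" + "1" == "agg_expr_1_1") the renamed
# term can never collide with a base name such as "agg_expr_11".
{code}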
[jira] [Created] (SPARK-23996) Implement the optimal KLL algorithms for quantiles in streams
Timothy Hunter created SPARK-23996: -- Summary: Implement the optimal KLL algorithms for quantiles in streams Key: SPARK-23996 URL: https://issues.apache.org/jira/browse/SPARK-23996 Project: Spark Issue Type: Improvement Components: MLlib, SQL Affects Versions: 2.3.0 Reporter: Timothy Hunter The current implementation for approximate quantiles - a variant of Greenwald-Khanna, which I implemented - is not the best in light of recent papers: - it is not exactly the one from the paper for performance reasons, but the changes are not documented beyond comments in the code - there are now more optimal algorithms with proven bounds (unlike q-digest, the other contender at the time) I propose that we revisit the current implementation and look at the Karnin-Lang-Liberty algorithm (KLL) for example: [https://arxiv.org/abs/1603.05346] [https://edoliberty.github.io//papers/streamingQuantiles.pdf] This algorithm seems to have favorable characteristics for streaming and a distributed implementation, and there is a python implementation for reference. It is a fairly standalone piece, and in that respect accessible to people who don't know too much about spark internals. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
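For context, the Greenwald-Khanna variant discussed here is what backs the public approxQuantile API today, so a KLL-based sketch would presumably slot in behind the same entry point:
{code:python}
# Current entry point to the approximate-quantile sketch (Greenwald-Khanna
# variant); a KLL-based implementation would sit behind the same API.
df = spark.range(0, 1000000).withColumnRenamed("id", "x")
quantiles = df.approxQuantile("x", [0.25, 0.5, 0.75], 0.01)  # relativeError = 1%
print(quantiles)
{code}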
[jira] [Updated] (SPARK-23975) Allow Clustering to take Arrays of Double as input features
[ https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-23975: -- Shepherd: Joseph K. Bradley > Allow Clustering to take Arrays of Double as input features > --- > > Key: SPARK-23975 > URL: https://issues.apache.org/jira/browse/SPARK-23975 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.0 >Reporter: Lu Wang >Priority: Major > > Clustering algorithms should accept Arrays in addition to Vectors as input > features. The python interface should also be changed so that it would make > PySpark a lot easier to use. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23904) Big execution plan cause OOM
[ https://issues.apache.org/jira/browse/SPARK-23904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439815#comment-16439815 ] Ruben Berenguel commented on SPARK-23904: - I'll give it a look, maybe there is a way to avoid it being generated when it is definitely not needed. > Big execution plan cause OOM > > > Key: SPARK-23904 > URL: https://issues.apache.org/jira/browse/SPARK-23904 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Izek Greenfield >Priority: Major > Labels: SQL, query > > I create a question in > [StackOverflow|https://stackoverflow.com/questions/49508683/spark-physicalplandescription-string-is-to-big] > > Spark create the text representation of query in any case even if I don't > need it. > That causes many garbage object and unneeded GC... > [Gist with code to > reproduce|https://gist.github.com/igreenfield/584c3336f03ba7d63e9026774eaf5e23] > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23975) Allow Clustering to take Arrays of Double as input features
[ https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23975: Assignee: Apache Spark > Allow Clustering to take Arrays of Double as input features > --- > > Key: SPARK-23975 > URL: https://issues.apache.org/jira/browse/SPARK-23975 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.0 >Reporter: Lu Wang >Assignee: Apache Spark >Priority: Major > > Clustering algorithms should accept Arrays in addition to Vectors as input > features. The python interface should also be changed so that it would make > PySpark a lot easier to use. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23975) Allow Clustering to take Arrays of Double as input features
[ https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23975: Assignee: (was: Apache Spark) > Allow Clustering to take Arrays of Double as input features > --- > > Key: SPARK-23975 > URL: https://issues.apache.org/jira/browse/SPARK-23975 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.0 >Reporter: Lu Wang >Priority: Major > > Clustering algorithms should accept Arrays in addition to Vectors as input > features. The python interface should also be changed so that it would make > PySpark a lot easier to use. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23975) Allow Clustering to take Arrays of Double as input features
[ https://issues.apache.org/jira/browse/SPARK-23975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439793#comment-16439793 ] Apache Spark commented on SPARK-23975: -- User 'ludatabricks' has created a pull request for this issue: https://github.com/apache/spark/pull/21081 > Allow Clustering to take Arrays of Double as input features > --- > > Key: SPARK-23975 > URL: https://issues.apache.org/jira/browse/SPARK-23975 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.0 >Reporter: Lu Wang >Priority: Major > > Clustering algorithms should accept Arrays in addition to Vectors as input > features. The python interface should also be changed so that it would make > PySpark a lot easier to use. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
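Until array columns are accepted natively, the usual PySpark workaround is an explicit array-to-vector conversion with a UDF, which is exactly the boilerplate this change would remove. A sketch assuming a SparkSession named spark:
{code:python}
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Today: convert Array[Double] -> Vector by hand before clustering.
to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())

df = spark.createDataFrame(
    [(0, [0.0, 0.0]), (1, [0.1, 0.1]), (2, [9.0, 9.0])], ["id", "features_arr"])
vec_df = df.withColumn("features", to_vector("features_arr"))

model = KMeans(k=2, seed=1, featuresCol="features").fit(vec_df)
print(model.clusterCenters())
{code}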
[jira] [Updated] (SPARK-23995) initial job has not accept any resources and executor keep exit
[ https://issues.apache.org/jira/browse/SPARK-23995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cong Shen updated SPARK-23995: -- Environment: Spark version:2.3.0 JDK version: 1.8.0_131 System: CentOS v7 {\{export JAVA_HOME=/usr/java/jdk1.8.0_144 }} {{export SPARK_MASTER_IP=IP }} {{export PYSPARK_PYTHON=/opt/anaconda3/bin/python }} {{export SPARK_WORKER_MEMORY=2g }} {{export SPARK_WORK_INSTANCES=1 }} {{export SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}} The firewalls are stopped. was: Spark version:2.3.0 JDK version: 1.8.0_131 System: CentOS v7 {{export JAVA_HOME=/usr/java/jdk1.8.0_144 }} {{export SPARK_MASTER_IP=IP export PYSPARK_PYTHON=/opt/anaconda3/bin/python export SPARK_WORKER_MEMORY=2g export SPARK_WORK_INSTANCES=1 export SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}} The firewalls are stopped. > initial job has not accept any resources and executor keep exit > --- > > Key: SPARK-23995 > URL: https://issues.apache.org/jira/browse/SPARK-23995 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 2.3.0 > Environment: Spark version:2.3.0 > JDK version: 1.8.0_131 > System: CentOS v7 > {\{export JAVA_HOME=/usr/java/jdk1.8.0_144 }} > {{export SPARK_MASTER_IP=IP }} > {{export PYSPARK_PYTHON=/opt/anaconda3/bin/python }} > {{export SPARK_WORKER_MEMORY=2g }} > {{export SPARK_WORK_INSTANCES=1 }} > {{export SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}} > > The firewalls are stopped. >Reporter: Cong Shen >Priority: Major > Labels: executor, standalone, > > I have a spark cluster using cloud resource in two instances. One as master > and one as worker. The total resource is 4 cores and 10G ram. I can start > shell, and worker can register successfully.But when I run simple code. > The error from shell is: > TaskSchedulerImpl:66 - Initial job has not accept any resources. > master log: > > {code:java} > // code placeholder > 2018-04-12 13:09:14 INFO Master:54 - Registering app Spark shell 2018-04-12 > 13:09:14 INFO Master:54 - Registered app Spark shell with ID > app-20180412130914- 2018-04-12 13:09:14 INFO Master:54 - Launching > executor app-20180412130914-/0 on worker > worker-20180411144020-192.**.**.**-44986 2018-04-12 13:11:15 INFO Master:54 - > Removing executor app-20180412130914-/0 because it is EXITED 2018-04-12 > 13:11:15 INFO Master:54 - Launching executor app-20180412130914-/1 on > worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:13:16 INFO > Master:54 - Removing executor app-20180412130914-/1 because it is EXITED > 2018-04-12 13:13:16 INFO Master:54 - Launching executor > app-20180412130914-/2 on worker worker-20180411144020-192.**.**.**-44986 > 2018-04-12 13:15:17 INFO Master:54 - Removing executor > app-20180412130914-/2 because it is EXITED 2018-04-12 13:15:17 INFO > Master:54 - Launching executor app-20180412130914-/3 on worker > worker-20180411144020-192.**.**.**-44986 2018-04-12 13:16:15 INFO Master:54 - > Removing app app-20180412130914- 2018-04-12 13:16:15 INFO Master:54 - > 192.**.**.**:39766 got disassociated, removing it. 2018-04-12 13:16:15 INFO > Master:54 - IP:39928 got disassociated, removing it. 
2018-04-12 13:16:15 WARN > Master:66 - Got status update for unknown executor app-20180412130914-/3 > {code} > Worker log: > > {code:java} > // code placeholder > 2018-04-12 13:09:12 INFO Worker:54 - Asked to launch executor > app-20180412130914-/0 for Spark shell > 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing view acls to: root > 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing modify acls to: root > 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing view acls groups to: > 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing modify acls groups > to: > 2018-04-12 13:09:12 INFO SecurityManager:54 - SecurityManager: > authentication disabled; ui acls disabled; users with view permissions: > Set(root); groups with view permissions: Set(); users with modify > permissions: Set(root); groups with modify permissions: Set() > 2018-04-12 13:09:12 INFO ExecutorRunner:54 - Launch command: > "/usr/java/jdk1.8.0_144/bin/java" "-cp" > "/opt/spark-2.3.0-bin-hadoop2.7/conf/:/opt/spark-2.3.0-bin-hadoop2.7/jars/*" > "-Xmx1024M" "-Dspark.driver.port=39928" > "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" > "spark://CoarseGrainedScheduler@IP:39928" "--executor-id" "0" "--hostname" > "192.**.**.**" "--cores" "4" "--app-id" "app-20180412130914-" > "--worker-url" "spark://Worker@192.**.**.**:44986" > 2018-04-12 13:11:13 INFO Worker:54 - Executor app-20180412130914-/0 > finished with state EXITED message Command exited with code 1 exitStatus 1
[jira] [Commented] (SPARK-23030) Decrease memory consumption with toPandas() collection using Arrow
[ https://issues.apache.org/jira/browse/SPARK-23030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439727#comment-16439727 ] Bryan Cutler commented on SPARK-23030: -- Hi [~icexelloss], I have something working, just need to write it up then we can discuss on the PR. You're right though, if we want to keep collection as fast as possible, it must be fully asynchronous. Then unfortunately there is no way to avoid the worst case of having all data in the JVM driver memory. I did improve the average case and got a little speedup, so hopefully it will be worth it. I'll put up a PR soon. > Decrease memory consumption with toPandas() collection using Arrow > -- > > Key: SPARK-23030 > URL: https://issues.apache.org/jira/browse/SPARK-23030 > Project: Spark > Issue Type: Sub-task > Components: PySpark, SQL >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Major > > Currently with Arrow enabled, calling {{toPandas()}} results in a collection > of all partitions in the JVM in the form of batches of Arrow file format. > Once collected in the JVM, they are served to the Python driver process. > I believe using the Arrow stream format can help to optimize this and reduce > memory consumption in the JVM by only loading one record batch at a time > before sending it to Python. This might also reduce the latency between > making the initial call in Python and receiving the first batch of records. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
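For anyone benchmarking the upcoming PR, the Arrow collection path under discussion is toggled by the existing Spark 2.3 confs below; treat the batch-size value as a placeholder:
{code:python}
import time

# Arrow-backed toPandas() path discussed in this ticket (Spark 2.3 conf names).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")

df = spark.range(0, 5000000).selectExpr("id", "id * 2 AS doubled")
start = time.time()
pdf = df.toPandas()
print(len(pdf), "rows in", round(time.time() - start, 2), "s")
{code}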
[jira] [Created] (SPARK-23995) initial job has not accept any resources and executor keep exit
Cong Shen created SPARK-23995: - Summary: initial job has not accept any resources and executor keep exit Key: SPARK-23995 URL: https://issues.apache.org/jira/browse/SPARK-23995 Project: Spark Issue Type: Bug Components: Deploy Affects Versions: 2.3.0 Environment: Spark version:2.3.0 JDK version: 1.8.0_131 System: CentOS v7 {{export JAVA_HOME=/usr/java/jdk1.8.0_144 }} {{export SPARK_MASTER_IP=IP export PYSPARK_PYTHON=/opt/anaconda3/bin/python export SPARK_WORKER_MEMORY=2g export SPARK_WORK_INSTANCES=1 export SPARK_WORkER_CORES=4 export SPARK_EXECUTOR_MEMORY=1g}} The firewalls are stopped. Reporter: Cong Shen I have a spark cluster using cloud resource in two instances. One as master and one as worker. The total resource is 4 cores and 10G ram. I can start shell, and worker can register successfully.But when I run simple code. The error from shell is: TaskSchedulerImpl:66 - Initial job has not accept any resources. master log: {code:java} // code placeholder 2018-04-12 13:09:14 INFO Master:54 - Registering app Spark shell 2018-04-12 13:09:14 INFO Master:54 - Registered app Spark shell with ID app-20180412130914- 2018-04-12 13:09:14 INFO Master:54 - Launching executor app-20180412130914-/0 on worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:11:15 INFO Master:54 - Removing executor app-20180412130914-/0 because it is EXITED 2018-04-12 13:11:15 INFO Master:54 - Launching executor app-20180412130914-/1 on worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:13:16 INFO Master:54 - Removing executor app-20180412130914-/1 because it is EXITED 2018-04-12 13:13:16 INFO Master:54 - Launching executor app-20180412130914-/2 on worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:15:17 INFO Master:54 - Removing executor app-20180412130914-/2 because it is EXITED 2018-04-12 13:15:17 INFO Master:54 - Launching executor app-20180412130914-/3 on worker worker-20180411144020-192.**.**.**-44986 2018-04-12 13:16:15 INFO Master:54 - Removing app app-20180412130914- 2018-04-12 13:16:15 INFO Master:54 - 192.**.**.**:39766 got disassociated, removing it. 2018-04-12 13:16:15 INFO Master:54 - IP:39928 got disassociated, removing it. 
2018-04-12 13:16:15 WARN Master:66 - Got status update for unknown executor app-20180412130914-/3 {code} Worker log: {code:java} // code placeholder 2018-04-12 13:09:12 INFO Worker:54 - Asked to launch executor app-20180412130914-/0 for Spark shell 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing view acls to: root 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing modify acls to: root 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing view acls groups to: 2018-04-12 13:09:12 INFO SecurityManager:54 - Changing modify acls groups to: 2018-04-12 13:09:12 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions:Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set() 2018-04-12 13:09:12 INFO ExecutorRunner:54 - Launch command: "/usr/java/jdk1.8.0_144/bin/java" "-cp" "/opt/spark-2.3.0-bin-hadoop2.7/conf/:/opt/spark-2.3.0-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.driver.port=39928" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@IP:39928" "--executor-id" "0" "--hostname" "192.**.**.**" "--cores" "4" "--app-id" "app-20180412130914-" "--worker-url" "spark://Worker@192.**.**.**:44986" 2018-04-12 13:11:13 INFO Worker:54 - Executor app-20180412130914-/0 finished with state EXITED message Command exited with code 1 exitStatus 1 2018-04-12 13:11:13 INFO Worker:54 - Asked to launch executor app-20180412130914-/1 for Spark shell 2018-04-12 13:11:13 INFO SecurityManager:54 - Changing view acls to: root 2018-04-12 13:11:13 INFO SecurityManager:54 - Changing modify acls to: root 2018-04-12 13:11:13 INFO SecurityManager:54 - Changing view acls groups to: 2018-04-12 13:11:13 INFO SecurityManager:54 - Changing modify acls groups to: 2018-04-12 13:11:13 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set() 2018-04-12 13:11:13 INFO ExecutorRunner:54 - Launch command: "/usr/java/jdk1.8.0_144/bin/java" "-cp" "/opt/spark-2.3.0-bin-hadoop2.7/conf/:/opt/spark-2.3.0-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.driver.port=39928" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-master.novalocal:39928" "--executor-id" "1" "--hostname" "192.**.**.**" "--cores" "4" "--app-id" "app
[jira] [Resolved] (SPARK-21088) CrossValidator, TrainValidationSplit should collect all models when fitting: Python API
[ https://issues.apache.org/jira/browse/SPARK-21088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley resolved SPARK-21088. --- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 19627 [https://github.com/apache/spark/pull/19627] > CrossValidator, TrainValidationSplit should collect all models when fitting: > Python API > --- > > Key: SPARK-21088 > URL: https://issues.apache.org/jira/browse/SPARK-21088 > Project: Spark > Issue Type: Sub-task > Components: ML, PySpark >Affects Versions: 2.2.0 >Reporter: Joseph K. Bradley >Assignee: Weichen Xu >Priority: Major > Fix For: 2.4.0 > > > In pyspark: > Add a parameter that controls whether to collect the full model list while > CrossValidator/TrainValidationSplit is fitting (default is off, to avoid the > change causing OOM). > Add a method to CrossValidatorModel/TrainValidationSplitModel that lets the user > retrieve the model list. > Add an "option" to CrossValidatorModelWriter that lets the user control whether to > persist the model list to disk. > Note: when persisting the model list, use indices as the sub-model path -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
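The feature being resolved here is easier to see next to its Scala counterpart. A minimal sketch follows, assuming an active SparkSession named {{spark}} (e.g. spark-shell); the {{setCollectSubModels}} / {{subModels}} names are an assumption drawn from the Scala sub-task, not something quoted in this notification.
{code:scala}
// Sketch of the Scala-side counterpart that this Python task mirrors.
// setCollectSubModels / subModels are assumed names, not verified here.
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1)),
  (2.0, Vectors.dense(2.0, 1.0)),
  (3.0, Vectors.dense(2.0, 1.3)),
  (4.0, Vectors.dense(0.0, 1.2))
)).toDF("label", "features")

val lr = new LinearRegression()
val grid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.01, 0.1)).build()

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator())
  .setEstimatorParamMaps(grid)
  .setNumFolds(2)
  .setCollectSubModels(true)   // off by default, so large grids do not OOM the driver

val cvModel = cv.fit(training)
val subModels = cvModel.subModels   // one fitted model per (fold, param map) pair
{code}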
[jira] [Assigned] (SPARK-21088) CrossValidator, TrainValidationSplit should collect all models when fitting: Python API
[ https://issues.apache.org/jira/browse/SPARK-21088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley reassigned SPARK-21088: - Assignee: Weichen Xu > CrossValidator, TrainValidationSplit should collect all models when fitting: > Python API > --- > > Key: SPARK-21088 > URL: https://issues.apache.org/jira/browse/SPARK-21088 > Project: Spark > Issue Type: Sub-task > Components: ML, PySpark >Affects Versions: 2.2.0 >Reporter: Joseph K. Bradley >Assignee: Weichen Xu >Priority: Major > > In pyspark: > Add a parameter that controls whether to collect the full model list while > CrossValidator/TrainValidationSplit is fitting (default is off, to avoid the > change causing OOM). > Add a method to CrossValidatorModel/TrainValidationSplitModel that lets the user > retrieve the model list. > Add an "option" to CrossValidatorModelWriter that lets the user control whether to > persist the model list to disk. > Note: when persisting the model list, use indices as the sub-model path -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-9312) The OneVsRest model does not provide rawPrediction
[ https://issues.apache.org/jira/browse/SPARK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-9312: - Summary: The OneVsRest model does not provide rawPrediction (was: The OneVsRest model does not provide confidence factor(not probability) along with the prediction) > The OneVsRest model does not provide rawPrediction > -- > > Key: SPARK-9312 > URL: https://issues.apache.org/jira/browse/SPARK-9312 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 1.4.0, 1.4.1 >Reporter: Badari Madhav >Assignee: Lu Wang >Priority: Major > Labels: features > Fix For: 2.4.0 > > Original Estimate: 72h > Remaining Estimate: 72h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
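For context on what the renamed summary asks for, here is a minimal OneVsRest usage sketch, assuming an active SparkSession {{spark}}. That the fitted model exposes a {{rawPrediction}} output column after this change is an assumption drawn from the new summary and the 2.4.0 fix version, not from the notification itself.
{code:scala}
// Minimal sketch; the "rawPrediction" column is an assumption based on this
// ticket's new summary, not verified against the merged patch.
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0)),
  (2.0, Vectors.dense(2.0, 3.0)),
  (0.0, Vectors.dense(0.1, 1.2)),
  (1.0, Vectors.dense(1.2, 0.1)),
  (2.0, Vectors.dense(2.2, 2.9))
)).toDF("label", "features")

val ovr = new OneVsRest().setClassifier(new LogisticRegression().setMaxIter(10))
val model = ovr.fit(training)

// Before the fix only "prediction" was exposed; the per-class confidences were
// discarded, which is what this ticket asks to change.
model.transform(training).select("prediction", "rawPrediction").show(false)
{code}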
[jira] [Assigned] (SPARK-9312) The OneVsRest model does not provide confidence factor(not probability) along with the prediction
[ https://issues.apache.org/jira/browse/SPARK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley reassigned SPARK-9312: Assignee: Lu Wang > The OneVsRest model does not provide confidence factor(not probability) along > with the prediction > - > > Key: SPARK-9312 > URL: https://issues.apache.org/jira/browse/SPARK-9312 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 1.4.0, 1.4.1 >Reporter: Badari Madhav >Assignee: Lu Wang >Priority: Major > Labels: features > Fix For: 2.4.0 > > Original Estimate: 72h > Remaining Estimate: 72h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-9312) The OneVsRest model does not provide confidence factor(not probability) along with the prediction
[ https://issues.apache.org/jira/browse/SPARK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley resolved SPARK-9312. -- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21044 [https://github.com/apache/spark/pull/21044] > The OneVsRest model does not provide confidence factor(not probability) along > with the prediction > - > > Key: SPARK-9312 > URL: https://issues.apache.org/jira/browse/SPARK-9312 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 1.4.0, 1.4.1 >Reporter: Badari Madhav >Assignee: Lu Wang >Priority: Major > Labels: features > Fix For: 2.4.0 > > Original Estimate: 72h > Remaining Estimate: 72h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23981) ShuffleBlockFetcherIterator - Spamming Logs
[ https://issues.apache.org/jira/browse/SPARK-23981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439614#comment-16439614 ] BELUGA BEHR commented on SPARK-23981: - Or maybe lower per-block logging and debug and produce one over-all logging message if fetches cannot be completed. > ShuffleBlockFetcherIterator - Spamming Logs > --- > > Key: SPARK-23981 > URL: https://issues.apache.org/jira/browse/SPARK-23981 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.3.0 >Reporter: BELUGA BEHR >Priority: Major > > If a remote host shuffle service fails, Spark Executors produce a huge amount > of logging. > {code:java} > 2018-04-10 20:24:44,834 INFO [Block Fetch Retry-1] > shuffle.RetryingBlockFetcher (RetryingBlockFetcher.java:initiateRetry(163)) - > Retrying fetch (3/3) for 1753 outstanding blocks after 5000 ms > 2018-04-10 20:24:49,865 ERROR [Block Fetch Retry-1] > storage.ShuffleBlockFetcherIterator (Logging.scala:logError(95)) - Failed to > get block(s) from myhost.local:7337 > java.io.IOException: Failed to connect to myhost.local/10.11.12.13:7337 > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) > at > org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.net.ConnectException: Connection refused: > myhost.local/12.13.14.15:7337 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > ... 1 more > {code} > > We can see from the code, that if a block fetch fails, a "listener" is > updated once for each block. From the error messages previously, it can be > seen that 1753 blocks were being fetched. However, since the remote host has > become unavailable, they all fail and every block is alerted on. 
> > {code:java|title=RetryingBlockFetcher.java} > if (shouldRetry(e)) { > initiateRetry(); > } else { > for (String bid : blockIdsToFetch) { > listener.onBlockFetchFailure(bid, e); > } > } > {code} > {code:java|title=ShuffleBlockFetcherIterator.scala} > override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { > logError(s"Failed to get block(s) from > ${req.address.host}:${req.address.port}", e) > results.put(new FailureFetchResult(BlockId(blockId), address, e)) > } > {code} > So what we get here, is 1753 ERROR stack traces in the logging all printing > the same message: > {quote}Failed to get block(s) from myhost.local:7337 > ... > {quote} > Perhaps it would be better if the method signature {{onBlockFetchFailure}} > was changed to accept an entire Collection of block IDs instead of one-by-one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
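A rough sketch of the direction both the comment and the description point at: log the exception once per failed request and keep the per-block detail at a lower level. The names below are made up for illustration; this is not Spark's actual BlockFetchingListener interface.
{code:scala}
// Illustrative only. The point is that a failed request reports its whole set
// of outstanding blocks in one call, so the stack trace appears once instead
// of 1753 times.
import org.slf4j.LoggerFactory

object BatchedFetchFailureLogger {
  private val log = LoggerFactory.getLogger(getClass)

  /** Called once per failed fetch request instead of once per block. */
  def onBlockFetchFailure(blockIds: Seq[String], address: String, e: Throwable): Unit = {
    log.error(s"Failed to get ${blockIds.size} block(s) from $address", e) // single ERROR with stack trace
    log.debug(s"Failed block ids: ${blockIds.mkString(", ")}")             // per-block detail only at DEBUG
  }
}
{code}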
[jira] [Created] (SPARK-23994) Add Host To Blacklist If Shuffle Cannot Complete
BELUGA BEHR created SPARK-23994: --- Summary: Add Host To Blacklist If Shuffle Cannot Complete Key: SPARK-23994 URL: https://issues.apache.org/jira/browse/SPARK-23994 Project: Spark Issue Type: Improvement Components: Block Manager, Shuffle Affects Versions: 2.3.0 Reporter: BELUGA BEHR If a node cannot be reached for shuffling data, add the node to the blacklist and retry the current stage. {code:java} 2018-04-10 20:25:55,065 ERROR [Block Fetch Retry-3] shuffle.RetryingBlockFetcher (RetryingBlockFetcher.java:fetchAllOutstanding(142)) - Exception while beginning fetch of 711 outstanding blocks (after 3 retries) java.io.IOException: Failed to connect to host.local/10.11.12.13:7337 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.ConnectException: Connection refused: host.local/10.11.12.13:7337 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 more {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
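Worth noting alongside this request: Spark 2.3 already ships a blacklisting mechanism with a fetch-failure hook. A minimal sketch of enabling it is below; the two config keys are quoted from the 2.3 configuration docs as best recalled and should be treated as an assumption, not as the fix proposed in this ticket.
{code:scala}
// Sketch only; the spark.blacklist.* keys are assumed from the Spark 2.3 docs
// and are related knobs, not the change requested here.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("blacklist-on-fetch-failure")
  .config("spark.blacklist.enabled", "true")
  // Blacklist the executor/node as soon as a shuffle fetch failure occurs.
  .config("spark.blacklist.application.fetchFailure.enabled", "true")
  .getOrCreate()
{code}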
[jira] [Assigned] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23986: Assignee: (was: Apache Spark) > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible of the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name > conflicts in the generated code: {{agg_expr_11.}} > Appending the 'id' in s"$fullName$id" to generate unique term name is source > of conflict. Maybe simply using undersoce can solve this issue : > $fullName_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23986: Assignee: Apache Spark > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Assignee: Apache Spark >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible of the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name > conflicts in the generated code: {{agg_expr_11.}} > Appending the 'id' in s"$fullName$id" to generate unique term name is source > of conflict. Maybe simply using undersoce can solve this issue : > $fullName_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439417#comment-16439417 ] Apache Spark commented on SPARK-23986: -- User 'mgaido91' has created a pull request for this issue: https://github.com/apache/spark/pull/21080 > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible of the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name > conflicts in the generated code: {{agg_expr_11.}} > Appending the 'id' in s"$fullName$id" to generate unique term name is source > of conflict. Maybe simply using undersoce can solve this issue : > $fullName_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439398#comment-16439398 ] Marco Gaido commented on SPARK-23986: - [~RustedBones] I was able to reproduce. Yes, I do agree with you in all your analysis and also with your proposal of solution. I am submitting a patch. Thanks for reporting this. > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible of the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name > conflicts in the generated code: {{agg_expr_11.}} > Appending the 'id' in s"$fullName$id" to generate unique term name is source > of conflict. Maybe simply using undersoce can solve this issue : > $fullName_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
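A standalone sketch of the fix the reporter suggests, an underscore between the base name and the counter, so that "agg_expr_1" plus id 1 can no longer collide with "agg_expr_11". This mirrors the proposal in the description; the patch actually merged may differ in detail.
{code:scala}
// Sketch of the suggested change, not the merged Spark patch: always separate
// the base name from its counter with an underscore, so two distinct base
// names can never concatenate into the same identifier.
import scala.collection.mutable

class FreshNames(freshNamePrefix: String = "") {
  private val freshNameIds = mutable.HashMap.empty[String, Int]

  def freshName(name: String): String = synchronized {
    val fullName = if (freshNamePrefix == "") name else s"${freshNamePrefix}_$name"
    val id = freshNameIds.getOrElse(fullName, 0)
    freshNameIds(fullName) = id + 1
    s"${fullName}_$id"   // e.g. agg_expr_1_0, agg_expr_1_1, agg_expr_11_0 ...
  }
}
{code}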
[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439384#comment-16439384 ] Wenchen Fan commented on SPARK-23989: - Do you have an end-to-end case to show this bug? IIRC we always copy the unsafe row before sending it to something like `SortShuffleWriter`. > When using `SortShuffleWriter`, the data will be overwritten > > > Key: SPARK-23989 > URL: https://issues.apache.org/jira/browse/SPARK-23989 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Critical > > When using `SortShuffleWriter`, we only insert 'AnyRef' values into > 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'. > For this function: > {{override def write(records: Iterator[Product2[K, V]])}} > the records can be reused `UnsafeRow` instances, so previously buffered values > get overwritten. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
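Wenchen's point is that anything which buffers rows must copy them first, because `UnsafeRow` objects handed out by an iterator are typically reused. A minimal illustration of that pattern follows; it is not the actual SortShuffleWriter code path.
{code:scala}
// Minimal illustration of the row-reuse hazard, not SortShuffleWriter itself:
// buffering without copy() would leave every entry pointing at one mutable row.
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def bufferRows(records: Iterator[(Int, UnsafeRow)]): ArrayBuffer[(Int, UnsafeRow)] = {
  val buf = new ArrayBuffer[(Int, UnsafeRow)]()
  records.foreach { case (k, row) =>
    buf += (k -> row.copy())   // copy() materialises a private byte buffer per entry
  }
  buf
}
{code}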
[jira] [Created] (SPARK-23993) Support DESC FORMATTED table_name column_name
Volodymyr Glushak created SPARK-23993: - Summary: Support DESC FORMATTED table_name column_name Key: SPARK-23993 URL: https://issues.apache.org/jira/browse/SPARK-23993 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.2 Reporter: Volodymyr Glushak Hive and Spark both support: {code} DESC FORMATTED table_name{code} which gives table metadata. If you want to get metadata for a particular column in Hive, you can execute: {code} DESC FORMATTED table_name column_name{code} This is not supported in Spark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
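A quick way to see the gap from spark-shell, assuming an active SparkSession {{spark}} and an existing table `t` with a column `col_a`; that the per-column form is rejected in the affected versions is taken from this report rather than re-verified here.
{code:scala}
// Assumes an existing table `t` with a column `col_a`.
spark.sql("DESC FORMATTED t").show(false)        // supported: table-level metadata
spark.sql("DESC FORMATTED t col_a").show(false)  // per-column form requested by this ticket
{code}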
[jira] [Comment Edited] (SPARK-23901) Data Masking Functions
[ https://issues.apache.org/jira/browse/SPARK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437401#comment-16437401 ] Marco Gaido edited comment on SPARK-23901 at 4/16/18 12:19 PM: --- Actually I am facing some issues in the implementation. I have a couple of questions: 1 - In the mask function Hive accepts only constant values as parameters (other than the main string to replace). Shall we enforce this in Spark too? 2 - Despite in the documentation these methods are said to accept strings as parameters, actually they allow basically any type and any type is considered differently. Shall we reproduce the same Hive behavior or shall we support only String? 3 - Moreover, and this is connected with point 2, Hive accepts many more parameters than the ones in the documentation, shall we support them too? was (Author: mgaido): Actually I am facing some issues in the implementation. I have a couple of questions: 1 - In the mask function Hive accepts only constant values as parameters (other than the main string to replace). Shall we enforce this in Spark too? 2 - Despite in the documentation these methods are said to accept strings as parameters, actually they allow basically any type and any type is considered differently. Shall we reproduce the same Hive behavior or shall we support only String? > Data Masking Functions > -- > > Key: SPARK-23901 > URL: https://issues.apache.org/jira/browse/SPARK-23901 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Priority: Major > > - mask() > - mask_first_n() > - mask_last_n() > - mask_hash() > - mask_show_first_n() > - mask_show_last_n() > Reference: > [1] > [https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions] > [2] https://issues.apache.org/jira/browse/HIVE-13568 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
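For reference on the Hive behaviour the questions are about: Hive's documented defaults replace upper-case letters with 'X', lower-case letters with 'x' and digits with 'n', and the extra arguments override those replacement characters. The expected results below are taken from the Hive wiki linked in the ticket; the Scala wrapper only sketches how the calls would look once the functions exist in Spark.
{code:scala}
// Expected results are Hive's documented defaults; Spark did not provide these
// functions at the time of this discussion, so this only sketches the target.
spark.sql("SELECT mask('abcd-EFGH-8765-4321')").show(false)
// per the Hive docs: xxxx-XXXX-nnnn-nnnn

spark.sql("SELECT mask_first_n('1234-5678-8765-4321', 4)").show(false)
// per the Hive docs: nnnn-5678-8765-4321
{code}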
[jira] [Commented] (SPARK-23901) Data Masking Functions
[ https://issues.apache.org/jira/browse/SPARK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439354#comment-16439354 ] Marco Gaido commented on SPARK-23901: - [~ueshin] maybe you have some inputs on my questions above. > Data Masking Functions > -- > > Key: SPARK-23901 > URL: https://issues.apache.org/jira/browse/SPARK-23901 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Priority: Major > > - mask() > - mask_first_n() > - mask_last_n() > - mask_hash() > - mask_show_first_n() > - mask_show_last_n() > Reference: > [1] > [https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions] > [2] https://issues.apache.org/jira/browse/HIVE-13568 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23992) ShuffleDependency does not need to be deserialized every time
[ https://issues.apache.org/jira/browse/SPARK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439349#comment-16439349 ] Apache Spark commented on SPARK-23992: -- User '10110346' has created a pull request for this issue: https://github.com/apache/spark/pull/21079 > ShuffleDependency does not need to be deserialized every time > - > > Key: SPARK-23992 > URL: https://issues.apache.org/jira/browse/SPARK-23992 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Minor > > In the same stage, 'ShuffleDependency' is not necessary to be deserialized > each time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23992) ShuffleDependency does not need to be deserialized every time
[ https://issues.apache.org/jira/browse/SPARK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23992: Assignee: Apache Spark > ShuffleDependency does not need to be deserialized every time > - > > Key: SPARK-23992 > URL: https://issues.apache.org/jira/browse/SPARK-23992 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Assignee: Apache Spark >Priority: Minor > > In the same stage, 'ShuffleDependency' is not necessary to be deserialized > each time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23992) ShuffleDependency does not need to be deserialized every time
[ https://issues.apache.org/jira/browse/SPARK-23992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23992: Assignee: (was: Apache Spark) > ShuffleDependency does not need to be deserialized every time > - > > Key: SPARK-23992 > URL: https://issues.apache.org/jira/browse/SPARK-23992 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Minor > > In the same stage, 'ShuffleDependency' is not necessary to be deserialized > each time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23992) ShuffleDependency does not need to be deserialized every time
liuxian created SPARK-23992: --- Summary: ShuffleDependency does not need to be deserialized every time Key: SPARK-23992 URL: https://issues.apache.org/jira/browse/SPARK-23992 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.0 Reporter: liuxian Within the same stage, the 'ShuffleDependency' does not need to be deserialized every time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
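The idea, in general form: tasks of the same stage running in one executor decode the same bytes into a ShuffleDependency over and over, and the result could be cached per stage instead. A generic sketch follows; it is not the change made in the linked PR.
{code:scala}
// Generic memoisation sketch, not the actual Spark change: deserialize once
// per stage and let later tasks of that stage reuse the result.
import scala.collection.mutable

object DeserializedCache {
  private val cache = mutable.HashMap.empty[Int, AnyRef]

  def getOrDeserialize[T <: AnyRef](stageId: Int)(deserialize: => T): T = synchronized {
    cache.getOrElseUpdate(stageId, deserialize).asInstanceOf[T]
  }

  /** Drop the entry when the stage completes so the cache cannot grow unbounded. */
  def remove(stageId: Int): Unit = synchronized { cache.remove(stageId) }
}
{code}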
[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michel Davit updated SPARK-23986: - Description: Considering the following code: {code:java} val df1: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") val df2: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, "val1", "val2"))) .toDF("key", "dummy1", "dummy2") val agg = df1 .join(df2, df1("key") === df2("key"), "leftouter") .groupBy(df1("key")) .agg( avg("col2").as("avg2"), avg("col3").as("avg3"), avg("col4").as("avg4"), avg("col1").as("avg1"), avg("col5").as("avg5"), avg("col6").as("avg6") ) val head = agg.take(1) {code} This logs the following exception: {code:java} ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 467, Column 28: Redefinition of parameter "agg_expr_11" {code} I am not a spark expert but after investigation, I realized that the generated {{doConsume}} method is responsible of the exception. Indeed, {{avg}} calls several times {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. The 1st time with the 'avg' Expr and a second time for the base aggregation Expr (count and sum). The problem comes from the generation of parameters in CodeGenerator: {code:java} /** * Returns a term name that is unique within this instance of a `CodegenContext`. */ def freshName(name: String): String = synchronized { val fullName = if (freshNamePrefix == "") { name } else { s"${freshNamePrefix}_$name" } if (freshNameIds.contains(fullName)) { val id = freshNameIds(fullName) freshNameIds(fullName) = id + 1 s"$fullName$id" } else { freshNameIds += fullName -> 1 fullName } } {code} The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. The second call is made with {{agg_expr_[1..12]}} and generates the following names: {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have a parameter name conflicts in the generated code: {{agg_expr_11.}} Appending the 'id' in s"$fullName$id" to generate unique term name is source of conflict. Maybe simply using undersoce can solve this issue : $fullName_$id" was: Considering the following code: {code:java} val df1: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") val df2: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, "val1", "val2"))) .toDF("key", "dummy1", "dummy2") val agg = df1 .join(df2, df1("key") === df2("key"), "leftouter") .groupBy(df1("key")) .agg( avg("col2").as("avg2"), avg("col3").as("avg3"), avg("col4").as("avg4"), avg("col1").as("avg1"), avg("col5").as("avg5"), avg("col6").as("avg6") ) val head = agg.take(1) {code} This logs the following exception: {code:java} ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 467, Column 28: Redefinition of parameter "agg_expr_11" {code} I am not a spark expert but after investigation, I realized that the generated {{doConsume}} method is responsible of the exception. Indeed, {{avg}} calls several times {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. The 1st time with the 'avg' Expr and a second time for the base aggregation Expr (count and sum). The problem comes from the generation of parameters in CodeGenerator: {code:java} /** * Returns a term name that is unique within this instance of a `CodegenContext`. 
*/ def freshName(name: String): String = synchronized { val fullName = if (freshNamePrefix == "") { name } else { s"${freshNamePrefix}_$name" } if (freshNameIds.contains(fullName)) { val id = freshNameIds(fullName) freshNameIds(fullName) = id + 1 s"$fullName$id" } else { freshNameIds += fullName -> 1 fullName } } {code} The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. The second call is made with {{agg_expr_[1..12]}} and generates the following names: {{agg_expr_[11|21|31|41|51|61|11|12}}. We then have a parameter name conflicts in the generated code: {{agg_expr_11.}} Appending the 'id' in s"$fullName$id" to generate unique term name is source of conflict. Maybe simply using undersoce can solve this issue : $fullName_$id" > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache
[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michel Davit updated SPARK-23986: - Description: Considering the following code: {code:java} val df1: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") val df2: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, "val1", "val2"))) .toDF("key", "dummy1", "dummy2") val agg = df1 .join(df2, df1("key") === df2("key"), "leftouter") .groupBy(df1("key")) .agg( avg("col2").as("avg2"), avg("col3").as("avg3"), avg("col4").as("avg4"), avg("col1").as("avg1"), avg("col5").as("avg5"), avg("col6").as("avg6") ) val head = agg.take(1) {code} This logs the following exception: {code:java} ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 467, Column 28: Redefinition of parameter "agg_expr_11" {code} I am not a spark expert but after investigation, I realized that the generated {{doConsume}} method is responsible of the exception. Indeed, {{avg}} calls several times {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. The 1st time with the 'avg' Expr and a second time for the base aggregation Expr (count and sum). The problem comes from the generation of parameters in CodeGenerator: {code:java} /** * Returns a term name that is unique within this instance of a `CodegenContext`. */ def freshName(name: String): String = synchronized { val fullName = if (freshNamePrefix == "") { name } else { s"${freshNamePrefix}_$name" } if (freshNameIds.contains(fullName)) { val id = freshNameIds(fullName) freshNameIds(fullName) = id + 1 s"$fullName$id" } else { freshNameIds += fullName -> 1 fullName } } {code} The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. The second call is made with {{agg_expr_[1..12]}} and generates the following names: {{agg_expr_[11|21|31|41|51|61|11|12}}. We then have a parameter name conflicts in the generated code: {{agg_expr_11.}} Appending the 'id' in s"$fullName$id" to generate unique term name is source of conflict. Maybe simply using undersoce can solve this issue : $fullName_$id" was: Considering the following code: {code:java} val df1: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") val df2: DataFrame = sparkSession.sparkContext .makeRDD(Seq((0, "val1", "val2"))) .toDF("key", "dummy1", "dummy2") val agg = df1 .join(df2, df1("key") === df2("key"), "leftouter") .groupBy(df1("key")) .agg( avg("col2").as("avg2"), avg("col3").as("avg3"), avg("col4").as("avg4"), avg("col1").as("avg1"), avg("col5").as("avg5"), avg("col6").as("avg6") ) val head = agg.take(1) {code} This logs the following exception: {code:java} ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 467, Column 28: Redefinition of parameter "agg_expr_11" {code} I am not a spark expert but after investigation, I realized that the generated {{doConsume}} method is responsible of the exception. Indeed, {{avg}} calls several times {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. The 1st time with the 'avg' Expr and a second time for the base aggregation Expr (count and sum). The problem comes from the generation of parameters in CodeGenerator: {code:java} /** * Returns a term name that is unique within this instance of a `CodegenContext`. 
*/ def freshName(name: String): String = synchronized { val fullName = if (freshNamePrefix == "") { name } else { s"${freshNamePrefix}_$name" } if (freshNameIds.contains(fullName)) { val id = freshNameIds(fullName) freshNameIds(fullName) = id + 1 s"$fullName$id" } else { freshNameIds += fullName -> 1 fullName } } {code} The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. The second call is made with {{agg_expr_[1..12]}} and generates the following names: {{agg_expr_[11|21|31|41|51|61|11|12}}. We then have 2 parameter name conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}. Appending the 'id' in s"$fullName$id" to generate unique term name is source of conflict. Maybe simply using undersoce can solve this issue : $fullName_$id" > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: ht
[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439318#comment-16439318 ] Michel Davit commented on SPARK-23986: -- I tested on Spark v2.3.0 I attached the generated code: [^spark-generated.java] . Here is the faulty line (467): {code:java} private void agg_doConsume1(int agg_expr_01, double agg_expr_11, boolean agg_exprIsNull_1, long agg_expr_21, boolean agg_exprIsNull_2, double agg_expr_31, boolean agg_exprIsNull_3, long agg_expr_41, boolean agg_exprIsNull_4, double agg_expr_51, boolean agg_exprIsNull_5, long agg_expr_61, boolean agg_exprIsNull_6, double agg_expr_7, boolean agg_exprIsNull_7, long agg_expr_8, boolean agg_exprIsNull_8, double agg_expr_9, boolean agg_exprIsNull_9, long agg_expr_10, boolean agg_exprIsNull_10, double agg_expr_11, boolean agg_exprIsNull_11, long agg_expr_12, boolean agg_exprIsNull_12) throws java.io.IOException {code} Maybe a precision: the code does not throw, it just logs an error. I also checked the computed average values, everything seems correct. > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible of the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12}}. We then have 2 parameter name > conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}. > Appending the 'id' in s"$fullName$id" to generate unique term name is source > of conflict. 
Maybe simply using undersoce can solve this issue : > $fullName_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining
[ https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michel Davit updated SPARK-23986: - Attachment: spark-generated.java > CompileException when using too many avg aggregation after joining > -- > > Key: SPARK-23986 > URL: https://issues.apache.org/jira/browse/SPARK-23986 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michel Davit >Priority: Major > Attachments: spark-generated.java > > > Considering the following code: > {code:java} > val df1: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6))) > .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6") > val df2: DataFrame = sparkSession.sparkContext > .makeRDD(Seq((0, "val1", "val2"))) > .toDF("key", "dummy1", "dummy2") > val agg = df1 > .join(df2, df1("key") === df2("key"), "leftouter") > .groupBy(df1("key")) > .agg( > avg("col2").as("avg2"), > avg("col3").as("avg3"), > avg("col4").as("avg4"), > avg("col1").as("avg1"), > avg("col5").as("avg5"), > avg("col6").as("avg6") > ) > val head = agg.take(1) > {code} > This logs the following exception: > {code:java} > ERROR CodeGenerator: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 467, Column 28: Redefinition of parameter "agg_expr_11" > {code} > I am not a spark expert but after investigation, I realized that the > generated {{doConsume}} method is responsible of the exception. > Indeed, {{avg}} calls several times > {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}}. > The 1st time with the 'avg' Expr and a second time for the base aggregation > Expr (count and sum). > The problem comes from the generation of parameters in CodeGenerator: > {code:java} > /** >* Returns a term name that is unique within this instance of a > `CodegenContext`. >*/ > def freshName(name: String): String = synchronized { > val fullName = if (freshNamePrefix == "") { > name > } else { > s"${freshNamePrefix}_$name" > } > if (freshNameIds.contains(fullName)) { > val id = freshNameIds(fullName) > freshNameIds(fullName) = id + 1 > s"$fullName$id" > } else { > freshNameIds += fullName -> 1 > fullName > } > } > {code} > The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call. > The second call is made with {{agg_expr_[1..12]}} and generates the > following names: > {{agg_expr_[11|21|31|41|51|61|11|12}}. We then have 2 parameter name > conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}. > Appending the 'id' in s"$fullName$id" to generate unique term name is source > of conflict. Maybe simply using undersoce can solve this issue : > $fullName_$id" -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23991) data loss when allocateBlocksToBatch
[ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kevin fu updated SPARK-23991: - Description: with checkpoint and WAL enabled, driver will write the allocation of blocks to batch into hdfs. however, if it fails as following, the blocks of this batch cannot be computed by the DAG. Because the blocks have been dequeued from the receivedBlockQueue and get lost. {panel:title=error log} 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83) at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234) at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118) at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 152376548 ms needs to be processed again in WAL recovery{panel} the concerning codes are showed below: {code} /** * Allocate all unallocated blocks to the given batch. * This event will get written to the write ahead log (if enabled). */ def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { val streamIdToBlocks = streamIds.map { streamId => (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } else { logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") } } else { // This situation occurs when: // 1. 
WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent, // possibly processed batch job or half-processed batch job need to be processed again, // so the batchTime will be equal to lastAllocatedBatchTime. // 2. Slow checkpointing makes recovered batch time older than WAL recovered // lastAllocatedBatchTime. // This situation will only occurs in recovery time. logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") } } {code} was: with checkpoint and WAL enabled, driver will write the allocation of blocks to batch into hdfs. however, if it fails as following, the blocks of this batch cannot be computed by the DAG. Because the blocks have been dequeued from the receivedBlockQueue and get lost. {panel:title=error log} 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83) at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234) at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocat
[jira] [Updated] (SPARK-23991) data loss when allocateBlocksToBatch
[ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kevin fu updated SPARK-23991:
-
    Description: 
With checkpoint and WAL enabled, the driver writes the allocation of blocks to a batch into HDFS. However, if that write fails as shown below, the blocks of this batch can no longer be computed by the DAG, because they have already been dequeued from the receivedBlockQueue and are lost.
{panel:title=error log}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> ArrayBuffer( to the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
 at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190)
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
 ... 12 more
{panel}
The relevant code is shown below:
{code:scala}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      //    possibly processed batch job or half-processed batch job need to be processed again,
      //    so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      //    lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{code}

  was:
with checkpoint and WAL enabled, driver will write the allocation of blocks to batch into hdfs. however, if it fails as following, the blocks of this batch cannot be computed by the DAG. Because the blocks have been dequeued from the receivedBlockQueue and get lost.
{quote}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> ArrayBuffer( to the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverT
[jira] [Created] (SPARK-23991) data loss when allocateBlocksToBatch
kevin fu created SPARK-23991:

             Summary: data loss when allocateBlocksToBatch
                 Key: SPARK-23991
                 URL: https://issues.apache.org/jira/browse/SPARK-23991
             Project: Spark
          Issue Type: Bug
          Components: DStreams, Input/Output
    Affects Versions: 2.2.0
         Environment: spark 2.11
            Reporter: kevin fu

With checkpoint and WAL enabled, the driver writes the allocation of blocks to a batch into HDFS. However, if that write fails as shown below, the blocks of this batch can no longer be computed by the DAG, because they have already been dequeued from the receivedBlockQueue and are lost.
{quote}
18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> ArrayBuffer( to the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
 at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
 at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:190)
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
 ... 12 more
{quote}
The relevant code is shown below:
{quote}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      //    possibly processed batch job or half-processed batch job need to be processed again,
      //    so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      //    lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
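To make the failure mode above concrete, here is a minimal, self-contained sketch in plain Scala. It is not Spark's actual ReceivedBlockTracker; the Block type, the queue, and the writeToLog stub are assumptions. It shows how dequeuing blocks before the WAL write succeeds loses them when the write fails, and how re-enqueueing on failure would keep them available for a later batch:
{code:scala}
import scala.collection.mutable

// Simplified stand-ins for the streaming scheduler's state; not Spark classes.
object AllocateSketch {
  case class Block(id: Int)

  private val receivedBlockQueue = mutable.Queue(Block(1), Block(2))
  private val timeToAllocatedBlocks = mutable.Map.empty[Long, Seq[Block]]

  // Stand-in for the WAL write; returns false to simulate the timeout in the log above.
  private def writeToLog(batchTime: Long, blocks: Seq[Block]): Boolean = false

  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    val blocks = receivedBlockQueue.dequeueAll(_ => true)
    if (writeToLog(batchTime, blocks)) {
      timeToAllocatedBlocks.put(batchTime, blocks)
    } else {
      // Without this step the dequeued blocks are simply dropped, which is the
      // data loss described above; putting them back keeps them for a later batch.
      blocks.foreach(receivedBlockQueue.enqueue(_))
    }
  }

  def main(args: Array[String]): Unit = {
    allocateBlocksToBatch(1523765480000L)
    println(s"blocks still queued after failed WAL write: ${receivedBlockQueue.size}") // 2
  }
}
{code}
A real fix would also have to preserve per-stream ordering and interact with checkpoint recovery; the sketch only illustrates the dequeue-then-fail hazard.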
[jira] [Assigned] (SPARK-23990) Instruments logging improvements - ML regression package
[ https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23990: Assignee: (was: Apache Spark) > Instruments logging improvements - ML regression package > > > Key: SPARK-23990 > URL: https://issues.apache.org/jira/browse/SPARK-23990 > Project: Spark > Issue Type: Sub-task > Components: ML >Affects Versions: 2.3.0 > Environment: Instruments logging improvements - ML regression package >Reporter: Weichen Xu >Priority: Major > Original Estimate: 120h > Remaining Estimate: 120h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23990) Instruments logging improvements - ML regression package
[ https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439268#comment-16439268 ] Apache Spark commented on SPARK-23990: -- User 'WeichenXu123' has created a pull request for this issue: https://github.com/apache/spark/pull/21078 > Instruments logging improvements - ML regression package > > > Key: SPARK-23990 > URL: https://issues.apache.org/jira/browse/SPARK-23990 > Project: Spark > Issue Type: Sub-task > Components: ML >Affects Versions: 2.3.0 > Environment: Instruments logging improvements - ML regression package >Reporter: Weichen Xu >Priority: Major > Original Estimate: 120h > Remaining Estimate: 120h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23990) Instruments logging improvements - ML regression package
[ https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23990: Assignee: Apache Spark > Instruments logging improvements - ML regression package > > > Key: SPARK-23990 > URL: https://issues.apache.org/jira/browse/SPARK-23990 > Project: Spark > Issue Type: Sub-task > Components: ML >Affects Versions: 2.3.0 > Environment: Instruments logging improvements - ML regression package >Reporter: Weichen Xu >Assignee: Apache Spark >Priority: Major > Original Estimate: 120h > Remaining Estimate: 120h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23990) Instruments logging improvements - ML regression package
[ https://issues.apache.org/jira/browse/SPARK-23990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weichen Xu updated SPARK-23990: --- Issue Type: Sub-task (was: Bug) Parent: SPARK-23686 > Instruments logging improvements - ML regression package > > > Key: SPARK-23990 > URL: https://issues.apache.org/jira/browse/SPARK-23990 > Project: Spark > Issue Type: Sub-task > Components: ML >Affects Versions: 2.3.0 > Environment: Instruments logging improvements - ML regression package >Reporter: Weichen Xu >Priority: Major > Original Estimate: 120h > Remaining Estimate: 120h > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23990) Instruments logging improvements - ML regression package
Weichen Xu created SPARK-23990: -- Summary: Instruments logging improvements - ML regression package Key: SPARK-23990 URL: https://issues.apache.org/jira/browse/SPARK-23990 Project: Spark Issue Type: Bug Components: ML Affects Versions: 2.3.0 Environment: Instruments logging improvements - ML regression package Reporter: Weichen Xu -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439231#comment-16439231 ]

liuxian edited comment on SPARK-23989 at 4/16/18 10:18 AM:
---

For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` is a key-value pair iterator, but the value is of 'UnsafeRow' type. For example, when we insert the first record into `PartitionedPairBuffer`, we only save the 'AnyRef' (the object reference), but the 'AnyRef' of the next record (only the value, not the key) is the same as the first record's, so the first record is overwritten.

was (Author: 10110346):
For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` is key-value pair, but the value is 'UnsafeRow' type. For example, we insert the first record into `PartitionedPairBuffer`, we only save the 'AnyRef', but the 'AnyRef' of next record (only value, not key) is same as the first record, so the first record is overwritten.
h1. overwritten

> When using `SortShuffleWriter`, the data will be overwritten
>
>
>                 Key: SPARK-23989
>                 URL: https://issues.apache.org/jira/browse/SPARK-23989
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: liuxian
>            Priority: Critical
>
> When using `SortShuffleWriter`, we only insert 'AnyRef' into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the value of 'records' is `UnsafeRow`, so the value will be overwritten

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
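To illustrate the reference-reuse problem described in the comment above, here is a small, self-contained sketch in plain Scala. The MutableRow class is an assumption standing in for a row object that the upstream iterator reuses (as an UnsafeRow can be); it is not Spark code. It shows how buffering only object references makes every buffered entry reflect the last record, while a defensive copy preserves each one:
{code:scala}
import scala.collection.mutable.ArrayBuffer

object ReusedRecordSketch {
  // Stand-in for a row object that the producer mutates in place between records.
  final class MutableRow(var value: Int) {
    def copyRow(): MutableRow = new MutableRow(value)
    override def toString: String = s"Row($value)"
  }

  def main(args: Array[String]): Unit = {
    val reused = new MutableRow(0)
    // Iterator that keeps handing back the *same* object with new contents.
    val records = Iterator(1, 2, 3).map { v => reused.value = v; reused }

    val byReference = ArrayBuffer.empty[MutableRow]
    val byCopy      = ArrayBuffer.empty[MutableRow]
    records.foreach { r =>
      byReference += r           // every slot ends up pointing at the same object
      byCopy      += r.copyRow() // defensive copy keeps each record's contents
    }

    println(byReference.mkString(", ")) // Row(3), Row(3), Row(3)
    println(byCopy.mkString(", "))      // Row(1), Row(2), Row(3)
  }
}
{code}
Whether this actually bites in Spark depends on whether rows are copied or serialized before they are buffered, which is what the discussion here is trying to pin down.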
[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439231#comment-16439231 ]

liuxian commented on SPARK-23989:
-

For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` is a key-value pair iterator, but the value is of 'UnsafeRow' type. For example, when we insert the first record into `PartitionedPairBuffer`, we only save the 'AnyRef', but the 'AnyRef' of the next record (only the value, not the key) is the same as the first record's, so the first record is overwritten.
h1. overwritten

> When using `SortShuffleWriter`, the data will be overwritten
>
>
>                 Key: SPARK-23989
>                 URL: https://issues.apache.org/jira/browse/SPARK-23989
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: liuxian
>            Priority: Critical
>
> When using `SortShuffleWriter`, we only insert 'AnyRef' into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the value of 'records' is `UnsafeRow`, so the value will be overwritten

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439205#comment-16439205 ] Wenchen Fan commented on SPARK-23989: - Can you be more specific about what the problem is? > When using `SortShuffleWriter`, the data will be overwritten > > > Key: SPARK-23989 > URL: https://issues.apache.org/jira/browse/SPARK-23989 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Critical > > {color:#33}When using `SortShuffleWriter`, we only insert > '{color}{color:#cc7832}AnyRef{color}{color:#33}' into > '{color}PartitionedAppendOnlyMap{color:#33}' or > '{color}PartitionedPairBuffer{color:#33}'.{color} > {color:#33}For this function:{color} > {color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: > {color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, > {color}{color:#4e807d}V{color}]]) > the value of 'records' is `UnsafeRow`, so the value will be overwritten > {color:#33} {color} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439148#comment-16439148 ] liuxian edited comment on SPARK-23989 at 4/16/18 9:00 AM: -- [~joshrosen] [~cloud_fan] was (Author: 10110346): [~joshrosen] > When using `SortShuffleWriter`, the data will be overwritten > > > Key: SPARK-23989 > URL: https://issues.apache.org/jira/browse/SPARK-23989 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Critical > > {color:#33}When using `SortShuffleWriter`, we only insert > '{color}{color:#cc7832}AnyRef{color}{color:#33}' into > '{color}PartitionedAppendOnlyMap{color:#33}' or > '{color}PartitionedPairBuffer{color:#33}'.{color} > {color:#33}For this function:{color} > {color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: > {color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, > {color}{color:#4e807d}V{color}]]) > the value of 'records' is `UnsafeRow`, so the value will be overwritten > {color:#33} {color} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439148#comment-16439148 ] liuxian commented on SPARK-23989: - [~joshrosen] > When using `SortShuffleWriter`, the data will be overwritten > > > Key: SPARK-23989 > URL: https://issues.apache.org/jira/browse/SPARK-23989 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Critical > > {color:#33}When using `SortShuffleWriter`, we only insert > '{color}{color:#cc7832}AnyRef{color}{color:#33}' into > '{color}PartitionedAppendOnlyMap{color:#33}' or > '{color}PartitionedPairBuffer{color:#33}'.{color} > {color:#33}For this function:{color} > {color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: > {color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, > {color}{color:#4e807d}V{color}]]) > the value of 'records' is `UnsafeRow`, so the value will be overwritten > {color:#33} {color} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
liuxian created SPARK-23989:
---

             Summary: When using `SortShuffleWriter`, the data will be overwritten
                 Key: SPARK-23989
                 URL: https://issues.apache.org/jira/browse/SPARK-23989
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0
            Reporter: liuxian

When using `SortShuffleWriter`, we only insert 'AnyRef' into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
For this function:
override def write(records: Iterator[Product2[K, V]])
the value of 'records' is `UnsafeRow`, so the value will be overwritten

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liuxian updated SPARK-23989: Description: {color:#33}When using `SortShuffleWriter`, we only insert '{color}{color:#cc7832}AnyRef{color}{color:#33}' into '{color}PartitionedAppendOnlyMap{color:#33}' or '{color}PartitionedPairBuffer{color:#33}'.{color} {color:#33}For this function:{color} {color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: {color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, {color}{color:#4e807d}V{color}]]) the value of 'records' is `UnsafeRow`, so the value will be overwritten {color:#33} {color} was: {color:#33}When using `SortShuffleWriter`, we only insert '{color}{color:#cc7832}AnyRef{color}{color:#33}' into '{color} PartitionedAppendOnlyMap{color:#33}' or '{color}PartitionedPairBuffer{color:#33}'.{color} {color:#33}For this function:{color} {color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: {color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, {color}{color:#4e807d}V{color}]]) the value of 'records' is `UnsafeRow`, so the value will be overwritten {color:#33} {color} > When using `SortShuffleWriter`, the data will be overwritten > > > Key: SPARK-23989 > URL: https://issues.apache.org/jira/browse/SPARK-23989 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: liuxian >Priority: Critical > > {color:#33}When using `SortShuffleWriter`, we only insert > '{color}{color:#cc7832}AnyRef{color}{color:#33}' into > '{color}PartitionedAppendOnlyMap{color:#33}' or > '{color}PartitionedPairBuffer{color:#33}'.{color} > {color:#33}For this function:{color} > {color:#cc7832}override def {color}{color:#ffc66d}write{color}(records: > {color:#4e807d}Iterator{color}[Product2[{color:#4e807d}K{color}{color:#cc7832}, > {color}{color:#4e807d}V{color}]]) > the value of 'records' is `UnsafeRow`, so the value will be overwritten > {color:#33} {color} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21033) fix the potential OOM in UnsafeExternalSorter
[ https://issues.apache.org/jira/browse/SPARK-21033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439118#comment-16439118 ] Apache Spark commented on SPARK-21033: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/21077 > fix the potential OOM in UnsafeExternalSorter > - > > Key: SPARK-21033 > URL: https://issues.apache.org/jira/browse/SPARK-21033 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 2.3.0 > > > In `UnsafeInMemorySorter`, one record may take 32 bytes: 1 `long` for > pointer, 1 `long` for key-prefix, and another 2 `long`s as the temporary > buffer for radix sort. > In `UnsafeExternalSorter`, we set the > `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to be `1024 * 1024 * 1024 / 2`, > and hoping the max size of point array to be 8 GB. However this is wrong, > `1024 * 1024 * 1024 / 2 * 32` is actually 16 GB, and if we grow the point > array before reach this limitation, we may hit the max-page-size error. > Users may see exception like this on large dataset: > {code} > Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with > more than 17179869176 bytes > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241) > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94) > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
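As a quick check of the arithmetic in the description above, a few lines of plain Scala (names are illustrative, not Spark's constants) reproduce the 16 GB figure that exceeds the intended 8 GB cap:
{code:scala}
object SpillThresholdCheck {
  def main(args: Array[String]): Unit = {
    // Per the description: pointer + key prefix + 2 longs of radix-sort buffer per record.
    val bytesPerRecord = 4 * 8L
    val oldThreshold   = 1024L * 1024 * 1024 / 2                // records before a forced spill
    val pointerArrayGB = oldThreshold * bytesPerRecord / (1L << 30)
    println(s"pointer array at the old threshold: $pointerArrayGB GB") // 16, not the intended 8
  }
}
{code}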
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439114#comment-16439114 ]

Ohad Raviv commented on SPARK-23985:

I see in the Optimizer that filters are pushed down only if the attributes they reference appear in the partitionSpec as-is. It looks like we need to add some kind of property to Expression that indicates whether we can push through it. A more trivial example than Concat could be Struct.
[~cloud_fan] - I see you have dealt with this code about a year ago, could you please take a look?
Ohad.

> predicate push down doesn't work with simple compound partition spec
>
>
>                 Key: SPARK-23985
>                 URL: https://issues.apache.org/jira/browse/SPARK-23985
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Ohad Raviv
>            Priority: Minor
>
> While predicate push down works with this query:
> {code:sql}
> select *, row_number() over (partition by a order by b) from t1 where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1
> where a>1
> {code}
>
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
> test("Window: predicate push down -- ohad") {
>   val winExpr = windowExpr(count('b),
>     windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
>   val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
>   val correctAnswer = testRelation
>     .where('a > 1).select('a, 'b, 'c)
>     .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>     .select('a, 'b, 'c, 'window).analyze
>   comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
> }
> {code}
> Will try to create a PR with a correction

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
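As a rough illustration of the property suggested in the comment above (plain Scala, not Catalyst code; the expression classes and the injectiveInInputs flag are assumptions), the idea is that a predicate can be pushed below a window when every column it reads feeds a partition-spec expression that is an injective function of its inputs: equal partition keys then imply equal inputs, so the predicate is constant within each partition and can only remove whole partitions.
{code:scala}
// Toy expression ADT, only for illustrating the pushdown check.
sealed trait Expr {
  def references: Set[String]
  def injectiveInInputs: Boolean   // the kind of property the comment proposes adding
}
case class Col(name: String) extends Expr {
  val references = Set(name)
  val injectiveInInputs = true
}
case class ConcatWithLiteral(col: Col, literal: String) extends Expr {
  val references = col.references
  val injectiveInInputs = true     // fixed literal suffix: equal output implies equal input
}

object WindowPushdownSketch {
  // A filter can be pushed below the window if every column it reads comes from
  // an injective partition-spec expression.
  def canPushThroughWindow(filterRefs: Set[String], partitionSpec: Seq[Expr]): Boolean = {
    val safeCols = partitionSpec.filter(_.injectiveInInputs).flatMap(_.references).toSet
    filterRefs.subsetOf(safeCols)
  }

  def main(args: Array[String]): Unit = {
    val spec = Seq(ConcatWithLiteral(Col("a"), "lit"))
    println(canPushThroughWindow(Set("a"), spec)) // true  -> a filter like `a > 1` removes whole partitions
    println(canPushThroughWindow(Set("b"), spec)) // false -> the filter must stay above the window
  }
}
{code}
A deterministic but non-injective expression such as `a % 2` would have to report false here, since rows sharing a partition could still have different values of `a`.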