[jira] [Updated] (SPARK-30666) Reliable single-stage accumulators

2020-02-18 Thread Enrico Minack (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Enrico Minack updated SPARK-30666:
--
Description: 
This proposes a pragmatic improvement to allow for reliable single-stage 
accumulators. Under the assumption that a given stage / partition / rdd 
produces identical results, non-deterministic code produces identical 
accumulator increments on success. Rerunning partitions for any reason should 
always produce the same increments on success.

With this pragmatic approach, increments from individual partitions / tasks are 
compared to earlier increments. Depending on the strategy of how a new 
increment updates over an earlier increment from the same partition, different 
semantics of accumulators (here called accumulator modes) can be implemented:
 - {{ALL}} sums over all increments of each partition: this represents the 
current implementation of accumulators
 - {{LARGEST}} over all increments of each partition: accumulators aggregate 
multiple increments while a partition is processed; a successful task provides 
the most accumulated value, which always has a larger cardinality than the 
accumulated value of any failed task, hence it supersedes any failed task's value. 
This produces reliable accumulator values. This does not require 
{{countFailedValues == false}}. This should only be used in a single stage. The 
naming may be confused with {{MAX}}.
 - {{FIRST}} increment: allows retrieving only the first accumulator value for each 
partition. This is only useful for accumulators registered with 
{{countFailedValues == false}}.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that 
scales with the number of partitions. The current {{ALL}} implementation does 
not require extra memory.
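
To make the proposed semantics concrete, below is a minimal sketch of a {{LARGEST}}-style accumulator built on the existing {{AccumulatorV2}} API. This is an illustration only, not the proposed implementation; the {{(partitionId, increment)}} input type and the per-partition map are assumptions made for the example.

{code:scala}
import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

// Sketch: keep only the largest increment reported per partition and sum those on the driver.
class LargestLongAccumulator extends AccumulatorV2[(Int, Long), Long] {
  // Extra memory that scales with the number of partitions, as noted above.
  private val perPartition = mutable.Map.empty[Int, Long]

  override def isZero: Boolean = perPartition.isEmpty
  override def copy(): LargestLongAccumulator = {
    val acc = new LargestLongAccumulator
    acc.perPartition ++= perPartition
    acc
  }
  override def reset(): Unit = perPartition.clear()
  override def add(v: (Int, Long)): Unit = {
    val (partitionId, increment) = v
    // A rerun of the same partition only replaces the old value if it accumulated more.
    if (increment > perPartition.getOrElse(partitionId, Long.MinValue)) {
      perPartition(partitionId) = increment
    }
  }
  override def merge(other: AccumulatorV2[(Int, Long), Long]): Unit = other match {
    case o: LargestLongAccumulator => o.perPartition.foreach { case (p, v) => add((p, v)) }
  }
  override def value: Long = perPartition.values.sum
}
{code}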

  was:
This proposes a pragmatic improvement to allow for reliable single-stage 
accumulators. Under the assumption that a given stage / partition / rdd 
produces identical results, non-deterministic code incrementing accumulators 
also produces identical accumulator increments on success. Rerunning partitions 
for any reason should always produce the same increments on success.

With this pragmatic approach, increments from individual partitions / tasks are 
compared to earlier increments. Depending on the strategy of how a new 
increment updates over an earlier increment from the same partition, different 
semantics of accumulators (here called accumulator modes) can be implemented:
 - ALL sums over all increments of each partition: this represents the current 
implementation of accumulators
 - MAX over all increments of each partition: assuming accumulators only 
increment while a partition is processed, a successful task provides an 
accumulator value that is always larger than any value of a failed task, hence 
it supersedes any failed task's value. This produces reliable accumulator 
values. This should only be used in a single stage.
 - LAST increment: allows retrieving only the latest increment for each 
partition.

The implementation for MAX and LAST requires extra memory that scales with the 
number of partitions. The current ALL implementation does not require extra 
memory.


> Reliable single-stage accumulators
> --
>
> Key: SPARK-30666
> URL: https://issues.apache.org/jira/browse/SPARK-30666
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.1.0
>Reporter: Enrico Minack
>Priority: Major
>
> This proposes a pragmatic improvement to allow for reliable single-stage 
> accumulators. Under the assumption that a given stage / partition / rdd 
> produces identical results, non-deterministic code produces identical 
> accumulator increments on success. Rerunning partitions for any reason should 
> always produce the same increments on success.
> With this pragmatic approach, increments from individual partitions / tasks 
> are compared to earlier increments. Depending on the strategy of how a new 
> increment updates over an earlier increment from the same partition, 
> different semantics of accumulators (here called accumulator modes) can be 
> implemented:
>  - {{ALL}} sums over all increments of each partition: this represents the 
> current implementation of accumulators
>  - {{LARGEST}} over all increments of each partition: accumulators aggregate 
> multiple increments while a partition is processed; a successful task 
> provides the most accumulated value, which always has a larger cardinality 
> than the accumulated value of any failed task, hence it supersedes any failed 
> task's value. This produces reliable accumulator values. This does not 
> require {{countFailedValues == false}}. This should only be used in a single 
> stage. The naming may be confused with {{MAX}}.
>  - {{FIRST}} increment: allows to retrie

[jira] [Updated] (SPARK-30666) Reliable single-stage accumulators

2020-02-18 Thread Enrico Minack (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Enrico Minack updated SPARK-30666:
--
Description: 
This proposes a pragmatic improvement to allow for reliable single-stage 
accumulators. Under the assumption that a given stage / partition / rdd 
produces identical results, non-deterministic code produces identical 
accumulator increments on success. Rerunning partitions for any reason should 
always produce the same increments on success.

With this pragmatic approach, increments from individual partitions / tasks are 
compared to earlier increments. Depending on the strategy of how a new 
increment updates over an earlier increment from the same partition, different 
semantics of accumulators (here called accumulator modes) can be implemented:
 - {{ALL}} sums over all increments of each partition: this represents the 
current implementation of accumulators
 - {{FIRST}} increment: allows retrieving only the first accumulator value for each 
partition. This is only useful for accumulators registered with 
{{countFailedValues == false}}.
 - {{LARGEST}} over all increments of each partition: accumulators aggregate 
multiple increments while a partition is processed; a successful task provides 
the most accumulated value, which always has a larger cardinality than the 
accumulated value of any failed task, hence it supersedes any failed task's value. 
This produces reliable accumulator values. This does not require 
{{countFailedValues == false}}. This should only be used in a single stage. The 
naming may be confused with {{MAX}}.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that 
scales with the number of partitions. The current {{ALL}} implementation does 
not require extra memory.

  was:
This proposes a pragmatic improvement to allow for reliable single-stage 
accumulators. Under the assumption that a given stage / partition / rdd 
produces identical results, non-deterministic code produces identical 
accumulator increments on success. Rerunning partitions for any reason should 
always produce the same increments on success.

With this pragmatic approach, increments from individual partitions / tasks are 
compared to earlier increments. Depending on the strategy of how a new 
increment updates over an earlier increment from the same partition, different 
semantics of accumulators (here called accumulator modes) can be implemented:
 - {{ALL}} sums over all increments of each partition: this represents the 
current implementation of accumulators
 - {{LARGEST}} over all increments of each partition: accumulators aggregate 
multiple increments while a partition is processed; a successful task provides 
the most accumulated value, which always has a larger cardinality than the 
accumulated value of any failed task, hence it supersedes any failed task's value. 
This produces reliable accumulator values. This does not require 
{{countFailedValues == false}}. This should only be used in a single stage. The 
naming may be confused with {{MAX}}.
 - {{FIRST}} increment: allows retrieving only the first accumulator value for each 
partition. This is only useful for accumulators registered with 
{{countFailedValues == false}}.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that 
scales with the number of partitions. The current {{ALL}} implementation does 
not require extra memory.


> Reliable single-stage accumulators
> --
>
> Key: SPARK-30666
> URL: https://issues.apache.org/jira/browse/SPARK-30666
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.1.0
>Reporter: Enrico Minack
>Priority: Major
>
> This proposes a pragmatic improvement to allow for reliable single-stage 
> accumulators. Under the assumption that a given stage / partition / rdd 
> produces identical results, non-deterministic code produces identical 
> accumulator increments on success. Rerunning partitions for any reason should 
> always produce the same increments on success.
> With this pragmatic approach, increments from individual partitions / tasks 
> are compared to earlier increments. Depending on the strategy of how a new 
> increment updates over an earlier increment from the same partition, 
> different semantics of accumulators (here called accumulator modes) can be 
> implemented:
>  - {{ALL}} sums over all increments of each partition: this represents the 
> current implementation of accumulators
>  - {{FIRST}} increment: allows retrieving only the first accumulator value 
> for each partition. This is only useful for accumulators registered with 
> {{countFailedValues == false}}.
>  - {{LARGEST}} over all increments of each partition: accumulators aggregate 
> multiple increments while a partition is processed

[jira] [Updated] (SPARK-30666) Reliable single-stage accumulators

2020-02-18 Thread Enrico Minack (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Enrico Minack updated SPARK-30666:
--
Description: 
This proposes a pragmatic improvement to allow for reliable single-stage 
accumulators. Under the assumption that a given stage / partition / rdd 
produces identical results, non-deterministic code produces identical 
accumulator increments on success. Rerunning partitions for any reason should 
always produce the same increments on success.

With this pragmatic approach, increments from individual partitions / tasks are 
compared to earlier increments. Depending on the strategy of how a new 
increment updates over an earlier increment from the same partition, different 
semantics of accumulators (here called accumulator modes) can be implemented:
 - {{ALL}} sums over all increments of each partition: this represents the 
current implementation of accumulators
 - {{FIRST}} increment: allows retrieving only the first accumulator value for each 
partition. This is useful for accumulators registered with 
{{countFailedValues == false}}.
 - {{LARGEST}} over all increments of each partition: accumulators aggregate 
multiple increments while a partition is processed; a successful task provides 
the most accumulated value, which always has a larger cardinality than the 
accumulated value of any failed task, hence it supersedes any failed task's value. 
This produces reliable accumulator values. This does not require 
{{countFailedValues == false}}. This should only be used in a single stage. The 
naming may be confused with {{MAX}}.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that 
scales with the number of partitions. The current {{ALL}} implementation does 
not require extra memory.

  was:
This proposes a pragmatic improvement to allow for reliable single-stage 
accumulators. Under the assumption that a given stage / partition / rdd 
produces identical results, non-deterministic code produces identical 
accumulator increments on success. Rerunning partitions for any reason should 
always produce the same increments on success.

With this pragmatic approach, increments from individual partitions / tasks are 
compared to earlier increments. Depending on the strategy of how a new 
increment updates over an earlier increment from the same partition, different 
semantics of accumulators (here called accumulator modes) can be implemented:
 - {{ALL}} sums over all increments of each partition: this represents the 
current implementation of accumulators
 - {{FIRST}} increment: allows retrieving only the first accumulator value for each 
partition. This is only useful for accumulators registered with 
{{countFailedValues == false}}.
 - {{LARGEST}} over all increments of each partition: accumulators aggregate 
multiple increments while a partition is processed; a successful task provides 
the most accumulated value, which always has a larger cardinality than the 
accumulated value of any failed task, hence it supersedes any failed task's value. 
This produces reliable accumulator values. This does not require 
{{countFailedValues == false}}. This should only be used in a single stage. The 
naming may be confused with {{MAX}}.

The implementations for {{LARGEST}} and {{FIRST}} require extra memory that 
scales with the number of partitions. The current {{ALL}} implementation does 
not require extra memory.


> Reliable single-stage accumulators
> --
>
> Key: SPARK-30666
> URL: https://issues.apache.org/jira/browse/SPARK-30666
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.1.0
>Reporter: Enrico Minack
>Priority: Major
>
> This proposes a pragmatic improvement to allow for reliable single-stage 
> accumulators. Under the assumption that a given stage / partition / rdd 
> produces identical results, non-deterministic code produces identical 
> accumulator increments on success. Rerunning partitions for any reason should 
> always produce the same increments on success.
> With this pragmatic approach, increments from individual partitions / tasks 
> are compared to earlier increments. Depending on the strategy of how a new 
> increment updates over an earlier increment from the same partition, 
> different semantics of accumulators (here called accumulator modes) can be 
> implemented:
>  - {{ALL}} sums over all increments of each partition: this represents the 
> current implementation of accumulators
>  - {{FIRST}} increment: allows retrieving only the first accumulator value 
> for each partition. This is useful for accumulators registered with 
> {{countFailedValues == false}}.
>  - {{LARGEST}} over all increments of each partition: accumulators aggregate 
> multiple increments while a partition is processed, a succes

[jira] [Updated] (SPARK-27733) Upgrade to Avro 1.9.2

2020-02-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/SPARK-27733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated SPARK-27733:
-
Summary: Upgrade to Avro 1.9.2  (was: Upgrade to Avro 1.9.x)

> Upgrade to Avro 1.9.2
> -
>
> Key: SPARK-27733
> URL: https://issues.apache.org/jira/browse/SPARK-27733
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, SQL
>Affects Versions: 3.0.0
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Avro 1.9.0 was released with many nice features including reduced size (1MB 
> less), and removed dependencies, no paranmer, no shaded guava, security 
> updates, so probably a worth upgrade.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30851) Add 'path' field to the 'LoadInstanceEnd' ML listener event

2020-02-18 Thread Vladislav Glinskiy (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038917#comment-17038917
 ] 

Vladislav Glinskiy commented on SPARK-30851:


This issue can be worked around by caching loaded instance paths from 
{{LoadInstanceStart}} events, using the {{reader}} instance as a key.
{code:scala}
// Cache the path seen in LoadInstanceStart, keyed by the reader instance,
// so it can be looked up again when the matching LoadInstanceEnd arrives.
private[this] val loadedInstancePaths = new mutable.WeakHashMap[MLReader[_], String]()
...
case loadInstanceStart: LoadInstanceStart[_] =>
  loadedInstancePaths.put(loadInstanceStart.reader, loadInstanceStart.path)
case loadInstanceEnd: LoadInstanceEnd[_] =>
  val path = loadedInstancePaths.get(loadInstanceEnd.reader)
...
{code}

> Add 'path' field to the 'LoadInstanceEnd' ML listener event
> ---
>
> Key: SPARK-30851
> URL: https://issues.apache.org/jira/browse/SPARK-30851
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 3.0.0
>Reporter: Vladislav Glinskiy
>Priority: Major
> Fix For: 3.0.0
>
>
> The 
> [LoadInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L92]
>  ML listener event that is added on the scope of SPARK-23674 has no 'path' 
> field which makes it impossible to determine from what path an ML instance 
> was loaded as well as there is no way to get instance's 'uid' via 
> [LoadInstanceStart|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L84]
>  event.
>  
> The 
> [LoadInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L92]
>  must be changed to include 'path' field. Please, refer 
> [SaveInstanceStart|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L101]
>  and 
> [SaveInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L109]
>  events. Both of them have `path` but 
> [LoadInstanceStart|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L84]
>  and 
> [LoadInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L92]
>  are not.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30851) Add 'path' field to the 'LoadInstanceEnd' ML listener event

2020-02-18 Thread Vladislav Glinskiy (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladislav Glinskiy resolved SPARK-30851.

Resolution: Workaround

> Add 'path' field to the 'LoadInstanceEnd' ML listener event
> ---
>
> Key: SPARK-30851
> URL: https://issues.apache.org/jira/browse/SPARK-30851
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 3.0.0
>Reporter: Vladislav Glinskiy
>Priority: Major
> Fix For: 3.0.0
>
>
> The 
> [LoadInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L92]
>  ML listener event that is added on the scope of SPARK-23674 has no 'path' 
> field which makes it impossible to determine from what path an ML instance 
> was loaded as well as there is no way to get instance's 'uid' via 
> [LoadInstanceStart|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L84]
>  event.
>  
> The 
> [LoadInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L92]
>  must be changed to include 'path' field. Please, refer 
> [SaveInstanceStart|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L101]
>  and 
> [SaveInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L109]
>  events. Both of them have `path` but 
> [LoadInstanceStart|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L84]
>  and 
> [LoadInstanceEnd|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/events.scala#L92]
>  are not.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30863) Distinguish Cast and AnsiCast in toString()

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-30863:
---

Assignee: wuyi

> Distinguish Cast and AnsiCast in toString()
> ---
>
> Key: SPARK-30863
> URL: https://issues.apache.org/jira/browse/SPARK-30863
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>
> Currently, we cannot distinguish Cast and AnsiCast while comparing plans 
> literally. This makes some investigations difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30863) Distinguish Cast and AnsiCast in toString()

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-30863.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 27608
[https://github.com/apache/spark/pull/27608]

> Distinguish Cast and AnsiCast in toString()
> ---
>
> Key: SPARK-30863
> URL: https://issues.apache.org/jira/browse/SPARK-30863
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, we cannot distinguish Cast and AnsiCast while comparing plans 
> literally. This makes some investigations difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28990) SparkSQL invalid call to toAttribute on unresolved object, tree: *

2020-02-18 Thread Gary Scott (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038983#comment-17038983
 ] 

Gary Scott commented on SPARK-28990:


[~fengchaoge] [~stephenwoo] [~xiaozhang] - I have the same issue, can you 
please advise on when this will be resolved?

> SparkSQL invalid call to toAttribute on unresolved object, tree: *
> --
>
> Key: SPARK-28990
> URL: https://issues.apache.org/jira/browse/SPARK-28990
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: fengchaoge
>Priority: Major
>
> SparkSQL create table as select from a table which may not exist throws 
> exceptions like:
> {code}
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> toAttribute on unresolved object, tree:
> {code}
> This is not friendly; Spark users may have no idea what's wrong.
> A simple SQL statement can reproduce it, like this:
> {code}
> spark-sql (default)> create table default.spark as select * from default.dual;
> {code}
> {code}
> 2019-09-05 16:27:24,127 INFO (main) [Logging.scala:logInfo(54)] - Parsing 
> command: create table default.spark as select * from default.dual
> 2019-09-05 16:27:24,772 ERROR (main) [Logging.scala:logError(91)] - Failed in 
> [create table default.spark as select * from default.dual]
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> toAttribute on unresolved object, tree: *
> at 
> org.apache.spark.sql.catalyst.analysis.Star.toAttribute(unresolved.scala:245)
> at 
> org.apache.spark.sql.catalyst.plans.logical.Project$$anonfun$output$1.apply(basicLogicalOperators.scala:52)
> at 
> org.apache.spark.sql.catalyst.plans.logical.Project$$anonfun$output$1.apply(basicLogicalOperators.scala:52)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at 
> org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:52)
> at 
> org.apache.spark.sql.hive.HiveAnalysis$$anonfun$apply$3.applyOrElse(HiveStrategies.scala:160)
> at 
> org.apache.spark.sql.hive.HiveAnalysis$$anonfun$apply$3.applyOrElse(HiveStrategies.scala:148)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsDown$1$$anonfun$2.apply(AnalysisHelper.scala:108)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsDown$1$$anonfun$2.apply(AnalysisHelper.scala:108)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsDown$1.apply(AnalysisHelper.scala:107)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsDown$1.apply(AnalysisHelper.scala:106)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.resolveOperatorsDown(AnalysisHelper.scala:106)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.resolveOperators(AnalysisHelper.scala:73)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
> at org.apache.spark.sql.hive.HiveAnalysis$.apply(HiveStrategies.scala:148)
> at org.apache.spark.sql.hive.HiveAnalysis$.apply(HiveStrategies.scala:147)
> at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
> at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
> at 
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> at 
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
> at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
> at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
> at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Anal

[jira] [Commented] (SPARK-28883) Fix a flaky test: ThriftServerQueryTestSuite

2020-02-18 Thread Yuming Wang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039005#comment-17039005
 ] 

Yuming Wang commented on SPARK-28883:
-

I think the issue has been fixed.

> Fix a flaky test: ThriftServerQueryTestSuite
> 
>
> Key: SPARK-28883
> URL: https://issues.apache.org/jira/browse/SPARK-28883
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Blocker
>
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/109764/testReport/org.apache.spark.sql.hive.thriftserver/ThriftServerQueryTestSuite/
>  (2 failures)
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/109768/testReport/org.apache.spark.sql.hive.thriftserver/ThriftServerQueryTestSuite/
>  (4 failures)
> Error message:
> {noformat}
> java.sql.SQLException: Could not open client transport with JDBC Uri: 
> jdbc:hive2://localhost:14431: java.net.ConnectException: Connection refused 
> (Connection refused)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30852) Use Long instead of Int as argument type in Dataset limit method

2020-02-18 Thread Rakesh Raushan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039031#comment-17039031
 ] 

Rakesh Raushan commented on SPARK-30852:


Ah, in that case we cannot allow long values. Tail also gives only an array.

Can we mark this issue as won't fix then?
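
For reference, a minimal sketch (spark-shell context assumed; {{tail}} as on master) of the Int-bounded signatures being discussed: {{limit}} takes an Int today, and {{tail}} returns an Array, whose length is capped at Int.MaxValue.

{code:scala}
// Sketch: the current signatures are limit(n: Int) and tail(n: Int): Array[T];
// a JVM Array can never hold more than Int.MaxValue (2,147,483,647) elements,
// which is where the 32-bit cap comes from.
val df = spark.range(10).toDF("id")
val capped = df.limit(Int.MaxValue)   // largest value the current signature accepts
val lastRows = df.tail(3)             // materialized as an Array[Row] on the driver
{code}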

> Use Long instead of Int as argument type in Dataset limit method
> 
>
> Key: SPARK-30852
> URL: https://issues.apache.org/jira/browse/SPARK-30852
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: Damianos Christophides
>Priority: Minor
>
> The Dataset limit method takes an input of type Int, which is a 32-bit 
> integer. The numerical upper limit of this type is 2,147,483,647. In my work 
> I needed to apply a limit higher than that to a Dataset, which gives an 
> error:
> "py4j.Py4JException: Method limit([class java.lang.Long]) does not exist"
>  
> Could the input type of the limit method be changed to a Long (64bit)?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30812) Revise boolean config name according to new config naming policy

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-30812:
---

Assignee: wuyi

> Revise boolean config name according to new config naming policy
> 
>
> Key: SPARK-30812
> URL: https://issues.apache.org/jira/browse/SPARK-30812
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 3.0.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>
> config naming policy:
> http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-naming-policy-of-Spark-configs-td28875.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30812) Revise boolean config name according to new config naming policy

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-30812.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 27563
[https://github.com/apache/spark/pull/27563]

> Revise boolean config name according to new config naming policy
> 
>
> Key: SPARK-30812
> URL: https://issues.apache.org/jira/browse/SPARK-30812
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 3.0.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.0.0
>
>
> config naming policy:
> http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-naming-policy-of-Spark-configs-td28875.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30866) FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

2020-02-18 Thread Jungtaek Lim (Jira)
Jungtaek Lim created SPARK-30866:


 Summary: FileStreamSource: Cache fetched list of files beyond 
maxFilesPerTrigger as unread files
 Key: SPARK-30866
 URL: https://issues.apache.org/jira/browse/SPARK-30866
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Jungtaek Lim


FileStreamSource fetches the available files per batch, which is a costly 
operation.

(E.g. it took around 5 seconds to list leaf files for 95 paths containing 
674,811 files. That is not even an HDFS path - it is a local filesystem.)

If "maxFilesPerTrigger" is not set, Spark consumes all the fetched files in the 
batch, so once the batch has completed it obviously has to fetch again for the 
next micro-batch.

If "latestFirst" is true (regardless of "maxFilesPerTrigger"), the files to 
process should be re-evaluated per batch, so Spark also has to fetch per 
micro-batch.

Except for the above cases (in short, when maxFilesPerTrigger is set and 
latestFirst is false), the files to process can be "continuous": we can "cache" 
the fetched list of files and consume it until the list is exhausted, as 
sketched below.
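
A minimal sketch of the reader configuration this targets (format, numbers and path are assumptions for illustration; an existing {{spark}} session is assumed):

{code:scala}
// Sketch only: maxFilesPerTrigger set and latestFirst left at its default (false) --
// the case where the fetched file listing could be cached and drained across micro-batches.
val stream = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "100")   // cap per micro-batch
  .option("latestFirst", "false")        // default; keeps the processing order stable
  .load("/path/to/input")                // hypothetical input path
{code}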



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30857) Wrong truncations of timestamps before the epoch to hours and days

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-30857.
-
Fix Version/s: 2.4.6
   Resolution: Fixed

Issue resolved by pull request 27612
[https://github.com/apache/spark/pull/27612]

> Wrong truncations of timestamps before the epoch to hours and days
> --
>
> Key: SPARK-30857
> URL: https://issues.apache.org/jira/browse/SPARK-30857
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.4, 2.4.5
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
>  Labels: correctness
> Fix For: 2.4.6
>
>
> Truncations to seconds and minutes of timestamps after the epoch are correct:
> {code:sql}
> spark-sql> select date_trunc('HOUR', '2020-02-11 00:01:02.123'), 
> date_trunc('HOUR', '2020-02-11 00:01:02.789');
> 2020-02-11 00:00:00   2020-02-11 00:00:00
> {code}
> but truncations of timestamps before the epoch are incorrect:
> {code:sql}
> spark-sql> select date_trunc('HOUR', '1960-02-11 00:01:02.123'), 
> date_trunc('HOUR', '1960-02-11 00:01:02.789');
> 1960-02-11 01:00:00   1960-02-11 01:00:00
> {code}
> The result must be *1960-02-11 00:00:00 1960-02-11 00:00:00*
> The same for the DAY level:
> {code:sql}
> spark-sql> select date_trunc('DAY', '1960-02-11 00:01:02.123'), 
> date_trunc('DAY', '1960-02-11 00:01:02.789');
> 1960-02-12 00:00:00   1960-02-12 00:00:00
> {code}
> The result must be *1960-02-11 00:00:00 1960-02-11 00:00:00*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30857) Wrong truncations of timestamps before the epoch to hours and days

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-30857:
---

Assignee: Maxim Gekk

> Wrong truncations of timestamps before the epoch to hours and days
> --
>
> Key: SPARK-30857
> URL: https://issues.apache.org/jira/browse/SPARK-30857
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.4, 2.4.5
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
>  Labels: correctness
>
> Truncations to seconds and minutes of timestamps after the epoch are correct:
> {code:sql}
> spark-sql> select date_trunc('HOUR', '2020-02-11 00:01:02.123'), 
> date_trunc('HOUR', '2020-02-11 00:01:02.789');
> 2020-02-11 00:00:00   2020-02-11 00:00:00
> {code}
> but truncations of timestamps before the epoch are incorrect:
> {code:sql}
> spark-sql> select date_trunc('HOUR', '1960-02-11 00:01:02.123'), 
> date_trunc('HOUR', '1960-02-11 00:01:02.789');
> 1960-02-11 01:00:00   1960-02-11 01:00:00
> {code}
> The result must be *1960-02-11 00:00:00 1960-02-11 00:00:00*
> The same for the DAY level:
> {code:sql}
> spark-sql> select date_trunc('DAY', '1960-02-11 00:01:02.123'), 
> date_trunc('DAY', '1960-02-11 00:01:02.789');
> 1960-02-12 00:00:00   1960-02-12 00:00:00
> {code}
> The result must be *1960-02-11 00:00:00 1960-02-11 00:00:00*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27733) Upgrade to Avro 1.9.2

2020-02-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/SPARK-27733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated SPARK-27733:
-
Description: Avro 1.9.2 was released with many nice features including 
reduced size (1MB less), and removed dependencies, no paranamer, no shaded 
guava, security updates, so probably a worth upgrade.  (was: Avro 1.9.2 was 
released with many nice features including reduced size (1MB less), and removed 
dependencies, no paranmer, no shaded guava, security updates, so probably a 
worth upgrade.)

> Upgrade to Avro 1.9.2
> -
>
> Key: SPARK-27733
> URL: https://issues.apache.org/jira/browse/SPARK-27733
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, SQL
>Affects Versions: 3.0.0
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Avro 1.9.2 was released with many nice features including reduced size (1MB 
> less), and removed dependencies, no paranamer, no shaded guava, security 
> updates, so probably a worth upgrade.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27733) Upgrade to Avro 1.9.2

2020-02-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/SPARK-27733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated SPARK-27733:
-
Description: Avro 1.9.2 was released with many nice features including 
reduced size (1MB less), and removed dependencies, no paranmer, no shaded 
guava, security updates, so probably a worth upgrade.  (was: Avro 1.9.0 was 
released with many nice features including reduced size (1MB less), and removed 
dependencies, no paranmer, no shaded guava, security updates, so probably a 
worth upgrade.)

> Upgrade to Avro 1.9.2
> -
>
> Key: SPARK-27733
> URL: https://issues.apache.org/jira/browse/SPARK-27733
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, SQL
>Affects Versions: 3.0.0
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Avro 1.9.2 was released with many nice features including reduced size (1MB 
> less), and removed dependencies, no paranmer, no shaded guava, security 
> updates, so probably a worth upgrade.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26346) Upgrade parquet to 1.11.1

2020-02-18 Thread H. Vetinari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-26346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039254#comment-17039254
 ] 

H. Vetinari commented on SPARK-26346:
-

Parquet recently bumped its Avro dependency to 1.9.2 (PARQUET-1796), but that is 
only tagged for Parquet 1.12.0; Parquet 1.11.1 seems to be finished 
content-wise, but has not been released yet.

> Upgrade parquet to 1.11.1
> -
>
> Key: SPARK-26346
> URL: https://issues.apache.org/jira/browse/SPARK-26346
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30867) add FValueRegressionTest

2020-02-18 Thread Huaxin Gao (Jira)
Huaxin Gao created SPARK-30867:
--

 Summary: add FValueRegressionTest
 Key: SPARK-30867
 URL: https://issues.apache.org/jira/browse/SPARK-30867
 Project: Spark
  Issue Type: Sub-task
  Components: ML
Affects Versions: 3.1.0
Reporter: Huaxin Gao


Add FValueRegressionTest in ML.stat. This will be used for 
FValueRegressionSelector. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30868) Throw Exception if runHive(sql) failed

2020-02-18 Thread Jackey Lee (Jira)
Jackey Lee created SPARK-30868:
--

 Summary: Throw Exception if runHive(sql) failed
 Key: SPARK-30868
 URL: https://issues.apache.org/jira/browse/SPARK-30868
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0, 3.1.0
Reporter: Jackey Lee


At present, HiveClientImpl.runHive does not throw an exception when a command 
fails, which means it cannot report error information properly.
Example:
{code:scala}
spark.sql("add jar file:///tmp/test.jar").show()
spark.sql("show databases").show()
{code}
/tmp/test.jar does not exist, so the add jar command fails. However, this code 
runs to completion without causing an application failure.
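
A hypothetical sketch of the direction a fix could take (names follow Hive's CommandProcessorResponse API; this is not the actual HiveClientImpl change): check the response code and raise instead of swallowing it.

{code:scala}
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.sql.execution.QueryExecutionException

// Hypothetical sketch: after the command processor runs a statement inside runHive,
// a non-zero response code should surface as an exception instead of being ignored.
def assertCommandSucceeded(response: CommandProcessorResponse): Unit = {
  if (response.getResponseCode != 0) {
    throw new QueryExecutionException(response.getErrorMessage)
  }
}
{code}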



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30838) Add missing pages to documentation top navigation menu

2020-02-18 Thread Nicholas Chammas (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-30838:
-
Description: 
There are a few pages tracked in `docs/` that are not linked to from the top 
navigation or from the index page. Seems unintentional.

To make these pages easier to discover, we should at least list them in the 
index and link to them from other pages as appropriate.

  was:
There are a few pages tracked in `docs/` that are not linked to from the top 
navigation. Seems unintentional.

To make these pages easier to discover, we should list them up top and link to 
them from other documentation pages as appropriate.


> Add missing pages to documentation top navigation menu
> --
>
> Key: SPARK-30838
> URL: https://issues.apache.org/jira/browse/SPARK-30838
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Nicholas Chammas
>Priority: Trivial
>
> There are a few pages tracked in `docs/` that are not linked to from the top 
> navigation or from the index page. Seems unintentional.
> To make these pages easier to discover, we should at least list them in the 
> index and link to them from other pages as appropriate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30838) Add missing pages to documentation index

2020-02-18 Thread Nicholas Chammas (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-30838:
-
Summary: Add missing pages to documentation index  (was: Add missing pages 
to documentation top navigation menu)

> Add missing pages to documentation index
> 
>
> Key: SPARK-30838
> URL: https://issues.apache.org/jira/browse/SPARK-30838
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Nicholas Chammas
>Priority: Trivial
>
> There are a few pages tracked in `docs/` that are not linked to from the top 
> navigation or from the index page. Seems unintentional.
> To make these pages easier to discover, we should at least list them in the 
> index and link to them from other pages as appropriate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30838) Add missing pages to documentation index

2020-02-18 Thread Nicholas Chammas (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039351#comment-17039351
 ] 

Nicholas Chammas commented on SPARK-30838:
--

Actually, it looks like the pages I wanted to add (hadoop-provided, cloud 
infra, web ui) already have references from one place or other. They're not as 
discoverable as I'd like, but I don't have a good idea as to how to improve 
things right now so I'm going to close this.

> Add missing pages to documentation index
> 
>
> Key: SPARK-30838
> URL: https://issues.apache.org/jira/browse/SPARK-30838
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Nicholas Chammas
>Priority: Trivial
>
> There are a few pages tracked in `docs/` that are not linked to from the top 
> navigation or from the index page. Seems unintentional.
> To make these pages easier to discover, we should at least list them in the 
> index and link to them from other pages as appropriate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30838) Add missing pages to documentation index

2020-02-18 Thread Nicholas Chammas (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas resolved SPARK-30838.
--
Resolution: Won't Fix

> Add missing pages to documentation index
> 
>
> Key: SPARK-30838
> URL: https://issues.apache.org/jira/browse/SPARK-30838
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Nicholas Chammas
>Priority: Trivial
>
> There are a few pages tracked in `docs/` that are not linked to from the top 
> navigation or from the index page. Seems unintentional.
> To make these pages easier to discover, we should at least list them in the 
> index and link to them from other pages as appropriate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30869) Convert dates to/from timestamps in microseconds precision

2020-02-18 Thread Maxim Gekk (Jira)
Maxim Gekk created SPARK-30869:
--

 Summary: Convert dates to/from timestamps in microseconds precision
 Key: SPARK-30869
 URL: https://issues.apache.org/jira/browse/SPARK-30869
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maxim Gekk


Currently, Spark converts dates to/from "timestamp" in millisecond precision 
but internally Catalyst's TimestampType values are stored as microseconds since 
epoch. When such conversion is needed in other date-timestamp functions like 
DateTimeUtils.monthsBetween, the function has to convert microseconds to 
milliseconds and then to days, see 
https://github.com/apache/spark/blob/06217cfded8d32962e7c54c315f8e684eb9f0999/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L577-L580
 which just brings additional overhead w/o any benefits.

Previously this made sense because milliseconds could be passed to 
TimeZone.getOffset, but Spark recently switched to the Java 8 time API and 
ZoneId, so conversions through milliseconds are no longer needed.

The ticket aims to replace millisToDays by microsToDays, and daysToMillis by 
daysToMicros in DateTimeUtils.
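
A minimal sketch of what microsecond-precision conversions could look like with the Java 8 time API (the method names mirror the ticket's proposal; the bodies are assumptions for illustration, not the actual DateTimeUtils change):

{code:scala}
import java.time.{Instant, LocalDate, ZoneId}

// Sketch: microseconds since epoch -> days since epoch, without going through milliseconds.
def microsToDays(micros: Long, zoneId: ZoneId): Int = {
  val instant = Instant.ofEpochSecond(
    Math.floorDiv(micros, 1000000L),
    Math.floorMod(micros, 1000000L) * 1000L)
  instant.atZone(zoneId).toLocalDate.toEpochDay.toInt
}

// Sketch: the inverse, days since epoch -> microseconds since epoch.
def daysToMicros(days: Int, zoneId: ZoneId): Long = {
  val instant = LocalDate.ofEpochDay(days).atStartOfDay(zoneId).toInstant
  Math.addExact(Math.multiplyExact(instant.getEpochSecond, 1000000L), instant.getNano / 1000L)
}
{code}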



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28067) Incorrect results in decimal aggregation with whole-stage code gen enabled

2020-02-18 Thread Sunitha Kambhampati (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039403#comment-17039403
 ] 

Sunitha Kambhampati commented on SPARK-28067:
-

I looked into this issue and here are some of my notes. 

*Issue:*

Wrong results are returned for aggregate sum with decimals with whole stage 
codegen enabled 

*Repro:* 

Whole Stage codegen enabled -> Wrong results

Whole Stage codegen disabled -> Returns exception Decimal precision 39 exceeds 
max precision 38 

*Issues:* 

1: Wrong results are returned which is bad 

2: Inconsistency between whole stage enabled and disabled. 

 

*Cause:*

Sum does not take care of possibility of decimal overflow for the intermediate 
steps.  ie the updateExpressions and mergeExpressions.  

 

*Some ways to fix this:* 

+Approach 1:+ Do not return wrong results for this scenario; throw an exception 
(as already happens with whole-stage codegen disabled). DBs do similar, so 
there is precedent.

Pros: 

- No wrong results

- Consistent behavior between whole-stage enabled and disabled

- DBs have similar existing behavior, so there is precedent

 

+Approach 2:+ 

By default: return null on overflow in the sum operation.

But if spark.sql.ansi.enabled is set to true, then it will throw an exception 
(see the illustration after the cons below). 

 

Pros:

- Maybe OK for users who can tolerate the sum being null on overflow. 

- Consistent with the spark.sql.ansi.enabled behavior

 

Cons:

- This will still keep inconsistency between codegen enabled and disabled. 
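
For illustration, the flag in question is toggled like this; the overflow behavior described for Approach 2 is the proposal, not current behavior (an existing {{spark}} session is assumed):

{code:scala}
// Illustration of Approach 2's intended semantics (proposal, not current behavior):
spark.conf.set("spark.sql.ansi.enabled", "false") // proposed default: an overflowing decimal sum yields null
spark.conf.set("spark.sql.ansi.enabled", "true")  // proposed: an overflowing decimal sum throws instead
{code}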

 

For those interested, there are some JIRAs that were fixed for v3.0 which do 
the following: 
 * SPARK-23179, return null on overflow for decimal operations. This does not 
kick in for sum for the use case above. 
 * SPARK-28224, which took care of decimal overflow for sum only partially, for 
2 values. If you add another row to the dataset, it will return wrong results.

 --

That said, I think both Approach 1 and Approach 2 will resolve the wrong 
results issue. 

 

Approach 1 is straightforward. But in the PRs related to overflow, I think the 
preference is to have it under a spark.sql.ansi.enabled flag which defaults to 
false and to return null on overflow. 

Approach 2 is not as straightforward. I have an implementation that will fix 
this. 

I can open two PRs, one implementing each approach, and would like to get 
comments. I have run the sql, catalyst and hive tests and they all pass. 

Please let me know your comments.   Thanks. 

 cc [~dongjoon], [~LI,Xiao], [~cloud_fan] [~hyukjin.kwon] [~hvanhovell] 
[~javier_ivanov] [~msirek]

> Incorrect results in decimal aggregation with whole-stage code gen enabled
> --
>
> Key: SPARK-28067
> URL: https://issues.apache.org/jira/browse/SPARK-28067
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.4, 3.0.0
>Reporter: Mark Sirek
>Priority: Critical
>  Labels: correctness
>
> The following test case involving a join followed by a sum aggregation 
> returns the wrong answer for the sum:
>  
> {code:java}
> val df = Seq(
>  (BigDecimal("1000"), 1),
>  (BigDecimal("1000"), 1),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2)).toDF("decNum", "intNum")
> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, 
> "intNum").agg(sum("decNum"))
> scala> df2.show(40,false)
> +-----------+
> |sum(decNum)|
> +-----------+
> |    4000.00|
> +-----------+
> {code}
>  
> The result should be 104000..
> It appears a partial sum is computed for each join key, as the result 
> returned would be the answer for all rows matching intNum === 1.
> If only the rows with intNum === 2 are included, the answer given is null:
>  
> {code:java}
> scala> val df3 = df.filter($"intNum" === lit(2))
>  df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: 
> decimal(38,18), intNum: int]
> scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, 
> "intNum").agg(sum("decNum"))
>  df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
> scala> df4.show(40,false)
> +-----------+
> |sum(decNum)|
> +-----------+
> |       null|
> +-----------+
> {code}
>  
> The correct answer, 10.

[jira] [Commented] (SPARK-30858) IntegralDivide's dataType should not depend on SQLConf.get

2020-02-18 Thread Maxim Gekk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039410#comment-17039410
 ] 

Maxim Gekk commented on SPARK-30858:


> This is a problem because the configuration can change between different 
> phases of planning

[~hvanhovell] Is the code below right solution for the problem?
{code:scala}
case class IntegralDivide(
    left: Expression,
    right: Expression,
    integralDivideReturnLong: Boolean) extends DivModLike {

  def this(left: Expression, right: Expression) = {
    this(left, right, SQLConf.get.integralDivideReturnLong)
  }

  override def dataType: DataType = if (integralDivideReturnLong) {
    LongType
  } else {
    left.dataType
  }

  // ... remaining DivModLike members unchanged
}
{code}

> IntegralDivide's dataType should not depend on SQLConf.get
> --
>
> Key: SPARK-30858
> URL: https://issues.apache.org/jira/browse/SPARK-30858
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Herman van Hövell
>Priority: Blocker
>
> {{IntegralDivide}}'s dataType depends on the value of 
> {{SQLConf.get.integralDivideReturnLong}}. This is a problem because the 
> configuration can change between different phases of planning, and this can 
> silently break a query plan which can lead to crashes or data corruption. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30858) IntegralDivide's dataType should not depend on SQLConf.get

2020-02-18 Thread Jira


[ 
https://issues.apache.org/jira/browse/SPARK-30858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039414#comment-17039414
 ] 

Herman van Hövell commented on SPARK-30858:
---

That would work. I would somewhat prefer to fix this during analysis, instead 
of relying on SQLConf.get, for which we are sometimes not entirely sure where 
it comes from.

> IntegralDivide's dataType should not depend on SQLConf.get
> --
>
> Key: SPARK-30858
> URL: https://issues.apache.org/jira/browse/SPARK-30858
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Herman van Hövell
>Priority: Blocker
>
> {{IntegralDivide}}'s dataType depends on the value of 
> {{SQLConf.get.integralDivideReturnLong}}. This is a problem because the 
> configuration can change between different phases of planning, and this can 
> silently break a query plan which can lead to crashes or data corruption. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-29663) Support sum with interval type values

2020-02-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/SPARK-29663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hövell reopened SPARK-29663:
---

This was reverted in 
[https://github.com/apache/spark/commit/1b67d546bd96412943de0c7a3b4295cbde887bd2]

> Support sum with interval type values
> -
>
> Key: SPARK-29663
> URL: https://issues.apache.org/jira/browse/SPARK-29663
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
> Fix For: 3.0.0
>
>
> {code:sql}
> postgres=# SELECT i, Sum(cast(v as interval)) OVER (ORDER BY i ROWS BETWEEN 
> CURRENT ROW AND UNBOUNDED FOLLOWING) FROM (VALUES(1,'1 sec'),(2,'2 
> sec'),(3,NULL),(4,NULL)) t(i,v); 
>  i |   sum
> ---+----------
>  1 | 00:00:03
>  2 | 00:00:02
>  3 | 
>  4 | 
> (4 rows)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-29663) Support sum with interval type values

2020-02-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/SPARK-29663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hövell reassigned SPARK-29663:
-

Assignee: (was: Kent Yao)

> Support sum with interval type values
> -
>
> Key: SPARK-29663
> URL: https://issues.apache.org/jira/browse/SPARK-29663
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Kent Yao
>Priority: Major
> Fix For: 3.0.0
>
>
> {code:sql}
> postgres=# SELECT i, Sum(cast(v as interval)) OVER (ORDER BY i ROWS BETWEEN 
> CURRENT ROW AND UNBOUNDED FOLLOWING) FROM (VALUES(1,'1 sec'),(2,'2 
> sec'),(3,NULL),(4,NULL)) t(i,v); 
>  i |   sum
> ---+----------
>  1 | 00:00:03
>  2 | 00:00:02
>  3 | 
>  4 | 
> (4 rows)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-29688) Support average with interval type values

2020-02-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/SPARK-29688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hövell reopened SPARK-29688:
---
  Assignee: (was: Kent Yao)

This was reverted in 
[https://github.com/apache/spark/commit/1b67d546bd96412943de0c7a3b4295cbde887bd2]

> Support average with interval type values
> -
>
> Key: SPARK-29688
> URL: https://issues.apache.org/jira/browse/SPARK-29688
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Kent Yao
>Priority: Major
> Fix For: 3.0.0
>
>
> Add average aggregate support for Spark.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30858) IntegralDivide's dataType should not depend on SQLConf.get

2020-02-18 Thread Maxim Gekk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039433#comment-17039433
 ] 

Maxim Gekk commented on SPARK-30858:


The *div* function binds on this particular expressions 
[https://github.com/apache/spark/blob/919d551ddbf7575abe7fe47d4bbba62164d6d845/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L282]
 . I am not sure that we can replace it during analysis.

> IntegralDivide's dataType should not depend on SQLConf.get
> --
>
> Key: SPARK-30858
> URL: https://issues.apache.org/jira/browse/SPARK-30858
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Herman van Hövell
>Priority: Blocker
>
> {{IntegralDivide}}'s dataType depends on the value of 
> {{SQLConf.get.integralDivideReturnLong}}. This is a problem because the 
> configuration can change between different phases of planning, and this can 
> silently break a query plan which can lead to crashes or data corruption. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-30858) IntegralDivide's dataType should not depend on SQLConf.get

2020-02-18 Thread Maxim Gekk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039433#comment-17039433
 ] 

Maxim Gekk edited comment on SPARK-30858 at 2/18/20 8:29 PM:
-

The *div* function binds on this particular expression 
[https://github.com/apache/spark/blob/919d551ddbf7575abe7fe47d4bbba62164d6d845/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L282]
 . I am not sure that we can replace it during analysis.


was (Author: maxgekk):
The *div* function binds on this particular expressions 
[https://github.com/apache/spark/blob/919d551ddbf7575abe7fe47d4bbba62164d6d845/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L282]
 . I am not sure that we can replace it during analysis.

> IntegralDivide's dataType should not depend on SQLConf.get
> --
>
> Key: SPARK-30858
> URL: https://issues.apache.org/jira/browse/SPARK-30858
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Herman van Hövell
>Priority: Blocker
>
> {{IntegralDivide}}'s dataType depends on the value of 
> {{SQLConf.get.integralDivideReturnLong}}. This is a problem because the 
> configuration can change between different phases of planning, and this can 
> silently break a query plan which can lead to crashes or data corruption. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24497) ANSI SQL: Recursive query

2020-02-18 Thread Daniel Mateus Pires (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039498#comment-17039498
 ] 

Daniel Mateus Pires commented on SPARK-24497:
-

Hey! The PR linked to this issue has merge conflicts and reviewers didn't come 
back to it for another round of reviews. Just wanted to flag on this thread 
that this feature would be very useful :+1:

> ANSI SQL: Recursive query
> -
>
> Key: SPARK-24497
> URL: https://issues.apache.org/jira/browse/SPARK-24497
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> h3. *Examples*
> Here is an example for {{WITH RECURSIVE}} clause usage. Table "department" 
> represents the structure of an organization as an adjacency list.
> {code:sql}
> CREATE TABLE department (
> id INTEGER PRIMARY KEY,  -- department ID
> parent_department INTEGER REFERENCES department, -- upper department ID
> name TEXT -- department name
> );
> INSERT INTO department (id, parent_department, "name")
> VALUES
>  (0, NULL, 'ROOT'),
>  (1, 0, 'A'),
>  (2, 1, 'B'),
>  (3, 2, 'C'),
>  (4, 2, 'D'),
>  (5, 0, 'E'),
>  (6, 4, 'F'),
>  (7, 5, 'G');
> -- department structure represented here is as follows:
> --
> -- ROOT-+->A-+->B-+->C
> --      |    |
> --      |    +->D-+->F
> --      +->E-+->G
> {code}
>  
>  To extract all departments under A, you can use the following recursive 
> query:
> {code:sql}
> WITH RECURSIVE subdepartment AS
> (
> -- non-recursive term
> SELECT * FROM department WHERE name = 'A'
> UNION ALL
> -- recursive term
> SELECT d.*
> FROM
> department AS d
> JOIN
> subdepartment AS sd
> ON (d.parent_department = sd.id)
> )
> SELECT *
> FROM subdepartment
> ORDER BY name;
> {code}
> More details:
> [http://wiki.postgresql.org/wiki/CTEReadme]
> [https://info.teradata.com/htmlpubs/DB_TTU_16_00/index.html#page/SQL_Reference/B035-1141-160K/lqe1472241402390.html]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30870) catalyst inception of lateral view explode with struct raise a Catalyst error

2020-02-18 Thread Thomas Prelle (Jira)
Thomas Prelle created SPARK-30870:
-

 Summary: catalyst inception of lateral view explode with struct 
raise a Catalyst error
 Key: SPARK-30870
 URL: https://issues.apache.org/jira/browse/SPARK-30870
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 3.0.0
Reporter: Thomas Prelle


On Spark 3.0.0-preview2 I found a bug that is not present in 3.0.0-preview.

With the following table:
{code:java}
spark.sql("select * from tmp").printSchema
root
 |-- value: struct (nullable = true)
 | |-- array: array (nullable = true)
 | | |-- element: struct (containsNull = true)
 | | | |-- subarray: array (nullable = true)
 | | | | |-- element: struct (containsNull = true)
 | | | | | |-- key1: string (nullable = true)
 | | | | | |-- key2: string (nullable = true)
{code}
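Below is an illustrative way to build a table with the same nested shape; the data values are made up and are not from the original job.
{code:scala}
// Illustrative setup only: value.array[].subarray[].{key1,key2}, matching the
// printSchema output above. Run in spark-shell (spark.implicits._ is in scope).
case class Leaf(key1: String, key2: String)
case class Mid(subarray: Seq[Leaf])
case class Top(array: Seq[Mid])

Seq(Tuple1(Top(Seq(Mid(Seq(Leaf("a", "b")))))))
  .toDF("value")
  .createOrReplaceTempView("tmp")
{code}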
When you try a double lateral view explode:
{code:java}
spark.sql("select subexplod.* from tmp lateral view explode(tmp.value.array) 
explod as array_explod  lateral view explode(explod.array_explod.subarray) 
subexplod").show()
{code}
It raises the following error:


{code:java}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
attribute, tree: _gen_alias_127#127
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
  at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
  at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
  at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:96)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:96)
  at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
  at 
org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
  at 
org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
  at 
org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
  at 
org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
  at 
org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
  at 
org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
  at 
org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
  at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:212)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:209)
  at 
org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
  at 
org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
  at 
org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
  at 
org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:51)
  at 
org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
  at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:212)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:209)
  at 
org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scal

[jira] [Resolved] (SPARK-30811) CTE that refers to non-existent table with same name causes StackOverflowError

2020-02-18 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-30811.
---
Fix Version/s: 2.4.6
   Resolution: Fixed

Issue resolved by pull request 27562
[https://github.com/apache/spark/pull/27562]

> CTE that refers to non-existent table with same name causes StackOverflowError
> --
>
> Key: SPARK-30811
> URL: https://issues.apache.org/jira/browse/SPARK-30811
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.5
>Reporter: Herman van Hövell
>Assignee: Herman van Hövell
>Priority: Major
> Fix For: 2.4.6
>
>
> The following query causes a StackOverflowError:
> {noformat}
> WITH t AS (SELECT 1 FROM nonexist.t) SELECT * FROM t
> {noformat}
> This only happens when the CTE refers to a non-existent table with the same 
> name and a database qualifier. This is caused by a couple of things:
>  * {{CTESubstitution}} runs analysis on the CTE, but this does not throw an 
> exception because the table has a database qualifier. The reason we don't fail 
> is that we re-attempt to resolve the relation in a later rule.
>  * {{CTESubstitution}}'s replace logic does not check whether the table it is 
> replacing has a database qualifier; it shouldn't replace the relation if it 
> does. So we happily replace {{nonexist.t}} with {{t}}.
>  * {{CTESubstitution}} transforms down, this means it will keep replacing 
> {{t}} with itself, creating an infinite recursion.
> This is not an issue for master/3.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26675) Error happened during creating avro files

2020-02-18 Thread Thomas Humphries (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-26675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039508#comment-17039508
 ] 

Thomas Humphries commented on SPARK-26675:
--

*bump 2020* - issue still occurs with Spark 2.4.3 and Avro 2.12, using any of 
PySpark, Scala jar (2.12.8), or Java jar (1.8).  Any use of 

{code:java}
df.write.format("avro")
{code}
results in:

{code:java}
Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.avro.Schema.createUnion([Lorg/apache/avro/Schema;)Lorg/apache/avro/Schema;
at 
org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:185)
{code}
... etc as per OP.
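As a side note (illustration only, not from the original report): this NoSuchMethodError typically means an older Avro jar (e.g. a 1.7.x jar pulled in by a Hadoop distribution) is shadowing the Avro 1.8+ jar that spark-avro compiles against, since the varargs Schema.createUnion(Schema...) overload is the newer one. A quick way to see which jar actually wins on the driver classpath:
{code:scala}
// Prints the jar that provides org.apache.avro.Schema at runtime.
println(classOf[org.apache.avro.Schema]
  .getProtectionDomain.getCodeSource.getLocation)
{code}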

> Error happened during creating avro files
> -
>
> Key: SPARK-26675
> URL: https://issues.apache.org/jira/browse/SPARK-26675
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.4.0
>Reporter: Tony Mao
>Priority: Major
>
> Run cmd
> {code:java}
> spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 
> /nke/reformat.py
> {code}
> code in reformat.py
> {code:java}
> df = spark.read.option("multiline", "true").json("file:///nke/example1.json")
> df.createOrReplaceTempView("traffic")
> a = spark.sql("""SELECT store.*, store.name as store_name, 
> store.dataSupplierId as store_dataSupplierId, trafficSensor.*,
> trafficSensor.name as trafficSensor_name, trafficSensor.dataSupplierId as 
> trafficSensor_dataSupplierId, readings.*
> FROM (SELECT explode(stores) as store, explode(store.trafficSensors) as 
> trafficSensor,
> explode(trafficSensor.trafficSensorReadings) as readings FROM traffic)""")
> b = a.drop("trafficSensors", "trafficSensorReadings", "name", 
> "dataSupplierId")
> b.write.format("avro").save("file:///nke/curated/namesAndFavColors.avro")
> {code}
> Error message:
> {code:java}
> Traceback (most recent call last):
> File "/nke/reformat.py", line 18, in 
> b.select("store_name", 
> "store_dataSupplierId").write.format("avro").save("file:///nke/curated/namesAndFavColors.avro")
> File "/usr/spark-2.4.0/python/lib/pyspark.zip/pyspark/sql/readwriter.py", 
> line 736, in save
> File "/usr/spark-2.4.0/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", 
> line 1257, in __call__
> File "/usr/spark-2.4.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, 
> in deco
> File "/usr/spark-2.4.0/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 
> 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o45.save.
> : java.lang.NoSuchMethodError: 
> org.apache.avro.Schema.createUnion([Lorg/apache/avro/Schema;)Lorg/apache/avro/Schema;
> at 
> org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:185)
> at 
> org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:176)
> at 
> org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:174)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
> at 
> org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:174)
> at 
> org.apache.spark.sql.avro.AvroFileFormat$$anonfun$7.apply(AvroFileFormat.scala:118)
> at 
> org.apache.spark.sql.avro.AvroFileFormat$$anonfun$7.apply(AvroFileFormat.scala:118)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.avro.AvroFileFormat.prepareWrite(AvroFileFormat.scala:118)
> at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:103)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at 
> org.apache.spark.sql.exe

[jira] [Commented] (SPARK-28067) Incorrect results in decimal aggregation with whole-stage code gen enabled

2020-02-18 Thread Sunitha Kambhampati (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039516#comment-17039516
 ] 

Sunitha Kambhampati commented on SPARK-28067:
-

I have submitted the two PRs for the two approaches I mentioned above in my 
comment.

Approach 1:  Throw exception instead of returning wrong results: 
[https://github.com/apache/spark/pull/27629]

 

Approach 2:

Return null on decimal overflow for aggregate sum when 
spark.sql.ansi.enabled=false; throw an exception on decimal overflow for 
aggregate sum when spark.sql.ansi.enabled=true.

[https://github.com/apache/spark/pull/27627] (WIP)
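For readers following along, here is a rough illustration of how the two behaviors would surface to users, assuming the ANSI flag gates the overflow handling as described above; this is not code from either PR.
{code:scala}
// Illustration only: 12 values near the decimal(38,18) limit overflow when summed.
import org.apache.spark.sql.functions.sum
val df = Seq.fill(12)(BigDecimal("10000000000000000000")).toDF("decNum")

spark.conf.set("spark.sql.ansi.enabled", "false")
df.agg(sum("decNum")).show()   // approach 2: NULL on overflow instead of a wrong value

spark.conf.set("spark.sql.ansi.enabled", "true")
df.agg(sum("decNum")).show()   // approach 1 (and approach 2 with ANSI on): throw on overflow
{code}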

> Incorrect results in decimal aggregation with whole-stage code gen enabled
> --
>
> Key: SPARK-28067
> URL: https://issues.apache.org/jira/browse/SPARK-28067
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.4, 3.0.0
>Reporter: Mark Sirek
>Priority: Critical
>  Labels: correctness
>
> The following test case involving a join followed by a sum aggregation 
> returns the wrong answer for the sum:
>  
> {code:java}
> val df = Seq(
>  (BigDecimal("1000"), 1),
>  (BigDecimal("1000"), 1),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2),
>  (BigDecimal("1000"), 2)).toDF("decNum", "intNum")
> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, 
> "intNum").agg(sum("decNum"))
> scala> df2.show(40,false)
>  ---
> sum(decNum)
> ---
> 4000.00
> ---
>  
> {code}
>  
> The result should be 104000..
> It appears a partial sum is computed for each join key, as the result 
> returned would be the answer for all rows matching intNum === 1.
> If only the rows with intNum === 2 are included, the answer given is null:
>  
> {code:java}
> scala> val df3 = df.filter($"intNum" === lit(2))
>  df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [decNum: 
> decimal(38,18), intNum: int]
> scala> val df4 = df3.withColumnRenamed("decNum", "decNum2").join(df3, 
> "intNum").agg(sum("decNum"))
>  df4: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
> scala> df4.show(40,false)
>  ---
> sum(decNum)
> ---
> null
> ---
>  
> {code}
>  
> The correct answer, 10., doesn't fit in 
> the DataType picked for the result, decimal(38,18), so an overflow occurs, 
> which Spark then converts to null.
> The first example, which doesn't filter out the intNum === 1 values should 
> also return null, indicating overflow, but it doesn't.  This may mislead the 
> user to think a valid sum was computed.
> If whole-stage code gen is turned off:
> spark.conf.set("spark.sql.codegen.wholeStage", false)
> ... incorrect results are not returned because the overflow is caught as an 
> exception:
> java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 
> exceeds max precision 38
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30871) Protobuf vulnerability

2020-02-18 Thread Florencia Puppo (Jira)
Florencia Puppo created SPARK-30871:
---

 Summary: Protobuf vulnerability
 Key: SPARK-30871
 URL: https://issues.apache.org/jira/browse/SPARK-30871
 Project: Spark
  Issue Type: Dependency upgrade
  Components: Build
Affects Versions: 2.4.5
Reporter: Florencia Puppo


 Protobuf version 2.5.0 is vulnerable to Integer Overflow by allowing remote 
authenticated attackers to cause a heap-based buffer overflow in serialisation 
process. 
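A possible downstream mitigation while waiting for an upgrade in Spark itself (this is a sketch of a typical sbt setup, not an official recommendation; forcing a newer protobuf-java may not be binary compatible with Spark 2.4's Hadoop dependencies, so shading is often the safer route):
{code:scala}
// build.sbt fragment (illustrative): force a patched protobuf-java on the
// application classpath; verify runtime compatibility before shipping.
dependencyOverrides += "com.google.protobuf" % "protobuf-java" % "3.11.4"
{code}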



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Sunitha Kambhampati (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039556#comment-17039556
 ] 

Sunitha Kambhampati commented on SPARK-27213:
-

I tried it now on master, synced up to the Feb 11, 2020 commit 
45db48e2d29359591a4ebc3db4625dd2158e446e, and I see the same behavior as 
mentioned in the description.

 

{code:java}
>>> df = 
>>> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
>>> df.show(5)
+---+---+----+---+
|  x|  y|   z|y_n|
+---+---+----+---+
|  a|123|12.3|  n|
|  a|123|12.3|  y|
|  a|123|12.4|  y|
+---+---+----+---+

>>> df.filter("y_n='y'").select('x','y','z').distinct().show()
+---+---+----+
|  x|  y|   z|
+---+---+----+
|  a|123|12.3|
|  a|123|12.4|
+---+---+----+

>>> df.select('x','y','z').distinct().filter("y_n='y'").show()
+---+---++
|  x|  y|   z|
+---+---++
|  a|123|12.4|
+---+---++
{code}
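The same thing can be reproduced from Scala; this is an illustrative translation of the PySpark repro above, not code from the original report.
{code:scala}
// In spark-shell (spark.implicits._ is in scope): the second query silently
// resolves y_n through the project below the distinct instead of failing analysis.
val df = Seq(("a", "123", "12.3", "n"), ("a", "123", "12.3", "y"), ("a", "123", "12.4", "y"))
  .toDF("x", "y", "z", "y_n")

df.filter("y_n = 'y'").select("x", "y", "z").distinct().show()  // 2 rows
df.select("x", "y", "z").distinct().filter("y_n = 'y'").show()  // 1 row, unexpectedly
{code}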

 

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Blocker
>  Labels: correctness, distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down in catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. But the 
> catalyst optimizer is using first() on column y_n and then applying the 
> filter.
> Even if the filter was pushed down, the result would have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#

[jira] [Commented] (SPARK-24894) Invalid DNS name due to hostname truncation

2020-02-18 Thread Thi Nguyen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039580#comment-17039580
 ] 

Thi Nguyen commented on SPARK-24894:


Why is the Fix Version 3.0.0? This looks like a bug fix to me, so shouldn't it 
have been included in a patch version?

> Invalid DNS name due to hostname truncation 
> 
>
> Key: SPARK-24894
> URL: https://issues.apache.org/jira/browse/SPARK-24894
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.1
>Reporter: Dharmesh Kakadia
>Assignee: Marcelo Masiero Vanzin
>Priority: Major
> Fix For: 3.0.0
>
>
> The truncation for hostname happening here 
> [https://github.com/apache/spark/blob/5ff1b9ba1983d5601add62aef64a3e87d07050eb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala#L77]
>   is problematic and can lead to DNS names starting with "-". 
> Originally filled here : 
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/229
> ```
> {{2018-07-23 21:21:42 ERROR Utils:91 - Uncaught exception in thread 
> kubernetes-pod-allocator 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> POST at: https://kubernetes.default.svc/api/v1/namespaces/default/pods. 
> Message: Pod 
> "user-archetypes-all-weekly-1532380861251850404-1532380862321-exec-9" is 
> invalid: spec.hostname: Invalid value: 
> "-archetypes-all-weekly-1532380861251850404-1532380862321-exec-9": a DNS-1123 
> label must consist of lower case alphanumeric characters or '-', and must 
> start and end with an alphanumeric character (e.g. 'my-name', or '123-abc', 
> regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'). Received 
> status: Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=[StatusCause(field=spec.hostname, 
> message=Invalid value: 
> "-archetypes-all-weekly-1532380861251850404-1532380862321-exec-9": a DNS-1123 
> label must consist of lower case alphanumeric characters or '-', and must 
> start and end with an alphanumeric character (e.g. 'my-name', or '123-abc', 
> regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), 
> reason=FieldValueInvalid, additionalProperties={})], group=null, kind=Pod, 
> name=user-archetypes-all-weekly-1532380861251850404-1532380862321-exec-9, 
> retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
> message=Pod 
> "user-archetypes-all-weekly-1532380861251850404-1532380862321-exec-9" is 
> invalid: spec.hostname: Invalid value: 
> "-archetypes-all-weekly-1532380861251850404-1532380862321-exec-9": a DNS-1123 
> label must consist of lower case alphanumeric characters or '-', and must 
> start and end with an alphanumeric character (e.g. 'my-name', or '123-abc', 
> regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?'), 
> metadata=ListMeta(resourceVersion=null, selfLink=null, 
> additionalProperties={}), reason=Invalid, status=Failure, 
> additionalProperties={}). at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:470)
>  at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:409)
>  at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:379)
>  at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:343)
>  at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:226)
>  at 
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:769)
>  at 
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:356)
>  at 
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend$$anon$1$$anonfun$3$$anonfun$apply$3.apply(KubernetesClusterSchedulerBackend.scala:140)
>  at 
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend$$anon$1$$anonfun$3$$anonfun$apply$3.apply(KubernetesClusterSchedulerBackend.scala:140)
>  at org.apache.spark.util.Utils$.tryLog(Utils.scala:1922) at 
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend$$anon$1$$anonfun$3.apply(KubernetesClusterSchedulerBackend.scala:139)
>  at 
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend$$anon$1$$anonfun$3.apply(KubernetesClusterSchedulerBackend.scala:138)
>  at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>  at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) 
> at 
> scal

[jira] [Commented] (SPARK-30869) Convert dates to/from timestamps in microseconds precision

2020-02-18 Thread YoungGyu Chun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039671#comment-17039671
 ] 

YoungGyu Chun commented on SPARK-30869:
---

I am working on this
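For context, here is a rough sketch of what microsecond-based helpers could look like; the names follow the ticket quoted below, but the bodies are illustrative and are not the eventual DateTimeUtils implementation.
{code:scala}
// Illustrative only: convert between days since epoch and microseconds since epoch
// using the Java 8 time API, with no intermediate millisecond step.
import java.time.{Instant, LocalDate, ZoneId}

def microsToDays(micros: Long, zoneId: ZoneId): Int = {
  val instant = Instant.ofEpochSecond(
    Math.floorDiv(micros, 1000000L), Math.floorMod(micros, 1000000L) * 1000L)
  instant.atZone(zoneId).toLocalDate.toEpochDay.toInt
}

def daysToMicros(days: Int, zoneId: ZoneId): Long = {
  val instant = LocalDate.ofEpochDay(days).atStartOfDay(zoneId).toInstant
  instant.getEpochSecond * 1000000L + instant.getNano / 1000L
}
{code}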

> Convert dates to/from timestamps in microseconds precision
> --
>
> Key: SPARK-30869
> URL: https://issues.apache.org/jira/browse/SPARK-30869
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Priority: Minor
>
> Currently, Spark converts dates to/from "timestamp" in millisecond precision 
> but internally Catalyst's TimestampType values are stored as microseconds 
> since epoch. When such conversion is needed in other date-timestamp functions 
> like DateTimeUtils.monthsBetween, the function has to convert microseconds to 
> milliseconds and then to days, see 
> https://github.com/apache/spark/blob/06217cfded8d32962e7c54c315f8e684eb9f0999/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L577-L580
>  which just brings additional overhead w/o any benefits.
> In earlier versions this made sense because milliseconds could be passed to 
> TimeZone.getOffset, but Spark has since switched to the Java 8 time API and 
> ZoneId, so supporting conversions to milliseconds is not needed any more.
> The ticket aims to replace millisToDays by microsToDays, and daysToMillis by 
> daysToMicros in DateTimeUtils.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30868) Throw Exception if runHive(sql) failed

2020-02-18 Thread Ankit Raj Boudh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039694#comment-17039694
 ] 

Ankit Raj Boudh commented on SPARK-30868:
-

I am working on this.

> Throw Exception if runHive(sql) failed
> --
>
> Key: SPARK-30868
> URL: https://issues.apache.org/jira/browse/SPARK-30868
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Jackey Lee
>Priority: Major
>
> At present, HiveClientImpl.runHive does not throw an exception when it fails, 
> which means it cannot report error information properly.
> Example
> {code:scala}
> spark.sql("add jar file:///tmp/test.jar").show()
> spark.sql("show databases").show()
> {code}
> /tmp/test.jar doesn't exist, so the add jar command fails. However, this code 
> runs to completion without causing an application failure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26974:
-
Target Version/s:   (was: 3.0.0)

> Invalid data in grouped cached dataset, formed by joining a large cached 
> dataset with a small dataset
> -
>
> Key: SPARK-26974
> URL: https://issues.apache.org/jira/browse/SPARK-26974
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, Spark Core, SQL
>Affects Versions: 2.2.0
>Reporter: Utkarsh Sharma
>Priority: Major
>  Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from hive tables using the spark.table() 
> functions.
> Dataset descriptions:
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and 
> sample rows) : 
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>  
> +*Customer*+ Dataset (close to 5 rows) with the following columns (and 
> sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>  
> I am doing the following steps:
>  # Caching sales dataset with close to 10 billion rows.
>  # Doing an inner join of 'sales' with 'customer' dataset
>  
>  # Doing group by on the resultant dataset, based on CustomerGrpNbr column to 
> get sum(qty_sold) and stddev(qty_sold) values in the customer groups.
>  # Caching the resultant grouped dataset.
>  # Doing a .count() on the grouped dataset.
> The step 5 count is supposed to return only 20, because when you do a 
> customer.select("CustomerGroupNbr").distinct().count you get 20 values. 
> However, you get a value of around 65,000 in step 5.
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k rows and the count keeps on varying 
> each run
> customer.select("CustomerGrpNbr").distinct().count() //returns 20{code}
> I have been able to replicate the same behavior using the Java API as well. 
> This anomalous behavior disappears, however, when I remove the caching 
> statements, i.e. if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold")) 
> finalDf.count() // returns 20 
> customer.select("CustomerGrpNbr").distinct().count() //returns 20
> {code}
> The tables in hive from which the datasets are built do not change during 
> this entire process. So why does the caching cause this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset

2020-02-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039702#comment-17039702
 ] 

Hyukjin Kwon commented on SPARK-26974:
--

Please avoid setting Critical+, which is reserved for committers. I am resolving 
this as it targets an EOL release and there has been no feedback from the reporter.

> Invalid data in grouped cached dataset, formed by joining a large cached 
> dataset with a small dataset
> -
>
> Key: SPARK-26974
> URL: https://issues.apache.org/jira/browse/SPARK-26974
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, Spark Core, SQL
>Affects Versions: 2.2.0
>Reporter: Utkarsh Sharma
>Priority: Blocker
>  Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from hive tables using the spark.table() 
> functions.
> Dataset descriptions:
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and 
> sample rows) : 
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>  
> +*Customer*+ Dataset (close to 5 rows) with the following columns (and 
> sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>  
> I am doing the following steps:
>  # Caching sales dataset with close to 10 billion rows.
>  # Doing an inner join of 'sales' with 'customer' dataset
>  
>  # Doing group by on the resultant dataset, based on CustomerGrpNbr column to 
> get sum(qty_sold) and stddev(qty_sold) values in the customer groups.
>  # Caching the resultant grouped dataset.
>  # Doing a .count() on the grouped dataset.
> The step 5 count is supposed to return only 20, because when you do a 
> customer.select("CustomerGroupNbr").distinct().count you get 20 values. 
> However, you get a value of around 65,000 in step 5.
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k rows and the count keeps on varying 
> each run
> customer.select("CustomerGrpNbr").distinct().count() //returns 20{code}
> I have been able to replicate the same behavior using the Java API as well. 
> This anomalous behavior disappears, however, when I remove the caching 
> statements, i.e. if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold")) 
> finalDf.count() // returns 20 
> customer.select("CustomerGrpNbr").distinct().count() //returns 20
> {code}
> The tables in hive from which the datasets are built do not change during 
> this entire process. So why does the caching cause this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26974:
-
Priority: Major  (was: Blocker)

> Invalid data in grouped cached dataset, formed by joining a large cached 
> dataset with a small dataset
> -
>
> Key: SPARK-26974
> URL: https://issues.apache.org/jira/browse/SPARK-26974
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, Spark Core, SQL
>Affects Versions: 2.2.0
>Reporter: Utkarsh Sharma
>Priority: Major
>  Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from hive tables using the spark.table() 
> functions.
> Dataset descriptions:
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and 
> sample rows) : 
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>  
> +*Customer*+ Dataset (close to 5 rows) with the following columns (and 
> sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>  
> I am doing the following steps:
>  # Caching sales dataset with close to 10 billion rows.
>  # Doing an inner join of 'sales' with 'customer' dataset
>  
>  # Doing group by on the resultant dataset, based on CustomerGrpNbr column to 
> get sum(qty_sold) and stddev(qty_sold) values in the customer groups.
>  # Caching the resultant grouped dataset.
>  # Doing a .count() on the grouped dataset.
> The step 5 count is supposed to return only 20, because when you do a 
> customer.select("CustomerGroupNbr").distinct().count you get 20 values. 
> However, you get a value of around 65,000 in step 5.
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k rows and the count keeps on varying 
> each run
> customer.select("CustomerGrpNbr").distinct().count() //returns 20{code}
> I have been able to replicate the same behavior using the Java API as well. 
> This anomalous behavior disappears, however, when I remove the caching 
> statements, i.e. if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold")) 
> finalDf.count() // returns 20 
> customer.select("CustomerGrpNbr").distinct().count() //returns 20
> {code}
> The tables in hive from which the datasets are built do not change during 
> this entire process. So why does the caching cause this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26974) Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26974.
--
Resolution: Incomplete

> Invalid data in grouped cached dataset, formed by joining a large cached 
> dataset with a small dataset
> -
>
> Key: SPARK-26974
> URL: https://issues.apache.org/jira/browse/SPARK-26974
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, Spark Core, SQL
>Affects Versions: 2.2.0
>Reporter: Utkarsh Sharma
>Priority: Major
>  Labels: caching, data-corruption, data-integrity
>
> The initial datasets are derived from hive tables using the spark.table() 
> functions.
> Dataset descriptions:
> *+Sales+* dataset (close to 10 billion rows) with the following columns (and 
> sample rows) : 
> ||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
> |1|1|20|
> |1|2|30|
> |2|1|40|
>  
> +*Customer*+ Dataset (close to 5 rows) with the following columns (and 
> sample rows):
> ||CustomerId (bigint)||CustomerGrpNbr (smallint)||
> |1|1|
> |2|2|
> |3|1|
>  
> I am doing the following steps:
>  # Caching sales dataset with close to 10 billion rows.
>  # Doing an inner join of 'sales' with 'customer' dataset
>  
>  # Doing group by on the resultant dataset, based on CustomerGrpNbr column to 
> get sum(qty_sold) and stddev(qty_sold) values in the customer groups.
>  # Caching the resultant grouped dataset.
>  # Doing a .count() on the grouped dataset.
> The step 5 count is supposed to return only 20, because when you do a 
> customer.select("CustomerGroupNbr").distinct().count you get 20 values. 
> However, you get a value of around 65,000 in step 5.
> Following are the commands I am running in spark-shell:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold"))
> sales.cache()
> finalDf.cache()
> finalDf.count() // returns around 65k rows and the count keeps on varying 
> each run
> customer.select("CustomerGrpNbr").distinct().count() //returns 20{code}
> I have been able to replicate the same behavior using the Java API as well. 
> This anomalous behavior disappears, however, when I remove the caching 
> statements, i.e. if I run the following in spark-shell, it works as expected:
> {code:java}
> var sales = spark.table("sales_table")
> var customer = spark.table("customer_table")
> var finalDf = sales.join(customer, 
> "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), 
> stddev("qty_sold")) 
> finalDf.count() // returns 20 
> customer.select("CustomerGrpNbr").distinct().count() //returns 20
> {code}
> The tables in hive from which the datasets are built do not change during 
> this entire process. So why does the caching cause this problem?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-28883) Fix a flaky test: ThriftServerQueryTestSuite

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-28883.
--
Resolution: Duplicate

> Fix a flaky test: ThriftServerQueryTestSuite
> 
>
> Key: SPARK-28883
> URL: https://issues.apache.org/jira/browse/SPARK-28883
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Blocker
>
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/109764/testReport/org.apache.spark.sql.hive.thriftserver/ThriftServerQueryTestSuite/
>  (2 failures)
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/109768/testReport/org.apache.spark.sql.hive.thriftserver/ThriftServerQueryTestSuite/
>  (4 failures)
> Error message:
> {noformat}
> java.sql.SQLException: Could not open client transport with JDBC Uri: 
> jdbc:hive2://localhost:14431: java.net.ConnectException: Connection refused 
> (Connection refused)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30814) Add Columns references should be able to resolve each other

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-30814.
-
Fix Version/s: 3.0.0
 Assignee: Terry Kim
   Resolution: Fixed

> Add Columns references should be able to resolve each other
> ---
>
> Key: SPARK-30814
> URL: https://issues.apache.org/jira/browse/SPARK-30814
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Burak Yavuz
>Assignee: Terry Kim
>Priority: Major
> Fix For: 3.0.0
>
>
> In ResolveAlterTableChanges, we have checks that make sure that positional 
> arguments exist and are normalized around case sensitivity for ALTER TABLE 
> ADD COLUMNS. However, we missed the case where a column in ADD COLUMNS can 
> depend on the position of a column that is just being added.
> For example for the schema:
> {code:java}
> root:
>   - a: string
>   - b: long
>  {code}
>  
> The following should work:
> {code:java}
> ALTER TABLE ... ADD COLUMNS (x int AFTER a, y int AFTER x) {code}
> Currently, the above statement will throw an error saying that AFTER x cannot 
> be resolved, because x doesn't exist yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30872) Constraints inferred from inferred attributes

2020-02-18 Thread Yuming Wang (Jira)
Yuming Wang created SPARK-30872:
---

 Summary: Constraints inferred from inferred attributes
 Key: SPARK-30872
 URL: https://issues.apache.org/jira/browse/SPARK-30872
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30872) Constraints inferred from inferred attributes

2020-02-18 Thread Yuming Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-30872:

Description: 

{code:scala}
scala> spark.range(20).selectExpr("id as a", "id as b", "id as 
c").write.saveAsTable("t1")

scala> spark.sql("select count(*) from t1 where a = b and b = c and (c = 3 or c 
= 13)").explain(false)
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#76]
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
  +- *(1) Project
 +- *(1) Filter (((isnotnull(c#36L) AND ((b#35L = 3) OR (b#35L = 
13))) AND isnotnull(b#35L)) AND (a#34L = c#36L)) AND isnotnull(a#34L)) AND 
(a#34L = b#35L)) AND (b#35L = c#36L)) AND ((c#36L = 3) OR (c#36L = 13)))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t1[a#34L,b#35L,c#36L] Batched: true, 
DataFilters: [isnotnull(c#36L), ((b#35L = 3) OR (b#35L = 13)), 
isnotnull(b#35L), (a#34L = c#36L), isnotnull(a#..., Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous...,
 PartitionFilters: [], PushedFilters: [IsNotNull(c), 
Or(EqualTo(b,3),EqualTo(b,13)), IsNotNull(b), IsNotNull(a), 
Or(EqualTo(c,3),EqualT..., ReadSchema: struct
{code}
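As an observation aid (not a proposed fix), the inference that produces these duplicated predicates can be switched off to compare plans:
{code:scala}
// With constraint propagation disabled the optimizer skips this inference, so the
// extra predicates on b and c should no longer appear in the Filter.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.sql("select count(*) from t1 where a = b and b = c and (c = 3 or c = 13)").explain(false)
{code}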


> Constraints inferred from inferred attributes
> -
>
> Key: SPARK-30872
> URL: https://issues.apache.org/jira/browse/SPARK-30872
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> {code:scala}
> scala> spark.range(20).selectExpr("id as a", "id as b", "id as 
> c").write.saveAsTable("t1")
> scala> spark.sql("select count(*) from t1 where a = b and b = c and (c = 3 or 
> c = 13)").explain(false)
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition, true, [id=#76]
>+- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
>   +- *(1) Project
>  +- *(1) Filter (((isnotnull(c#36L) AND ((b#35L = 3) OR (b#35L = 
> 13))) AND isnotnull(b#35L)) AND (a#34L = c#36L)) AND isnotnull(a#34L)) AND 
> (a#34L = b#35L)) AND (b#35L = c#36L)) AND ((c#36L = 3) OR (c#36L = 13)))
> +- *(1) ColumnarToRow
>+- FileScan parquet default.t1[a#34L,b#35L,c#36L] Batched: 
> true, DataFilters: [isnotnull(c#36L), ((b#35L = 3) OR (b#35L = 13)), 
> isnotnull(b#35L), (a#34L = c#36L), isnotnull(a#..., Format: Parquet, 
> Location: 
> InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(c), 
> Or(EqualTo(b,3),EqualTo(b,13)), IsNotNull(b), IsNotNull(a), 
> Or(EqualTo(c,3),EqualT..., ReadSchema: struct
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30619) org.slf4j.Logger and org.apache.commons.collections classes not built as part of hadoop-provided profile

2020-02-18 Thread Abhishek Rao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039728#comment-17039728
 ] 

Abhishek Rao commented on SPARK-30619:
--

[~hyukjin.kwon] Were you able to reproduce this using the above procedure?

> org.slf4j.Logger and org.apache.commons.collections classes not built as part 
> of hadoop-provided profile
> 
>
> Key: SPARK-30619
> URL: https://issues.apache.org/jira/browse/SPARK-30619
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.4.2, 2.4.4
> Environment: Spark on kubernetes
>Reporter: Abhishek Rao
>Priority: Major
>
> We're using spark-2.4.4-bin-without-hadoop.tgz and executing Java Word count 
> (org.apache.spark.examples.JavaWordCount) example on local files.
> But we're seeing that it is expecting org.slf4j.Logger and 
> org.apache.commons.collections classes to be available for executing this.
> We expected the binary to work as it is for local files. Is there anything 
> which we're missing?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30852) Use Long instead of Int as argument type in Dataset limit method

2020-02-18 Thread Damianos Christophides (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039730#comment-17039730
 ] 

Damianos Christophides commented on SPARK-30852:


Thanks [~rakson] and [~cloud_fan] for looking into it. I understand the 
limitations.

I'm not sure how to mark it as "won't fix"; I can't see the option.

> Use Long instead of Int as argument type in Dataset limit method
> 
>
> Key: SPARK-30852
> URL: https://issues.apache.org/jira/browse/SPARK-30852
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: Damianos Christophides
>Priority: Minor
>
> The Dataset limit method takes an input of type Int, which is a 32-bit 
> integer. The numerical upper limit of this type is 2,147,483,647. In my work 
> I needed to apply a limit to a Dataset higher than that, which gives an 
> error:
> "py4j.Py4JException: Method limit([class java.lang.Long]) does not exist"
>  
> Could the input type of the limit method be changed to a Long (64-bit)?
>  
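Until the API changes, one possible caller-side workaround is to cap rows with a Long index instead of {{limit}}. A minimal sketch, assuming an active {{spark}} session and an existing DataFrame {{df}} (both illustrative names); {{limitLong}} is a hypothetical helper, not a Spark API:

{code:scala}
// Sketch only: emulate a limit larger than Int.MaxValue with a Long row index.
// `limitLong` is a hypothetical helper; `spark` and `df` are assumed to exist.
import org.apache.spark.sql.{DataFrame, SparkSession}

def limitLong(spark: SparkSession, df: DataFrame, n: Long): DataFrame = {
  // zipWithIndex assigns a 0-based Long index per row, so the cap is not bound to Int
  val capped = df.rdd.zipWithIndex.filter { case (_, idx) => idx < n }.map(_._1)
  spark.createDataFrame(capped, df.schema)
}
{code}

Note this runs an extra job for zipWithIndex, so it is a stopgap rather than a replacement for a native Long-based limit.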



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30868) Throw Exception if runHive(sql) failed

2020-02-18 Thread Ankit Raj Boudh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039749#comment-17039749
 ] 

Ankit Raj Boudh commented on SPARK-30868:
-

[~Jackey Lee], do you mean that after ADD JAR fails, the .show() statement 
should also throw an exception?

> Throw Exception if runHive(sql) failed
> --
>
> Key: SPARK-30868
> URL: https://issues.apache.org/jira/browse/SPARK-30868
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Jackey Lee
>Priority: Major
>
> At present, HiveClientImpl.runHive does not throw an exception when it runs 
> incorrectly, which means it cannot report error information properly.
> Example
> {code:scala}
> spark.sql("add jar file:///tmp/test.jar").show()
> spark.sql("show databases").show()
> {code}
> /tmp/test.jar does not exist, so ADD JAR fails. However, this code runs to 
> completion without causing an application failure.
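Until runHive surfaces the failure, a caller-side guard is one way to avoid the silent no-op. A minimal sketch, assuming an active {{spark}} session; the jar path is the illustrative one from the example above:

{code:scala}
// Sketch only: fail fast before ADD JAR instead of relying on runHive to throw.
// Assumes `spark` exists; the path is illustrative.
import java.nio.file.{Files, Paths}

val jarPath = "/tmp/test.jar"
require(Files.exists(Paths.get(jarPath)), s"jar not found: $jarPath")
spark.sql(s"add jar file://$jarPath")
spark.sql("show databases").show()
{code}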



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30785) Create table like should keep tracksPartitionsInCatalog same with source table

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-30785:
---

Assignee: Lantao Jin

> Create table like should keep tracksPartitionsInCatalog same with source table
> --
>
> Key: SPARK-30785
> URL: https://issues.apache.org/jira/browse/SPARK-30785
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Lantao Jin
>Assignee: Lantao Jin
>Priority: Major
>
> A table generated by CREATE TABLE LIKE (CTL) on a partitioned table is itself 
> a partitioned table. But when ALTER TABLE ADD PARTITION is run on it, it 
> throws AnalysisException: ALTER TABLE ADD PARTITION is not allowed. That's 
> because the value of {{tracksPartitionsInCatalog}} produced by CTL is always 
> {{false}}.
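For reference, a minimal repro sketch of the described behaviour. Table names and DDL are illustrative and may differ from the reporter's setup:

{code:scala}
// Sketch only: CTL from a partitioned source table, then try to add a partition.
// Assumes an active `spark` session; `src` and `dst` are illustrative names.
spark.sql("CREATE TABLE src (id INT, p INT) USING parquet PARTITIONED BY (p)")
spark.sql("CREATE TABLE dst LIKE src")
// Reported to fail before the fix with:
//   AnalysisException: ALTER TABLE ADD PARTITION is not allowed
spark.sql("ALTER TABLE dst ADD PARTITION (p = 1)")
{code}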



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30785) Create table like should keep tracksPartitionsInCatalog same with source table

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-30785.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 27538
[https://github.com/apache/spark/pull/27538]

> Create table like should keep tracksPartitionsInCatalog same with source table
> --
>
> Key: SPARK-30785
> URL: https://issues.apache.org/jira/browse/SPARK-30785
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Lantao Jin
>Assignee: Lantao Jin
>Priority: Major
> Fix For: 3.0.0
>
>
> A table generated by CREATE TABLE LIKE (CTL) on a partitioned table is itself 
> a partitioned table. But when ALTER TABLE ADD PARTITION is run on it, it 
> throws AnalysisException: ALTER TABLE ADD PARTITION is not allowed. That's 
> because the value of {{tracksPartitionsInCatalog}} produced by CTL is always 
> {{false}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039761#comment-17039761
 ] 

Hyukjin Kwon commented on SPARK-27213:
--

Does this case make sense?

{code}
df.select('x', 'y', 'z').distinct().filter("y_n='y'").show() 
{code}

{code}
>>> df.select('x', 'y', 'z').distinct()
DataFrame[x: string, y: string, z: string]
{code}

There is no {{y_n}} field in this DataFrame so I think it's rather a bug that 
this case works.

{code}
df.filter("y_n='y'").select('x', 'y', 'z').distinct().show()
{code}

This case is fine because {{df}} has a {{y_n}} field:

{code}
>>> df
DataFrame[x: string, y: string, z: string, y_n: string]
{code}

If you select {{y_n}} too, it shows the results correctly:

{code}
df.select('x' ,'y', 'z', 'y_n').distinct().filter("y_n='y'").show()
+---+---++---+
|  x|  y|   z|y_n|
+---+---++---+
|  a|123|12.3|  y|
|  a|123|12.4|  y|
+---+---++---+
{code}

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Blocker
>  Labels: correctness, distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77,

[jira] [Commented] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039763#comment-17039763
 ] 

Hyukjin Kwon commented on SPARK-27213:
--

I am lowering the priority to {{Critical}}.

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Blocker
>  Labels: correctness, distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27213:
-
Priority: Critical  (was: Blocker)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Critical
>  Labels: correctness, distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, first#95, valueSet#96|

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27213:
-
Labels: distinct filter  (was: correctness distinct filter)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Critical
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, first#95, 

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27213:
-
Priority: Major  (was: Critical)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Major
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, first#95, valueSet#96|#74, y#75, z#76, f

[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27213:
-
Target Version/s:   (was: 3.0.0)

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Major
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[partial_first(y_n#77, false)|#77, false)], output=[x#74, y#75, 
> z#76, first#95, valueSet#96|#74, y#75, z#76, f

[jira] [Comment Edited] (SPARK-27213) Unexpected results when filter is used after distinct

2020-02-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039763#comment-17039763
 ] 

Hyukjin Kwon edited comment on SPARK-27213 at 2/19/20 7:33 AM:
---

I am lowering the priority to Major.


was (Author: hyukjin.kwon):
I am lowering the priority to {{Critical}}.

> Unexpected results when filter is used after distinct
> -
>
> Key: SPARK-27213
> URL: https://issues.apache.org/jira/browse/SPARK-27213
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Rinaz Belhaj
>Priority: Critical
>  Labels: distinct, filter
>
> The following code gives unexpected output due to the filter not getting 
> pushed down by the Catalyst optimizer.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.show(5)
> df.filter("y_n='y'").select('x','y','z').distinct().show()
> df.select('x','y','z').distinct().filter("y_n='y'").show()
> {code}
> {panel:title=Output}
> |x|y|z|y_n|
> |a|123|12.3|n|
> |a|123|12.3|y|
> |a|123|12.4|y|
>  
> |x|y|z|
> |a|123|12.3|
> |a|123|12.4|
>  
> |x|y|z|
> |a|123|12.4|
> {panel}
> Ideally, the second statement should result in an error since the column used 
> in the filter is not present in the preceding select statement. Instead, the 
> Catalyst optimizer applies first() to column y_n and then applies the 
> filter.
> Had the filter been pushed down, the result would at least have been accurate.
> {code:java}
> df = 
> spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
> df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
> df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
> {code}
> {panel:title=Output}
>  
>  == Parsed Logical Plan ==
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- AnalysisBarrier
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76|#74, y#75, 
> z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(2) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
>  +- *(1) HashAggregate(keys=[x#74, y#75, z#76|#74, y#75, z#76], functions=[], 
> output=[x#74, y#75, z#76|#74, y#75, z#76])
>  +- *(1) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77|#74,y#75,z#76,y_n#77]
>   
>  
> ---
>  
>   
>  == Parsed Logical Plan ==
>  'Filter ('y_n = y)
>  +- AnalysisBarrier
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Analyzed Logical Plan ==
>  x: string, y: string, z: string
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (y_n#77 = y)
>  +- Deduplicate [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Project [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Optimized Logical Plan ==
>  Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- Aggregate [x#74, y#75, z#76|#74, y#75, z#76], [x#74, y#75, z#76, 
> first(y_n#77, false) AS y_n#77|#74, y#75, z#76, first(y_n#77, false) AS 
> y_n#77]
>  +- LogicalRDD [x#74, y#75, z#76, y_n#77|#74, y#75, z#76, y_n#77], false
>   
>  == Physical Plan ==
>  *(3) Project [x#74, y#75, z#76|#74, y#75, z#76]
>  +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
>  +- SortAggregate(key=[x#74, y#75, z#76|#74, y#75, z#76], 
> functions=[first(y_n#77, false)|#77, false)], output=[x#74, y#75, z#76, 
> y_n#77|#74, y#75, z#76, y_n#77])
>  +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS 
> FIRST|#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], 
> false, 0
>  +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
> 

[jira] [Updated] (SPARK-27282) Spark incorrect results when using UNION with GROUP BY clause

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27282:
-
Target Version/s:   (was: 3.0.0)

> Spark incorrect results when using UNION with GROUP BY clause
> -
>
> Key: SPARK-27282
> URL: https://issues.apache.org/jira/browse/SPARK-27282
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, Spark Submit, SQL
>Affects Versions: 2.3.2
> Environment: I'm using :
> IntelliJ  IDEA ==> 2018.1.4
> spark-sql and spark-core ==> 2.3.2.3.1.0.0-78 (for HDP 3.1)
> scala ==> 2.11.8
>Reporter: Sofia
>Priority: Blocker
>
> When using a UNION clause after a GROUP BY clause in Spark, the results 
> obtained are wrong.
> The following example illustrates this issue:
> {code:java}
> CREATE TABLE test_un (
> col1 varchar(255),
> col2 varchar(255),
> col3 varchar(255),
> col4 varchar(255)
> );
> INSERT INTO test_un (col1, col2, col3, col4)
> VALUES (1,1,2,4),
> (1,1,2,4),
> (1,1,3,5),
> (2,2,2,null);
> {code}
> I used the following code :
> {code:java}
> val x = Toolkit.HiveToolkit.getDataFromHive("test","test_un")
> val  y = x
>.filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col3")
>   .agg(count(col("col3")).alias("cnt"))
>   .withColumn("col_name", lit("col3"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col3").alias("col_value"), col("cnt"))
> val z = x
>   .filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col4")
>   .agg(count(col("col4")).alias("cnt"))
>   .withColumn("col_name", lit("col4"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col4").alias("col_value"), col("cnt"))
> y.union(z).show()
> {code}
>  And I obtained the following results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|5|1|
> |1|1|col3|4|2|
> |1|1|col4|5|1|
> |1|1|col4|4|2|
> Expected results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|3|1|
> |1|1|col3|2|2|
> |1|1|col4|4|2|
> |1|1|col4|5|1|
> But when I remove the last row of the table, I obtain the correct results.
> {code:java}
> (2,2,2,null){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27282) Spark incorrect results when using UNION with GROUP BY clause

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27282:
-
Priority: Major  (was: Blocker)

> Spark incorrect results when using UNION with GROUP BY clause
> -
>
> Key: SPARK-27282
> URL: https://issues.apache.org/jira/browse/SPARK-27282
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, Spark Submit, SQL
>Affects Versions: 2.3.2
> Environment: I'm using :
> IntelliJ  IDEA ==> 2018.1.4
> spark-sql and spark-core ==> 2.3.2.3.1.0.0-78 (for HDP 3.1)
> scala ==> 2.11.8
>Reporter: Sofia
>Priority: Major
>
> When using a UNION clause after a GROUP BY clause in Spark, the results 
> obtained are wrong.
> The following example illustrates this issue:
> {code:java}
> CREATE TABLE test_un (
> col1 varchar(255),
> col2 varchar(255),
> col3 varchar(255),
> col4 varchar(255)
> );
> INSERT INTO test_un (col1, col2, col3, col4)
> VALUES (1,1,2,4),
> (1,1,2,4),
> (1,1,3,5),
> (2,2,2,null);
> {code}
> I used the following code :
> {code:java}
> val x = Toolkit.HiveToolkit.getDataFromHive("test","test_un")
> val  y = x
>.filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col3")
>   .agg(count(col("col3")).alias("cnt"))
>   .withColumn("col_name", lit("col3"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col3").alias("col_value"), col("cnt"))
> val z = x
>   .filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col4")
>   .agg(count(col("col4")).alias("cnt"))
>   .withColumn("col_name", lit("col4"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col4").alias("col_value"), col("cnt"))
> y.union(z).show()
> {code}
>  And I obtained the following results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|5|1|
> |1|1|col3|4|2|
> |1|1|col4|5|1|
> |1|1|col4|4|2|
> Expected results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|3|1|
> |1|1|col3|2|2|
> |1|1|col4|4|2|
> |1|1|col4|5|1|
> But when I remove the last row of the table, I obtain the correct results.
> {code:java}
> (2,2,2,null){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27282) Spark incorrect results when using UNION with GROUP BY clause

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27282.
--
Resolution: Incomplete

> Spark incorrect results when using UNION with GROUP BY clause
> -
>
> Key: SPARK-27282
> URL: https://issues.apache.org/jira/browse/SPARK-27282
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, Spark Submit, SQL
>Affects Versions: 2.3.2
> Environment: I'm using :
> IntelliJ  IDEA ==> 2018.1.4
> spark-sql and spark-core ==> 2.3.2.3.1.0.0-78 (for HDP 3.1)
> scala ==> 2.11.8
>Reporter: Sofia
>Priority: Major
>
> When using a UNION clause after a GROUP BY clause in Spark, the results 
> obtained are wrong.
> The following example illustrates this issue:
> {code:java}
> CREATE TABLE test_un (
> col1 varchar(255),
> col2 varchar(255),
> col3 varchar(255),
> col4 varchar(255)
> );
> INSERT INTO test_un (col1, col2, col3, col4)
> VALUES (1,1,2,4),
> (1,1,2,4),
> (1,1,3,5),
> (2,2,2,null);
> {code}
> I used the following code :
> {code:java}
> val x = Toolkit.HiveToolkit.getDataFromHive("test","test_un")
> val  y = x
>.filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col3")
>   .agg(count(col("col3")).alias("cnt"))
>   .withColumn("col_name", lit("col3"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col3").alias("col_value"), col("cnt"))
> val z = x
>   .filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col4")
>   .agg(count(col("col4")).alias("cnt"))
>   .withColumn("col_name", lit("col4"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col4").alias("col_value"), col("cnt"))
> y.union(z).show()
> {code}
>  And I obtained the following results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|5|1|
> |1|1|col3|4|2|
> |1|1|col4|5|1|
> |1|1|col4|4|2|
> Expected results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|3|1|
> |1|1|col3|2|2|
> |1|1|col4|4|2|
> |1|1|col4|5|1|
> But when I remove the last row of the table, I obtain the correct results.
> {code:java}
> (2,2,2,null){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27282) Spark incorrect results when using UNION with GROUP BY clause

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27282:
-
Labels:   (was: correctness)

> Spark incorrect results when using UNION with GROUP BY clause
> -
>
> Key: SPARK-27282
> URL: https://issues.apache.org/jira/browse/SPARK-27282
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, Spark Submit, SQL
>Affects Versions: 2.3.2
> Environment: I'm using :
> IntelliJ  IDEA ==> 2018.1.4
> spark-sql and spark-core ==> 2.3.2.3.1.0.0-78 (for HDP 3.1)
> scala ==> 2.11.8
>Reporter: Sofia
>Priority: Blocker
>
> When using a UNION clause after a GROUP BY clause in Spark, the results 
> obtained are wrong.
> The following example illustrates this issue:
> {code:java}
> CREATE TABLE test_un (
> col1 varchar(255),
> col2 varchar(255),
> col3 varchar(255),
> col4 varchar(255)
> );
> INSERT INTO test_un (col1, col2, col3, col4)
> VALUES (1,1,2,4),
> (1,1,2,4),
> (1,1,3,5),
> (2,2,2,null);
> {code}
> I used the following code :
> {code:java}
> val x = Toolkit.HiveToolkit.getDataFromHive("test","test_un")
> val  y = x
>.filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col3")
>   .agg(count(col("col3")).alias("cnt"))
>   .withColumn("col_name", lit("col3"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col3").alias("col_value"), col("cnt"))
> val z = x
>   .filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col4")
>   .agg(count(col("col4")).alias("cnt"))
>   .withColumn("col_name", lit("col4"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col4").alias("col_value"), col("cnt"))
> y.union(z).show()
> {code}
>  And I obtained the following results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|5|1|
> |1|1|col3|4|2|
> |1|1|col4|5|1|
> |1|1|col4|4|2|
> Expected results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|3|1|
> |1|1|col3|2|2|
> |1|1|col4|4|2|
> |1|1|col4|5|1|
> But when I remove the last row of the table, I obtain the correct results.
> {code:java}
> (2,2,2,null){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27282) Spark incorrect results when using UNION with GROUP BY clause

2020-02-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039769#comment-17039769
 ] 

Hyukjin Kwon commented on SPARK-27282:
--

Spark 2.3.x is EOL. Let's reopen when we can confirm this issue persists in 
Spark 2.4.x+.

> Spark incorrect results when using UNION with GROUP BY clause
> -
>
> Key: SPARK-27282
> URL: https://issues.apache.org/jira/browse/SPARK-27282
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, Spark Submit, SQL
>Affects Versions: 2.3.2
> Environment: I'm using :
> IntelliJ  IDEA ==> 2018.1.4
> spark-sql and spark-core ==> 2.3.2.3.1.0.0-78 (for HDP 3.1)
> scala ==> 2.11.8
>Reporter: Sofia
>Priority: Major
>
> When using a UNION clause after a GROUP BY clause in Spark, the results 
> obtained are wrong.
> The following example illustrates this issue:
> {code:java}
> CREATE TABLE test_un (
> col1 varchar(255),
> col2 varchar(255),
> col3 varchar(255),
> col4 varchar(255)
> );
> INSERT INTO test_un (col1, col2, col3, col4)
> VALUES (1,1,2,4),
> (1,1,2,4),
> (1,1,3,5),
> (2,2,2,null);
> {code}
> I used the following code :
> {code:java}
> val x = Toolkit.HiveToolkit.getDataFromHive("test","test_un")
> val  y = x
>.filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col3")
>   .agg(count(col("col3")).alias("cnt"))
>   .withColumn("col_name", lit("col3"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col3").alias("col_value"), col("cnt"))
> val z = x
>   .filter(col("col4")isNotNull)
>   .groupBy("col1", "col2","col4")
>   .agg(count(col("col4")).alias("cnt"))
>   .withColumn("col_name", lit("col4"))
>   .select(col("col1"), col("col2"), 
> col("col_name"),col("col4").alias("col_value"), col("cnt"))
> y.union(z).show()
> {code}
>  And I obtained the following results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|5|1|
> |1|1|col3|4|2|
> |1|1|col4|5|1|
> |1|1|col4|4|2|
> Expected results:
> ||col1||col2||col_name||col_value||cnt||
> |1|1|col3|3|1|
> |1|1|col3|2|2|
> |1|1|col4|4|2|
> |1|1|col4|5|1|
> But when I remove the last row of the table, I obtain the correct results.
> {code:java}
> (2,2,2,null){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27784) Alias ID reuse can break correctness when substituting foldable expressions

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27784.
--
Resolution: Cannot Reproduce

I am resolving this because the reported issue can no longer be reproduced, per 
"Contributing to JIRA Maintenance" in 
https://spark.apache.org/contributing.html. It would be best to identify which 
JIRA fixed it.

(Also, we're resolving JIRAs whose affected versions are EOL.)

> Alias ID reuse can break correctness when substituting foldable expressions
> ---
>
> Key: SPARK-27784
> URL: https://issues.apache.org/jira/browse/SPARK-27784
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1, 2.3.2
>Reporter: Ryan Blue
>Priority: Blocker
>  Labels: correctness
>
> This is a correctness bug when reusing a set of project expressions in the 
> DataFrame API.
> Use case: a user was migrating a table to a new version with an additional 
> column ("data" in the repro case). To migrate, the user unions the old table 
> ("t2") with the new table ("t1"), and applies a common set of projections to 
> ensure the union doesn't hit an issue with ordering (SPARK-22335). In some 
> cases, this produces an incorrect query plan:
> {code:java}
> Seq((4, "a"), (5, "b"), (6, "c")).toDF("id", "data").write.saveAsTable("t1")
> Seq(1, 2, 3).toDF("id").write.saveAsTable("t2")
> val dim = Seq(2, 3, 4).toDF("id")
> val outputCols = Seq($"id", coalesce($"data", lit("_")).as("data"))
> val t1 = spark.table("t1").select(outputCols:_*)
> val t2 = spark.table("t2").withColumn("data", lit(null)).select(outputCols:_*)
> t1.join(dim, t1("id") === dim("id")).select(t1("id"), 
> t1("data")).union(t2).explain(true){code}
> {code:java}
> == Physical Plan ==
> Union
> :- *Project [id#330, _ AS data#237] < THE CONSTANT IS 
> FROM THE OTHER SIDE OF THE UNION
> : +- *BroadcastHashJoin [id#330], [id#234], Inner, BuildRight
> : :- *Project [id#330]
> : :  +- *Filter isnotnull(id#330)
> : : +- *FileScan parquet t1[id#330] Batched: true, Format: Parquet, 
> Location: CatalogFileIndex[s3://.../t1], PartitionFilters: [], PushedFilters: 
> [IsNotNull(id)], ReadSchema: struct
> : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
> false] as bigint)))
> :+- LocalTableScan [id#234]
> +- *Project [id#340, _ AS data#237]
>+- *FileScan parquet t2[id#340] Batched: true, Format: Parquet, Location: 
> CatalogFileIndex[s3://.../t2], PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct{code}
> The problem happens because "outputCols" has an alias. The ID for that alias 
> is created when the projection Seq is created, so it is reused in both sides 
> of the union.
> When FoldablePropagation runs, it identifies that "data" in the t2 side of 
> the union is a foldable expression and replaces all references to it, 
> including the references in the t1 side of the union.
> The join to a dimension table is necessary to reproduce the problem because 
> it requires a Projection on top of the join that uses an AttributeReference 
> for data#237. Otherwise, the projections are collapsed and the projection 
> includes an Alias that does not get rewritten by FoldablePropagation.
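If that diagnosis is right, one possible caller-side workaround is to rebuild the projection list per branch so each side of the union gets its own Alias (and exprId). A sketch only, reusing the names from the repro above:

{code:scala}
// Sketch only: regenerate outputCols for each branch instead of sharing one Seq,
// so the "data" alias does not carry the same expression ID on both sides.
// Assumes the same `spark`, `t1`, `t2` and `dim` setup as the repro above.
import org.apache.spark.sql.functions.{coalesce, col, lit}

def outputCols() = Seq(col("id"), coalesce(col("data"), lit("_")).as("data"))

val left  = spark.table("t1").select(outputCols(): _*)
val right = spark.table("t2").withColumn("data", lit(null)).select(outputCols(): _*)
left.join(dim, left("id") === dim("id")).select(left("id"), left("data")).union(right).explain(true)
{code}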



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28024) Incorrect numeric values when out of range

2020-02-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039775#comment-17039775
 ] 

Hyukjin Kwon commented on SPARK-28024:
--

Case 1 is fixed. Cases 2, 3, and 4 do not appear to be fixed.

> Incorrect numeric values when out of range
> --
>
> Key: SPARK-28024
> URL: https://issues.apache.org/jira/browse/SPARK-28024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.4, 3.0.0
>Reporter: Yuming Wang
>Priority: Blocker
>  Labels: correctness
> Attachments: SPARK-28024.png
>
>
> For example
> Case 1:
> {code:sql}
> select tinyint(128) * tinyint(2); -- 0
> select smallint(2147483647) * smallint(2); -- -2
> select int(2147483647) * int(2); -- -2
> SELECT smallint((-32768)) * smallint(-1); -- -32768
> {code}
> Case 2:
> {code:sql}
> spark-sql> select cast('10e-70' as float), cast('-10e-70' as float);
> 0.0   -0.0
> {code}
> Case 3:
> {code:sql}
> spark-sql> select cast('10e-400' as double), cast('-10e-400' as double);
> 0.0   -0.0
> {code}
> Case 4:
> {code:sql}
> spark-sql> select exp(-1.2345678901234E200);
> 0.0
> postgres=# select exp(-1.2345678901234E200);
> ERROR:  value overflows numeric format
> {code}
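For anyone re-checking, a minimal sketch that reruns the four cases above on a given build (assumes an active {{spark}} session):

{code:scala}
// Sketch only: rerun the description's cases to see which still reproduce.
Seq(
  "select int(2147483647) * int(2)",                               // case 1
  "select cast('10e-70' as float), cast('-10e-70' as float)",      // case 2
  "select cast('10e-400' as double), cast('-10e-400' as double)",  // case 3
  "select exp(-1.2345678901234E200)"                               // case 4
).foreach(q => spark.sql(q).show(false))
{code}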



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28125) dataframes created by randomSplit have overlapping rows

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-28125:
-
Labels:   (was: correctness data-corruption data-integrity data-loss)

> dataframes created by randomSplit have overlapping rows
> ---
>
> Key: SPARK-28125
> URL: https://issues.apache.org/jira/browse/SPARK-28125
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib, PySpark, Spark Core
>Affects Versions: 2.4.3
> Environment: Run with Databricks Runtime 5.3 ML (includes Apache 
> Spark 2.4.0, Scala 2.11)
>  
> More details on the environment: 
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html]
> The python package versions: 
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html#python-libraries]
>Reporter: Zachary
>Priority: Blocker
>
> It appears that the function randomSplit on a DataFrame creates a separate 
> execution plan for each of the result DataFrames, or at least that's the 
> impression I get from reading a few StackOverflow pages on it: 
> [https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit/38380023#38380023]
> [https://stackoverflow.com/questions/32933143/how-does-sparks-rdd-randomsplit-actually-split-the-rdd/32933366]
>  
> Because of the separate executions, it is easy to create a situation where 
> the DataFrames returned by randomSplit have overlapping rows. If people rely on 
> it to split a dataset into training and test sets, they can easily end up with 
> the same rows in both, which is a serious problem when running model 
> evaluation. 
>  
> I know that if you call .cache() on the RDD before calling .randomSplit then 
> you can be assured that the returned frames have unique rows, but this 
> work-around is definitely not obvious. I did not know about this issue and 
> ended up creating improper data sets when doing model training and 
> evaluation. Something should be adjusted in .randomSplit so that under all 
> circumstances, the returned Dataframes will have unique rows. 
>  
> Here is a Pyspark script I wrote that re-creates the issue and includes the 
> work-around line that fixes it as a temporary workaround: 
>  
> {code:python}
> import numpy as np
> from pyspark.sql import Row
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
> N = 10
> ratio1 = 0.85
> ratio2 = 0.15
> gen_rand = udf(lambda x: int(np.random.random()*5 + 2), IntegerType())
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> dfA = dfA.select("ID2").distinct()
> dfA_els = dfA.rdd.map(lambda x: x['ID2']).collect()
> print("This confirms that if you look at the parent Dataframe, the ID2 col 
> has unqiue values")
> print("Num rows parent DF: {}".format(len(dfA_els)))
> print("num unique ID2 vals: {}".format(len(set(dfA_els
> #dfA = dfA.cache() #Uncommenting this line does fix the issue
> df1, df2 = dfA.randomSplit([ratio2, ratio1])
> df1_ids = set(df1.rdd.map(lambda x: x['ID2']).distinct().collect())
> df2_ids = set(df2.rdd.map(lambda x: x['ID2']).distinct().collect())
> num_inter = len(df1_ids.intersection(df2_ids))
> print()
> print("Number common IDs between the two splits: {}".format(num_inter))
> print("(should be zero if randomSplit is working as expected)")
> {code}
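
Beyond .cache(), a sketch of a more explicit workaround (a sketch only, reusing 
dfA from the script above; the seed, threshold, and "_split" column name are 
arbitrary choices): materialize the random draw once, then derive both splits 
from the same persisted values so they are disjoint by construction.

{code:python}
# Alternative workaround sketch: compute the split column once, persist it,
# and filter both splits from the same materialized values.
from pyspark.sql import functions as F

with_rand = dfA.withColumn("_split", F.rand(seed=42)).persist()
df1 = with_rand.filter(F.col("_split") <  0.15).drop("_split")   # ~15% split
df2 = with_rand.filter(F.col("_split") >= 0.15).drop("_split")   # ~85% split
{code}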






[jira] [Updated] (SPARK-28125) dataframes created by randomSplit have overlapping rows

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-28125:
-
Target Version/s:   (was: 3.0.0)

> dataframes created by randomSplit have overlapping rows
> ---
>
> Key: SPARK-28125
> URL: https://issues.apache.org/jira/browse/SPARK-28125
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib, PySpark, Spark Core
>Affects Versions: 2.4.3
> Environment: Run with Databricks Runtime 5.3 ML (includes Apache 
> Spark 2.4.0, Scala 2.11)
>  
> More details on the environment: 
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html]
> The python package versions: 
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html#python-libraries]
>Reporter: Zachary
>Priority: Blocker
>
> It appears that the function randomSplit on a DataFrame creates a separate 
> execution plan for each of the result DataFrames, or at least that's the 
> impression I get from reading a few StackOverflow pages on it: 
> [https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit/38380023#38380023]
> [https://stackoverflow.com/questions/32933143/how-does-sparks-rdd-randomsplit-actually-split-the-rdd/32933366]
>  
> Because of the separate executions, it is easy to create a situation where 
> the DataFrames returned by randomSplit have overlapping rows. If people rely on 
> it to split a dataset into training and test sets, they can easily end up with 
> the same rows in both, which is a serious problem when running model 
> evaluation. 
>  
> I know that if you call .cache() on the RDD before calling .randomSplit then 
> you can be assured that the returned frames have unique rows, but this 
> work-around is definitely not obvious. I did not know about this issue and 
> ended up creating improper data sets when doing model training and 
> evaluation. Something should be adjusted in .randomSplit so that under all 
> circumstances, the returned Dataframes will have unique rows. 
>  
> Here is a Pyspark script I wrote that re-creates the issue and includes the 
> work-around line that fixes it as a temporary workaround: 
>  
> {code:python}
> import numpy as np
> from pyspark.sql import Row
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
> N = 10
> ratio1 = 0.85
> ratio2 = 0.15
> gen_rand = udf(lambda x: int(np.random.random()*5 + 2), IntegerType())
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> dfA = dfA.select("ID2").distinct()
> dfA_els = dfA.rdd.map(lambda x: x['ID2']).collect()
> print("This confirms that if you look at the parent Dataframe, the ID2 col 
> has unqiue values")
> print("Num rows parent DF: {}".format(len(dfA_els)))
> print("num unique ID2 vals: {}".format(len(set(dfA_els
> #dfA = dfA.cache() #Uncommenting this line does fix the issue
> df1, df2 = dfA.randomSplit([ratio2, ratio1])
> df1_ids = set(df1.rdd.map(lambda x: x['ID2']).distinct().collect())
> df2_ids = set(df2.rdd.map(lambda x: x['ID2']).distinct().collect())
> num_inter = len(df1_ids.intersection(df2_ids))
> print()
> print("Number common IDs between the two splits: {}".format(num_inter))
> print("(should be zero if randomSplit is working as expected)")
> {code}






[jira] [Updated] (SPARK-28125) dataframes created by randomSplit have overlapping rows

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-28125:
-
Priority: Major  (was: Blocker)

> dataframes created by randomSplit have overlapping rows
> ---
>
> Key: SPARK-28125
> URL: https://issues.apache.org/jira/browse/SPARK-28125
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib, PySpark, Spark Core
>Affects Versions: 2.4.3
> Environment: Run with Databricks Runtime 5.3 ML (includes Apache 
> Spark 2.4.0, Scala 2.11)
>  
> More details on the environment: 
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html]
> The python package versions: 
> [https://docs.databricks.com/release-notes/runtime/5.3ml.html#python-libraries]
>Reporter: Zachary
>Priority: Major
>
> It appears that the function randomSplit on a DataFrame creates a separate 
> execution plan for each of the result DataFrames, or at least that's the 
> impression I get from reading a few StackOverflow pages on it: 
> [https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit/38380023#38380023]
> [https://stackoverflow.com/questions/32933143/how-does-sparks-rdd-randomsplit-actually-split-the-rdd/32933366]
>  
> Because of the separate executions, it is easy to create a situation where 
> the DataFrames returned by randomSplit have overlapping rows. If people rely on 
> it to split a dataset into training and test sets, they can easily end up with 
> the same rows in both, which is a serious problem when running model 
> evaluation. 
>  
> I know that if you call .cache() on the RDD before calling .randomSplit then 
> you can be assured that the returned frames have unique rows, but this 
> work-around is definitely not obvious. I did not know about this issue and 
> ended up creating improper data sets when doing model training and 
> evaluation. Something should be adjusted in .randomSplit so that under all 
> circumstances, the returned Dataframes will have unique rows. 
>  
> Here is a Pyspark script I wrote that re-creates the issue and includes the 
> work-around line that fixes it as a temporary workaround: 
>  
> {code:python}
> import numpy as np
> from pyspark.sql import Row
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
> N = 10
> ratio1 = 0.85
> ratio2 = 0.15
> gen_rand = udf(lambda x: int(np.random.random()*5 + 2), IntegerType())
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> dfA = dfA.select("ID2").distinct()
> dfA_els = dfA.rdd.map(lambda x: x['ID2']).collect()
> print("This confirms that if you look at the parent Dataframe, the ID2 col 
> has unqiue values")
> print("Num rows parent DF: {}".format(len(dfA_els)))
> print("num unique ID2 vals: {}".format(len(set(dfA_els
> #dfA = dfA.cache() #Uncommenting this line does fix the issue
> df1, df2 = dfA.randomSplit([ratio2, ratio1])
> df1_ids = set(df1.rdd.map(lambda x: x['ID2']).distinct().collect())
> df2_ids = set(df2.rdd.map(lambda x: x['ID2']).distinct().collect())
> num_inter = len(df1_ids.intersection(df2_ids))
> print()
> print("Number common IDs between the two splits: {}".format(num_inter))
> print("(should be zero if randomSplit is working as expected)")
> {code}






[jira] [Resolved] (SPARK-28155) do not leak SaveMode to file source v2

2020-02-18 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-28155.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

This has already been done as part of a series of refactorings of file source v2.

> do not leak SaveMode to file source v2
> --
>
> Key: SPARK-28155
> URL: https://issues.apache.org/jira/browse/SPARK-28155
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Currently there is a hack in `DataFrameWriter`, which passes `SaveMode` to 
> file source v2. This should be removed and file source v2 should not accept 
> SaveMode.






[jira] [Updated] (SPARK-29103) CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-29103:
-
Priority: Major  (was: Blocker)

> CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity
> -
>
> Key: SPARK-29103
> URL: https://issues.apache.org/jira/browse/SPARK-29103
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jose Torres
>Priority: Major
>
> For each column referenced, we run
> ```val field = table.schema.findNestedField(fieldName, includeCollections = 
> true)```
> and fail analysis if the field isn't there. This check is always 
> case-sensitive on column names, even if the underlying catalog is case 
> insensitive, so it will sometimes throw on ALTER operations which the catalog 
> supports.
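
A hedged sketch of the user-visible symptom (the catalog name "testcat", the 
provider "foo", and the exact DDL are hypothetical; this assumes a DSv2 catalog 
is already registered under that name):

{code:python}
# Hypothetical reproduction sketch: with case-insensitive analysis enabled,
# an ALTER TABLE that refers to an existing column by a different case may
# still be rejected, because the check described above matches column names
# case-sensitively against the table schema.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.caseSensitive", "false")

spark.sql("CREATE TABLE testcat.ns.t (id INT, data STRING) USING foo")
spark.sql("ALTER TABLE testcat.ns.t ALTER COLUMN ID TYPE BIGINT")  # may fail analysis despite case-insensitive resolution
{code}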






[jira] [Updated] (SPARK-29103) CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-29103:
-
Priority: Blocker  (was: Major)

> CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity
> -
>
> Key: SPARK-29103
> URL: https://issues.apache.org/jira/browse/SPARK-29103
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jose Torres
>Priority: Blocker
>
> For each column referenced, we run
> ```val field = table.schema.findNestedField(fieldName, includeCollections = 
> true)```
> and fail analysis if the field isn't there. This check is always 
> case-sensitive on column names, even if the underlying catalog is case 
> insensitive, so it will sometimes throw on ALTER operations which the catalog 
> supports.






[jira] [Resolved] (SPARK-29103) CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity

2020-02-18 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-29103.
--
Resolution: Duplicate

Fixed in SPARK-30615

> CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity
> -
>
> Key: SPARK-29103
> URL: https://issues.apache.org/jira/browse/SPARK-29103
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jose Torres
>Priority: Blocker
>
> For each column referenced, we run
> ```val field = table.schema.findNestedField(fieldName, includeCollections = 
> true)```
> and fail analysis if the field isn't there. This check is always 
> case-sensitive on column names, even if the underlying catalog is case 
> insensitive, so it will sometimes throw on ALTER operations which the catalog 
> supports.


