[jira] [Updated] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-29 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy updated SPARK-22641:
-
Description: 
We seem to have found an issue with PySpark UDFs interacting with 
{{withColumn}} when the UDF depends on the column added in {{withColumn}}, but 
_only_ if {{withColumn}} is performed after a {{distinct()}}.

Simplest repro in a local PySpark shell:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.lit('qq')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}

This fails with the following exception:

{code}
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
attribute, tree: pythonUDF0#13
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:513)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:659)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:164)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:374)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:422)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:233)
at 
org.apache.spark.sql.execution.SparkPlan.executeColl

[jira] [Updated] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-29 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy updated SPARK-22641:
-
Description: 
We seem to have found an issue with PySpark UDFs interacting with 
{{withColumn}} when the UDF depends on the column added in {{withColumn}}, but 
_only_ if {{withColumn}} is performed after a {{distinct()}}.

Simplest repro in a local PySpark shell:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.lit('qq')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}

This fails with the following exception:

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o263.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
attribute, tree: pythonUDF0#97
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:513)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:659)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:164)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:374)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:422)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(S

[jira] [Commented] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-28 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270132#comment-16270132
 ] 

Andrew Duffy commented on SPARK-22641:
--

Query plan with the literal:

{code}
== Parsed Logical Plan ==
'Project [a#98, b#102, ident('b) AS fails_here#106]
+- Project [a#98, qq AS b#102]
   +- Deduplicate [a#98]
      +- LogicalRDD [a#98], false

== Analyzed Logical Plan ==
a: string, b: string, fails_here: string
Project [a#98, b#102, ident(b#102) AS fails_here#106]
+- Project [a#98, qq AS b#102]
   +- Deduplicate [a#98]
      +- LogicalRDD [a#98], false

== Optimized Logical Plan ==
Aggregate [a#98], [a#98, qq AS b#102, ident(qq) AS fails_here#106]
+- LogicalRDD [a#98], false

== Physical Plan ==
*HashAggregate(keys=[a#98], functions=[], output=[a#98, b#102, fails_here#106])
+- Exchange hashpartitioning(a#98, 200)
   +- BatchEvalPython [ident(qq)], [a#98, pythonUDF0#111]
      +- *HashAggregate(keys=[a#98], functions=[], output=[a#98])
         +- Scan ExistingRDD[a#98]
{code}

And with {{F.col('a')}}:

{code}
== Parsed Logical Plan ==
'Project [a#56, b#60, ident('b) AS fails_here#64]
+- Project [a#56, a#56 AS b#60]
   +- Deduplicate [a#56]
      +- LogicalRDD [a#56], false

== Analyzed Logical Plan ==
a: string, b: string, fails_here: string
Project [a#56, b#60, ident(b#60) AS fails_here#64]
+- Project [a#56, a#56 AS b#60]
   +- Deduplicate [a#56]
      +- LogicalRDD [a#56], false

== Optimized Logical Plan ==
Project [a#56, b#60, ident(a#56) AS fails_here#64]
+- Aggregate [a#56], [a#56, a#56 AS b#60, a#56]
   +- LogicalRDD [a#56], false

== Physical Plan ==
*Project [a#56, b#60, pythonUDF0#69 AS fails_here#64]
+- BatchEvalPython [ident(a#56)], [a#56, b#60, pythonUDF0#69]
   +- *Project [a#56, b#60]
      +- *HashAggregate(keys=[a#56], functions=[], output=[a#56, b#60, a#56])
         +- Exchange hashpartitioning(a#56, 200)
            +- *HashAggregate(keys=[a#56], functions=[], output=[a#56])
               +- Scan ExistingRDD[a#56]
{code}
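
For reference, extended plans like the ones above can be printed straight from a PySpark shell with {{DataFrame.explain}}; a quick sketch (the variable names are mine, not from the ticket):

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

base = spark.createDataFrame([{'a': '1'}]).distinct()

# Literal-based variant (the one that fails at collect time).
lit_df = base.withColumn('b', F.lit('qq')).withColumn('fails_here', ident('b'))
lit_df.explain(True)   # prints parsed/analyzed/optimized/physical plans

# Column-based variant (the one that succeeds).
col_df = base.withColumn('b', F.col('a')).withColumn('fails_here', ident('b'))
col_df.explain(True)
{code}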

> Pyspark UDF relying on column added with withColumn after distinct
> --
>
> Key: SPARK-22641
> URL: https://issues.apache.org/jira/browse/SPARK-22641
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Andrew Duffy
>
> We seem to have found an issue with PySpark UDFs interacting with 
> {{withColumn}} when the UDF depends on the column added in {{withColumn}}, 
> but _only_ if {{withColumn}} is performed after a {{distinct()}}.
> Simplest repro in a local PySpark shell:
> {code}
> import pyspark.sql.functions as F
> @F.udf
> def ident(x):
>     return x
> spark.createDataFrame([{'a': '1'}]) \
> .distinct() \
> .withColumn('b', F.lit('qq')) \
> .withColumn('fails_here', ident('b')) \
> .collect()
> {code}
> This fails with the following exception:
> {code}
> Py4JJavaError: An error occurred while calling o1321.collectToPython.
> : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
> attribute, tree: pythonUDF0#306
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateE

[jira] [Commented] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-28 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270128#comment-16270128
 ] 

Andrew Duffy commented on SPARK-22641:
--

So it seems this is only a problem when using literal columns. As an example, 
the following riff on the original succeeds:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.col('a')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}
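
Since the constant column seems to be the trigger, one workaround that might be worth trying (purely a guess on my part, not something verified in this ticket) is to add the literal before the {{distinct()}}; for a constant column this does not change which rows are distinct:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

# Hypothetical workaround: add the constant column first, then deduplicate.
spark.createDataFrame([{'a': '1'}]) \
    .withColumn('b', F.lit('qq')) \
    .distinct() \
    .withColumn('maybe_ok', ident('b')) \
    .collect()
{code}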

> Pyspark UDF relying on column added with withColumn after distinct
> --
>
> Key: SPARK-22641
> URL: https://issues.apache.org/jira/browse/SPARK-22641
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Andrew Duffy
>
> We seem to have found an issue with PySpark UDFs interacting with 
> {{withColumn}} when the UDF depends on the column added in {{withColumn}}, 
> but _only_ if {{withColumn}} is performed after a {{distinct()}}.
> Simplest repro in a local PySpark shell:
> {code}
> import pyspark.sql.functions as F
> @F.udf
> def ident(x):
>     return x
> spark.createDataFrame([{'a': '1'}]) \
> .distinct() \
> .withColumn('b', F.lit('qq')) \
> .withColumn('fails_here', ident('b')) \
> .collect()
> {code}
> This fails with the following exception:
> {code}
> Py4JJavaError: An error occurred while calling o1321.collectToPython.
> : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
> attribute, tree: pythonUDF0#306
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:474)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:612)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.pro

[jira] [Comment Edited] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-28 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270128#comment-16270128
 ] 

Andrew Duffy edited comment on SPARK-22641 at 11/29/17 4:44 AM:


So it seems this is only a problem when using literal columns. As an example, 
the following riff on the original succeeds:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

# Note the F.col('a')
spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.col('a')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}


was (Author: andreweduffy):
So it seems this is only a problem when using literal columns. As an example, 
the following riff on the original succeeds:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.col('a')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}

> Pyspark UDF relying on column added with withColumn after distinct
> --
>
> Key: SPARK-22641
> URL: https://issues.apache.org/jira/browse/SPARK-22641
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Andrew Duffy
>
> We seem to have found an issue with PySpark UDFs interacting with 
> {{withColumn}} when the UDF depends on the column added in {{withColumn}}, 
> but _only_ if {{withColumn}} is performed after a {{distinct()}}.
> Simplest repro in a local PySpark shell:
> {code}
> import pyspark.sql.functions as F
> @F.udf
> def ident(x):
>     return x
> spark.createDataFrame([{'a': '1'}]) \
> .distinct() \
> .withColumn('b', F.lit('qq')) \
> .withColumn('fails_here', ident('b')) \
> .collect()
> {code}
> This fails with the following exception:
> {code}
> Py4JJavaError: An error occurred while calling o1321.collectToPython.
> : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
> attribute, tree: pythonUDF0#306
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:474)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:612)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
>   at 

[jira] [Updated] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-28 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy updated SPARK-22641:
-
Description: 
We seem to have found an issue with PySpark UDFs interacting with 
{{withColumn}} when the UDF depends on the column added in {{withColumn}}, but 
_only_ if {{withColumn}} is performed after a {{distinct()}}.

Simplest repro in a local PySpark shell:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.lit('qq')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}

This fails with the following exception:

{code}
Py4JJavaError: An error occurred while calling o1321.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
attribute, tree: pythonUDF0#306
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:474)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:612)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.ex


[jira] [Updated] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-28 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy updated SPARK-22641:
-
Description: 
We seem to have found an issue with PySpark UDFs interacting with 
{{withColumn}} when the UDF depends on the column added in {{withColumn}}, but 
_only_ if {{withColumn}} is performed after a {{distinct()}}.

Simplest repro in a local PySpark shell:

{code}
import pyspark.sql.functions as F

@F.udf
def ident(x):
    return x

spark.createDataFrame([{'a': '1'}]) \
    .distinct() \
    .withColumn('b', F.lit('qq')) \
    .withColumn('fails_here', ident('b')) \
    .collect()
{code}

This fails with the following exception:

{code}

Py4JJavaErrorTraceback (most recent call last)
 in ()
> 1 spark.createDataFrame([{'a': '1'}]) .distinct() 
.withColumn('b', F.lit('qq')) .withColumn('fails_here', ident('b')) 
.collect()

/opt/palantir/services/.296331252/service/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py
 in collect(self)
428 """
429 with SCCallSiteSync(self._sc) as css:
--> 430 port = self._jdf.collectToPython()
431 return list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(
432 

/opt/palantir/services/.296331252/var/data/envs/python/default/3365517267c0b352b50f13a35d1b2ed1/lib/python2.7/site-packages/py4j/java_gateway.pyc
 in __call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135 for temp_arg in temp_args:

/opt/palantir/services/.296331252/service/spark/python/lib/pyspark.zip/pyspark/sql/utils.py
 in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/opt/palantir/services/.296331252/var/data/envs/python/default/3365517267c0b352b50f13a35d1b2ed1/lib/python2.7/site-packages/py4j/protocol.pyc
 in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o1321.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
attribute, tree: pythonUDF0#306
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.co

[jira] [Created] (SPARK-22641) Pyspark UDF relying on column added with withColumn after distinct

2017-11-28 Thread Andrew Duffy (JIRA)
Andrew Duffy created SPARK-22641:


 Summary: Pyspark UDF relying on column added with withColumn after 
distinct
 Key: SPARK-22641
 URL: https://issues.apache.org/jira/browse/SPARK-22641
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.3.0
Reporter: Andrew Duffy


We seem to have found an issue with PySpark UDFs interacting with 
{{withColumn}} when the UDF depends on the column added in {{withColumn}}, but 
_only_ if {{withColumn}} is performed after a {{distinct()}}.

Simplest repro in a local PySpark shell:

{code}
import pyspark.sql.functions as F

@F.udf(returnType="integer")
def ident(x):
    return x

df = spark.createDataFrame([
    {'a': '1', 'nums': ['1']},
    {'a': '2', 'nums': ['1', '2']}
])
df2 = df.distinct().withColumn('c', F.lit(1))
df2.show()
df2.withColumn('added', ident(df2['c'])).collect()
{code}

The {{df2.show()}} will succeed, but the subsequent {{collect()}} fails with the 
following exception:

{code}
Traceback (most recent call last):
  File "", line 1, in 
  File "/Users/aduffy/git/open_source/spark/python/pyspark/sql/dataframe.py", 
line 451, in collect
port = self._jdf.collectToPython()
  File 
"/Users/aduffy/git/open_source/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py",
 line 1160, in __call__
  File "/Users/aduffy/git/open_source/spark/python/pyspark/sql/utils.py", line 
63, in deco
return f(*a, **kw)
  File 
"/Users/aduffy/git/open_source/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py",
 line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
o72.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
attribute, tree: pythonUDF0#26
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:512)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:511)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:511)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:657)
at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:164)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at 
org.apache.spark.sql.execution.aggregate.HashAggr

[jira] [Comment Edited] (SPARK-21218) Convert IN predicate to equivalent Parquet filter

2017-06-27 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065177#comment-16065177
 ] 

Andrew Duffy edited comment on SPARK-21218 at 6/27/17 5:39 PM:
---

Curious, I wonder what the previous benchmarks were lacking.

 Have you tried disjunction push-down on other datatypes, e.g. strings? In any 
case, I'm also on board with this change, if it is in fact useful.

I think [~hyukjin.kwon] was saying we should close this as a dupe, and rename 
the PR with the original ticket number 
[17091|https://issues.apache.org/jira/browse/SPARK-17091]. You can re-open that 
issue and say that you've done tests where this now looks like it will be a big 
improvement.


was (Author: andreweduffy):
Curious, I wonder what the previous benchmarks were lacking.

 Have you tried disjunction push-down on other datatypes, e.g. strings? In any 
case, I'm also on board with this change, if it is in fact useful.

I think [~hyukjin.kwon] was saying we should close this as a dupe, and rename 
the PR with the original ticket number (#17091). You can re-open that issue and 
say that you've done tests where this now looks like it will be a big 
improvement.

> Convert IN predicate to equivalent Parquet filter
> -
>
> Key: SPARK-21218
> URL: https://issues.apache.org/jira/browse/SPARK-21218
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer
>Affects Versions: 2.1.1
>Reporter: Michael Styles
> Attachments: IN Predicate.png, OR Predicate.png
>
>
> Convert IN predicate to equivalent expression involving equality conditions 
> to allow the filter to be pushed down to Parquet.
> For instance,
> C1 IN (10, 20) is rewritten as (C1 = 10) OR (C1 = 20)
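
At the DataFrame level the equivalence this ticket relies on can be sketched as follows (illustrative only, with made-up data; it does not by itself show what ParquetFilters pushes down):

{code}
import pyspark.sql.functions as F

df = spark.createDataFrame([(10,), (15,), (20,)], ["C1"])   # stand-in data

# The IN predicate ...
with_in = df.where(F.col("C1").isin(10, 20))

# ... and its rewrite as an OR of equality conditions.
with_or = df.where((F.col("C1") == 10) | (F.col("C1") == 20))

with_in.show()
with_or.show()   # both return the rows where C1 is 10 or 20

# explain() shows how each predicate ends up in the plan; per this ticket,
# only the OR form could previously be pushed down to Parquet.
with_in.explain()
with_or.explain()
{code}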






[jira] [Commented] (SPARK-21218) Convert IN predicate to equivalent Parquet filter

2017-06-27 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065177#comment-16065177
 ] 

Andrew Duffy commented on SPARK-21218:
--

Curious, I wonder what the previous benchmarks were lacking.

 Have you tried disjunction push-down on other datatypes, e.g. strings? In any 
case, I'm also on board with this change, if it is in fact useful.

I think [~hyukjin.kwon] was saying we should close this as a dupe, and rename 
the PR with the original ticket number (#17091). You can re-open that issue and 
say that you've done tests where this now looks like it will be a big 
improvement.

> Convert IN predicate to equivalent Parquet filter
> -
>
> Key: SPARK-21218
> URL: https://issues.apache.org/jira/browse/SPARK-21218
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer
>Affects Versions: 2.1.1
>Reporter: Michael Styles
> Attachments: IN Predicate.png, OR Predicate.png
>
>
> Convert IN predicate to equivalent expression involving equality conditions 
> to allow the filter to be pushed down to Parquet.
> For instance,
> C1 IN (10, 20) is rewritten as (C1 = 10) OR (C1 = 20)






[jira] [Commented] (SPARK-21218) Convert IN predicate to equivalent Parquet filter

2017-06-26 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16063981#comment-16063981
 ] 

Andrew Duffy commented on SPARK-21218:
--

Good catch, looks like a dupe. [~hyukjin.kwon] did profiling on the issue last 
year and it was determined at the time that eval'ing the disjunction in Spark 
was faster.

> Convert IN predicate to equivalent Parquet filter
> -
>
> Key: SPARK-21218
> URL: https://issues.apache.org/jira/browse/SPARK-21218
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer
>Affects Versions: 2.1.1
>Reporter: Michael Styles
>
> Convert IN predicate to equivalent expression involving equality conditions 
> to allow the filter to be pushed down to Parquet.
> For instance,
> C1 IN (10, 20) is rewritten as (C1 = 10) OR (C1 = 20)






[jira] [Resolved] (SPARK-17091) ParquetFilters rewrite IN to OR of Eq

2017-06-26 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy resolved SPARK-17091.
--
Resolution: Won't Fix

Should've closed this last year, but at the time based on Hyukjin Kwon's 
profiling, evaluating the disjunction at the Spark level was determined to 
yield better performance.

> ParquetFilters rewrite IN to OR of Eq
> -
>
> Key: SPARK-17091
> URL: https://issues.apache.org/jira/browse/SPARK-17091
> Project: Spark
>  Issue Type: Bug
>Reporter: Andrew Duffy
>
> Past attempts at pushing down the InSet operation for Parquet relied on 
> user-defined predicates. It would be simpler to rewrite an IN clause into the 
> corresponding OR union of a set of equality conditions.






[jira] [Commented] (SPARK-17310) Disable Parquet's record-by-record filter in normal parquet reader and do it in Spark-side

2016-08-30 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15448672#comment-15448672
 ] 

Andrew Duffy commented on SPARK-17310:
--

+1 to this, see comments on https://github.com/apache/spark/pull/14671, 
particularly rdblue's comment. We need to wait for the next release of Parquet to 
be able to set the {{parquet.filter.record-level.enabled}} config.
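
Once such a Parquet release is available, the property could presumably be handed to the readers through Spark's usual {{spark.hadoop.*}} pass-through at session construction time; a sketch under that assumption:

{code}
from pyspark.sql import SparkSession

# Assumes a parquet-mr version that recognizes this property is on the classpath.
# Spark copies any "spark.hadoop.*" entry into the Hadoop Configuration used by
# the Parquet readers.
spark = (SparkSession.builder
         .appName("parquet-record-level-filter-off")
         .config("spark.hadoop.parquet.filter.record-level.enabled", "false")
         .getOrCreate())
{code}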

> Disable Parquet's record-by-record filter in normal parquet reader and do it 
> in Spark-side
> --
>
> Key: SPARK-17310
> URL: https://issues.apache.org/jira/browse/SPARK-17310
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Hyukjin Kwon
>
> Currently, we are pushing filters down for normal Parquet reader which also 
> filters record-by-record.
> It seems Spark-side codegen row-by-row filtering might be faster than 
> Parquet's one in general due to type-boxing and virtual function calls which 
> Spark's one tries to avoid.
> Maybe we should perform a benchmark and disable this. This ticket was from 
> https://github.com/apache/spark/pull/14671
> Please refer the discussion in the PR.






[jira] [Updated] (SPARK-17213) Parquet String Pushdown for Non-Eq Comparisons Broken

2016-08-24 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy updated SPARK-17213:
-
Description: 
Spark defines ordering over strings based on comparison of UTF-8 byte arrays, 
which compare bytes as unsigned integers. Parquet, however, does not currently 
respect this ordering. A fix is in progress on the Parquet side (JIRA and PR 
links below), but until it lands all filters over strings are broken, with an 
actual correctness issue for {{>}} and {{<}}.

*Repro:*
Querying directly from in-memory DataFrame:
{code}
> Seq("a", "é").toDF("name").where("name > 'a'").count
1
{code}

Querying from a parquet dataset:
{code}
> Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> spark.read.parquet("/tmp/bad").where("name > 'a'").count
0
{code}
This happens because Spark sorts the rows as {{[a, é]}}, but Parquet's string 
comparison is based on signed byte arrays, so it actually creates one row group 
with statistics {{min=é, max=a}}, and that row group is dropped by the query.

Because of the way Parquet pushes down {{Eq}}, equality filters do not affect 
correctness, but they do force you to read row groups you should be able to skip.
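
The signed-versus-unsigned difference is easy to check on the raw UTF-8 bytes (plain Python, independent of Spark):

{code}
a_bytes = list(bytearray(u'a'.encode('utf-8')))    # [97]
e_bytes = list(bytearray(u'é'.encode('utf-8')))    # [195, 169]

# Unsigned comparison (Spark's/UTF-8's ordering): 97 < 195, so 'a' < 'é'.
print(a_bytes < e_bytes)                           # True

# Signed-byte view (what Parquet's comparator effectively uses): 195 becomes -61,
# so 'é' sorts before 'a', which is how the row group gets stats min=é, max=a.
as_signed = lambda bs: [b - 256 if b > 127 else b for b in bs]
print(as_signed(a_bytes) < as_signed(e_bytes))     # False
{code}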

Link to PARQUET issue: https://issues.apache.org/jira/browse/PARQUET-686
Link to PR: https://github.com/apache/parquet-mr/pull/362

  was:
Spark defines ordering over strings based on comparison of UTF8 byte arrays, 
which compare bytes as unsigned integers. Currently however Parquet does not 
respect this ordering. This is currently in the process of being fixed in 
Parquet, JIRA and PR link below, but currently all filters are broken over 
strings, with there actually being a correctness issue for {{>}} and {{<}}.

*Repro:*
Querying directly from in-memory DataFrame:
{code}
> Seq("a", "é").toDF("name").where("name > 'a'").count
1
{code}

Querying from a parquet dataset:
{code}
> Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> spark.read.parquet("/tmp/bad").where("name > 'a'").count
0
{code}
This happens because Spark sorts the rows as {{[a, é]}}, but Parquet compares 
strings using signed byte arrays, so it creates a single row group with 
statistics {{min=é,max=a}}, and that row group is then dropped by the query.

Link to PARQUET issue: https://issues.apache.org/jira/browse/PARQUET-686
Link to PR: https://github.com/apache/parquet-mr/pull/362


> Parquet String Pushdown for Non-Eq Comparisons Broken
> -
>
> Key: SPARK-17213
> URL: https://issues.apache.org/jira/browse/SPARK-17213
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Andrew Duffy
>
> Spark defines ordering over strings based on comparison of UTF-8 byte arrays, 
> where bytes are compared as unsigned integers. Parquet, however, does not 
> currently respect this ordering. A fix is in progress in Parquet (JIRA and PR 
> linked below), but for now all filters over strings are broken, and there is 
> an actual correctness issue for {{>}} and {{<}}.
> *Repro:*
> Querying directly from in-memory DataFrame:
> {code}
> > Seq("a", "é").toDF("name").where("name > 'a'").count
> 1
> {code}
> Querying from a parquet dataset:
> {code}
> > Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> > spark.read.parquet("/tmp/bad").where("name > 'a'").count
> 0
> {code}
> This happens because Spark sorts the rows as {{[a, é]}}, but Parquet compares 
> strings using signed byte arrays, so it creates a single row group with 
> statistics {{min=é,max=a}}, and that row group is then dropped by the query.
> Based on the way Parquet pushes down Eq, correctness is not affected for 
> equality filters, but the broken ordering forces you to read row groups you 
> should be able to skip.
> Link to PARQUET issue: https://issues.apache.org/jira/browse/PARQUET-686
> Link to PR: https://github.com/apache/parquet-mr/pull/362






[jira] [Updated] (SPARK-17213) Parquet String Pushdown for Non-Eq Comparisons Broken

2016-08-24 Thread Andrew Duffy (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Duffy updated SPARK-17213:
-
Description: 
Spark defines ordering over strings based on comparison of UTF-8 byte arrays, 
where bytes are compared as unsigned integers. Parquet, however, does not 
currently respect this ordering. A fix is in progress in Parquet (JIRA and PR 
linked below), but for now all filters over strings are broken, and there is 
an actual correctness issue for {{>}} and {{<}}.

*Repro:*
Querying directly from in-memory DataFrame:
{code}
> Seq("a", "é").toDF("name").where("name > 'a'").count
1
{code}

Querying from a parquet dataset:
{code}
> Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> spark.read.parquet("/tmp/bad").where("name > 'a'").count
0
{code}
This happens because Spark sorts the rows as {{[a, é]}}, but Parquet compares 
strings using signed byte arrays, so it creates a single row group with 
statistics {{min=é,max=a}}, and that row group is then dropped by the query.

Link to PARQUET issue: https://issues.apache.org/jira/browse/PARQUET-686
Link to PR: https://github.com/apache/parquet-mr/pull/362

  was:
Spark defines ordering over strings based on comparison of UTF-8 byte arrays, 
where bytes are compared as unsigned integers. Parquet, however, does not 
currently respect this ordering. A fix is in progress in Parquet (JIRA and PR 
linked below), but for now all filters over strings are broken, and there is 
an actual correctness issue for {{>}} and {{<}}.

*Repro:*
Querying directly from in-memory DataFrame:
{code}
> Seq("a", "é").toDF("name").where("name > 'a'").count
1
{code}

Querying from a parquet dataset:
{code}
> Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> spark.read.parquet("/tmp/bad").where("name > 'a'").count
0
{code}
This happens because Spark sorts the rows as {{[a, é]}}, but Parquet compares 
strings using signed byte arrays, so it creates a single row group with 
statistics {{min=é,max=a}}, and that row group is then dropped by the query.


> Parquet String Pushdown for Non-Eq Comparisons Broken
> -
>
> Key: SPARK-17213
> URL: https://issues.apache.org/jira/browse/SPARK-17213
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Andrew Duffy
>
> Spark defines ordering over strings based on comparison of UTF-8 byte arrays, 
> where bytes are compared as unsigned integers. Parquet, however, does not 
> currently respect this ordering. A fix is in progress in Parquet (JIRA and PR 
> linked below), but for now all filters over strings are broken, and there is 
> an actual correctness issue for {{>}} and {{<}}.
> *Repro:*
> Querying directly from in-memory DataFrame:
> {code}
> > Seq("a", "é").toDF("name").where("name > 'a'").count
> 1
> {code}
> Querying from a parquet dataset:
> {code}
> > Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> > spark.read.parquet("/tmp/bad").where("name > 'a'").count
> 0
> {code}
> This happens because Spark sorts the rows as {{[a, é]}}, but Parquet compares 
> strings using signed byte arrays, so it creates a single row group with 
> statistics {{min=é,max=a}}, and that row group is then dropped by the query.
> Link to PARQUET issue: https://issues.apache.org/jira/browse/PARQUET-686
> Link to PR: https://github.com/apache/parquet-mr/pull/362






[jira] [Created] (SPARK-17213) Parquet String Pushdown for Non-Eq Comparisons Broken

2016-08-24 Thread Andrew Duffy (JIRA)
Andrew Duffy created SPARK-17213:


 Summary: Parquet String Pushdown for Non-Eq Comparisons Broken
 Key: SPARK-17213
 URL: https://issues.apache.org/jira/browse/SPARK-17213
 Project: Spark
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Andrew Duffy


Spark defines ordering over strings based on comparison of UTF-8 byte arrays, 
where bytes are compared as unsigned integers. Parquet, however, does not 
currently respect this ordering. A fix is in progress in Parquet (JIRA and PR 
linked below), but for now all filters over strings are broken, and there is 
an actual correctness issue for {{>}} and {{<}}.

*Repro:*
Querying directly from in-memory DataFrame:
{code}
> Seq("a", "é").toDF("name").where("name > 'a'").count
1
{code}

Querying from a parquet dataset:
{code}
> Seq("a", "é").toDF("name").write.parquet("/tmp/bad")
> spark.read.parquet("/tmp/bad").where("name > 'a'").count
0
{code}
This happens because Spark sorts the rows as {{[a, é]}}, but Parquet compares 
strings using signed byte arrays, so it creates a single row group with 
statistics {{min=é,max=a}}, and that row group is then dropped by the query.






[jira] [Created] (SPARK-17091) ParquetFilters rewrite IN to OR of Eq

2016-08-16 Thread Andrew Duffy (JIRA)
Andrew Duffy created SPARK-17091:


 Summary: ParquetFilters rewrite IN to OR of Eq
 Key: SPARK-17091
 URL: https://issues.apache.org/jira/browse/SPARK-17091
 Project: Spark
  Issue Type: Bug
Reporter: Andrew Duffy


Past attempts at pushing down the InSet operation for Parquet relied on 
user-defined predicates. It would be simpler to rewrite an IN clause into the 
corresponding OR union of a set of equality conditions.
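
A minimal sketch of the proposed rewrite against parquet-mr's {{FilterApi}}; 
the column name and value set are illustrative only:

{code}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

// name IN ('a', 'b', 'c')  ==>  name = 'a' OR name = 'b' OR name = 'c'
val col = FilterApi.binaryColumn("name")                 // illustrative column
val values = Seq("a", "b", "c").map(s => Binary.fromString(s))

val inAsOrOfEq: FilterPredicate = values
  .map(v => FilterApi.eq(col, v): FilterPredicate)
  .reduce((l, r) => FilterApi.or(l, r))
{code}

Since {{eq}} and {{or}} are predicates that Parquet's row-group statistics 
filtering already understands, no user-defined predicate should be needed.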






[jira] [Created] (SPARK-17059) Allow FileFormat to specify partition pruning strategy

2016-08-15 Thread Andrew Duffy (JIRA)
Andrew Duffy created SPARK-17059:


 Summary: Allow FileFormat to specify partition pruning strategy
 Key: SPARK-17059
 URL: https://issues.apache.org/jira/browse/SPARK-17059
 Project: Spark
  Issue Type: Bug
Reporter: Andrew Duffy


Allow Spark to prune input files for {{FileSourceScanExec}} in a pluggable way 
by letting {{FileFormat}} implementations specify a format-specific 
{{filterPartitions}} method.

This is especially useful for Parquet, as Spark does not currently make use of 
the summary metadata and instead reads the footer of every part file of a 
Parquet data source. Pluggable pruning can lead to massive speedups when 
reading a filtered chunk of a dataset, especially on remote storage (e.g. S3).
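
A hypothetical sketch of what such a hook could look like; the trait name, 
method name, and signature below are illustrative and are not Spark's actual 
API:

{code}
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.sources.Filter

// Hypothetical mix-in a FileFormat implementation could provide.
// FileSourceScanExec would call it to drop input files that cannot match the
// pushed-down filters (for Parquet, e.g. by consulting the summary metadata
// instead of reading every part-file footer).
trait PrunableFileFormat {
  def filterPartitions(
      filters: Seq[Filter],
      allFiles: Seq[FileStatus]): Seq[FileStatus] =
    allFiles  // default: no pruning
}
{code}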






[jira] [Commented] (SPARK-16265) Add option to SparkSubmit to ship driver JRE to YARN

2016-07-07 Thread Andrew Duffy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365998#comment-15365998
 ] 

Andrew Duffy commented on SPARK-16265:
--

Hi Sean, yeah, I can see where you're coming from, but I feel this change is 
simple and targeted enough (it is meant to be used with the {{SparkLauncher}} 
API) that it can be genuinely useful without adding much, if any, maintenance 
load. If anything, I would argue it at least deserves consideration as an 
experimental feature. Without it, users who write programs against 
SparkLauncher have to split Java versions between the code that launches and 
interacts with the Spark app and the Spark app itself, e.g. when the 
application is written for one environment and then deployed into an 
uncontrolled customer environment whose cluster does not have Java 8 installed.
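
For context, a minimal {{SparkLauncher}} sketch (paths and class names are 
illustrative): as far as I can tell, {{setJavaHome}} only controls the JVM used 
to run {{spark-submit}} on the launching machine, and there is no knob for the 
JRE inside the YARN containers, which is the gap this proposal targets.

{code}
import org.apache.spark.launcher.SparkLauncher

// Illustrative paths and class names. setJavaHome affects only the local
// spark-submit process; the JRE in the YARN containers is whatever the
// cluster nodes provide.
val handle = new SparkLauncher()
  .setAppResource("/path/to/app.jar")
  .setMainClass("com.example.Main")
  .setMaster("yarn")
  .setDeployMode("cluster")
  .setJavaHome("/opt/jdk8")
  .startApplication()
{code}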

> Add option to SparkSubmit to ship driver JRE to YARN
> 
>
> Key: SPARK-16265
> URL: https://issues.apache.org/jira/browse/SPARK-16265
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.6.2
>Reporter: Andrew Duffy
>
> Add an option to {{SparkSubmit}} that allows the driver to package up its 
> version of the JRE to be shipped to a YARN cluster. This lets Spark 
> applications be deployed to a YARN cluster whose installed Java versions do 
> not match the version the application requires, which is useful when the 
> Spark application developer does not have administrative access to the YARN 
> cluster (e.g. a school or corporate environment) but still wants to use 
> certain language features in their code.






[jira] [Created] (SPARK-16265) Add option to SparkSubmit to ship driver JRE to YARN

2016-06-28 Thread Andrew Duffy (JIRA)
Andrew Duffy created SPARK-16265:


 Summary: Add option to SparkSubmit to ship driver JRE to YARN
 Key: SPARK-16265
 URL: https://issues.apache.org/jira/browse/SPARK-16265
 Project: Spark
  Issue Type: Improvement
Affects Versions: 1.6.2
Reporter: Andrew Duffy
 Fix For: 2.1.0


Add an option to {{SparkSubmit}} that allows the driver to package up its 
version of the JRE to be shipped to a YARN cluster. This lets Spark 
applications be deployed to a YARN cluster whose installed Java versions do 
not match the version the application requires, which is useful when the 
Spark application developer does not have administrative access to the YARN 
cluster (e.g. a school or corporate environment) but still wants to use 
certain language features in their code.


