[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761535#comment-16761535
 ] 

Mitesh edited comment on SPARK-19981 at 2/6/19 6:54 AM:


Ping, any updates here? This is still an issue in 2.3.2.

Also, this may be a dupe of SPARK-19468.


was (Author: masterddt):
Ping any updates here? This still is an issue in 2.3.2.

> Sort-Merge join inserts shuffles when joining dataframes with aliased columns
> -
>
> Key: SPARK-19981
> URL: https://issues.apache.org/jira/browse/SPARK-19981
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Allen George
>Priority: Major
>
> Performing a sort-merge join with two dataframes - each of which has the join 
> column aliased - causes Spark to insert an unnecessary shuffle.
> Consider the scala test code below, which should be equivalent to the 
> following SQL.
> {code:SQL}
> SELECT * FROM
>   (SELECT number AS aliased from df1) t1
> LEFT JOIN
>   (SELECT number AS aliased from df2) t2
> ON t1.aliased = t2.aliased
> {code}
> {code:scala}
> private case class OneItem(number: Long)
> private case class TwoItem(number: Long, value: String)
> test("join with aliases should not trigger shuffle") {
>   val df1 = sqlContext.createDataFrame(
> Seq(
>   OneItem(0),
>   OneItem(2),
>   OneItem(4)
> )
>   )
>   val partitionedDf1 = df1.repartition(10, col("number"))
>   partitionedDf1.createOrReplaceTempView("df1")
>   partitionedDf1.cache()
>   partitionedDf1.count()
>   
>   val df2 = sqlContext.createDataFrame(
> Seq(
>   TwoItem(0, "zero"),
>   TwoItem(2, "two"),
>   TwoItem(4, "four")
> )
>   )
>   val partitionedDf2 = df2.repartition(10, col("number"))
>   partitionedDf2.createOrReplaceTempView("df2")
>   partitionedDf2.cache()
>   partitionedDf2.count()
>   
>   val fromDf1 = sqlContext.sql("SELECT number from df1")
>   val fromDf2 = sqlContext.sql("SELECT number from df2")
>   val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
>   val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")
>   aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer") }
> {code}
> Both the SQL and the Scala code generate a query-plan where an extra exchange 
> is inserted before performing the sort-merge join. This exchange changes the 
> partitioning from {{HashPartitioning("number", 10)}} for each frame being 
> joined into {{HashPartitioning("aliased", 5)}}. I would have expected that, 
> since it's a simple column aliasing and both frames have exactly the same 
> partitioning as the initial frames, no extra exchange would be needed.
> {noformat} 
> *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, 
> hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 
> 5)%NONNULL)][outOrder=List(aliased#267L 
> ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
> +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], 
> Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 
> 5)%NONNULL,hashpartitioning(aliased#270L, 
> 5)%NONNULL)][outOrder=List(aliased#267L 
> ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, 
> aliased#270:bigint%NONNULL)]
>:- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, 
> aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L 
> ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>:  +- Exchange [args=hashpartitioning(aliased#267L, 
> 5)%NONNULL][outPart=HashPartitioning(5, 
> aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>: +- *Project [args=[number#198L AS 
> aliased#267L]][outPart=HashPartitioning(10, 
> number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>:+- InMemoryTableScan 
> [args=[number#198L]][outPart=HashPartitioning(10, 
> number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
>:   :  +- InMemoryRelation [number#198L], true, 1, 
> StorageLevel(disk, memory, deserialized, 1 replicas), 
> false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
>:   : :  +- Exchange [args=hashpartitioning(number#198L, 
> 10)%NONNULL][outPart=HashPartitioning(10, 
> number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>:   : : +- LocalTableScan 
> [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>+- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, 
> aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L 
> ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>   +- Exchange 
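
For illustration, a hedged workaround sketch building on the test code quoted above (it reuses the fromDf1/fromDf2 frames from that test and is not a verified fix): if the alias is applied after the join instead of before it, both inputs keep their original "number" attributes, so the existing hashpartitioning(number, 10) can satisfy the join requirement without an extra exchange.

{code:scala}
// Sketch only: join on the original, already-partitioned column and rename afterwards,
// so the planner can reuse hashpartitioning(number, 10) on both cached inputs.
val joinedOnOriginal = fromDf1.join(fromDf2, Seq("number"), "left_outer")
val aliasedAfterJoin = joinedOnOriginal.withColumnRenamed("number", "aliased")
aliasedAfterJoin.explain() // ideally no Exchange above the InMemoryTableScans
{code}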

[jira] [Commented] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761535#comment-16761535
 ] 

Mitesh commented on SPARK-19981:


Ping, any updates here? This is still an issue in 2.3.2.

> Sort-Merge join inserts shuffles when joining dataframes with aliased columns
> -
>
> Key: SPARK-19981
> URL: https://issues.apache.org/jira/browse/SPARK-19981
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Allen George
>Priority: Major
>
> Performing a sort-merge join with two dataframes - each of which has the join 
> column aliased - causes Spark to insert an unnecessary shuffle.
> Consider the scala test code below, which should be equivalent to the 
> following SQL.
> {code:SQL}
> SELECT * FROM
>   (SELECT number AS aliased from df1) t1
> LEFT JOIN
>   (SELECT number AS aliased from df2) t2
> ON t1.aliased = t2.aliased
> {code}
> {code:scala}
> private case class OneItem(number: Long)
> private case class TwoItem(number: Long, value: String)
> test("join with aliases should not trigger shuffle") {
>   val df1 = sqlContext.createDataFrame(
> Seq(
>   OneItem(0),
>   OneItem(2),
>   OneItem(4)
> )
>   )
>   val partitionedDf1 = df1.repartition(10, col("number"))
>   partitionedDf1.createOrReplaceTempView("df1")
>   partitionedDf1.cache()
>   partitionedDf1.count()
>   
>   val df2 = sqlContext.createDataFrame(
> Seq(
>   TwoItem(0, "zero"),
>   TwoItem(2, "two"),
>   TwoItem(4, "four")
> )
>   )
>   val partitionedDf2 = df2.repartition(10, col("number"))
>   partitionedDf2.createOrReplaceTempView("df2")
>   partitionedDf2.cache()
>   partitionedDf2.count()
>   
>   val fromDf1 = sqlContext.sql("SELECT number from df1")
>   val fromDf2 = sqlContext.sql("SELECT number from df2")
>   val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
>   val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")
>   aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer") }
> {code}
> Both the SQL and the Scala code generate a query-plan where an extra exchange 
> is inserted before performing the sort-merge join. This exchange changes the 
> partitioning from {{HashPartitioning("number", 10)}} for each frame being 
> joined into {{HashPartitioning("aliased", 5)}}. I would have expected that, 
> since it's a simple column aliasing and both frames have exactly the same 
> partitioning as the initial frames, no extra exchange would be needed.
> {noformat} 
> *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, 
> hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 
> 5)%NONNULL)][outOrder=List(aliased#267L 
> ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
> +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], 
> Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 
> 5)%NONNULL,hashpartitioning(aliased#270L, 
> 5)%NONNULL)][outOrder=List(aliased#267L 
> ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, 
> aliased#270:bigint%NONNULL)]
>:- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, 
> aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L 
> ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>:  +- Exchange [args=hashpartitioning(aliased#267L, 
> 5)%NONNULL][outPart=HashPartitioning(5, 
> aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>: +- *Project [args=[number#198L AS 
> aliased#267L]][outPart=HashPartitioning(10, 
> number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>:+- InMemoryTableScan 
> [args=[number#198L]][outPart=HashPartitioning(10, 
> number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
>:   :  +- InMemoryRelation [number#198L], true, 1, 
> StorageLevel(disk, memory, deserialized, 1 replicas), 
> false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
>:   : :  +- Exchange [args=hashpartitioning(number#198L, 
> 10)%NONNULL][outPart=HashPartitioning(10, 
> number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>:   : : +- LocalTableScan 
> [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>+- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, 
> aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L 
> ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>   +- Exchange [args=hashpartitioning(aliased#270L, 
> 5)%NONNULL][outPart=HashPartitioning(5, 
> aliased#270:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
> 

[jira] [Comment Edited] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-02-05 Thread Bruce Robbins (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761289#comment-16761289
 ] 

Bruce Robbins edited comment on SPARK-26708 at 2/6/19 12:41 AM:


How does one hit this issue?

Edit: Ah, never mind. I see there is a test.


was (Author: bersprockets):
How does one hit this issue?

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiao Li
>Assignee: Maryann Xue
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on 
> the other cache entries which are dependent on the cache being invalidated. 
> It leads to the physical plans of those cache entries being re-compiled. 
> For those cache entries, if the cache RDD has already been persisted, chances 
> are there will be inconsistency between the data and the new plan. It can 
> cause a correctness issue if the new plan's {{outputPartitioning}} or 
> {{outputOrdering}} is different from that of the actual data, and 
> meanwhile the cache is used by another query that asks for specific 
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
> plan but not the actual data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26733) Clean up entrypoint.sh

2019-02-05 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-26733.

   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23655
[https://github.com/apache/spark/pull/23655]

> Clean up entrypoint.sh
> --
>
> Key: SPARK-26733
> URL: https://issues.apache.org/jira/browse/SPARK-26733
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
>Priority: Minor
> Fix For: 3.0.0
>
>
> After recent cleanups in the surrounding code, entrypoint.sh has collected 
> some unused code and can be cleaned up a little.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26733) Clean up entrypoint.sh

2019-02-05 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-26733:
--

Assignee: Marcelo Vanzin

> Clean up entrypoint.sh
> --
>
> Key: SPARK-26733
> URL: https://issues.apache.org/jira/browse/SPARK-26733
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
>Priority: Minor
>
> After recent cleanups in the surrounding code, entrypoint.sh has collected 
> some unused code and can be cleaned up a little.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17636) Parquet predicate pushdown for nested fields

2019-02-05 Thread DB Tsai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761312#comment-16761312
 ] 

DB Tsai commented on SPARK-17636:
-

[~MasterDDT] This is different from [SPARK-4502]. One is predicate pushdown for 
nested columns, and the other is nested schema pruning. 
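
As a hedged illustration of that distinction (the config keys below are assumptions based on later Spark releases and may not exist in every version discussed here; the path comes from the example in the description):

{code:scala}
import org.apache.spark.sql.functions.col

// Nested schema pruning: read only the struct fields a query actually references.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true") // assumed key
// Parquet predicate pushdown; nested-field support is what this ticket tracks.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")                // assumed key

val df = spark.read.parquet("s3a://some/parquet/file")
df.filter(col("day_timestamp.timestamp") > 4).explain()
// With nested pushdown available, the scan should list the filter under PushedFilters.
{code}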

> Parquet predicate pushdown for nested fields
> 
>
> Key: SPARK-17636
> URL: https://issues.apache.org/jira/browse/SPARK-17636
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 1.6.2, 1.6.3, 2.0.2
>Reporter: Mitesh
>Assignee: DB Tsai
>Priority: Minor
> Fix For: 3.0.0
>
>
> There's a *PushedFilters* for a simple numeric field, but not for a numeric 
> field inside a struct. Not sure if this is a limitation caused by Parquet, or 
> a Spark-only limitation.
> {noformat}
> scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", 
> "sale_id")
> res5: org.apache.spark.sql.DataFrame = [day_timestamp: 
> struct, sale_id: bigint]
> scala> res5.filter("sale_id > 4").queryExecution.executedPlan
> res9: org.apache.spark.sql.execution.SparkPlan =
> Filter[23814] [args=(sale_id#86324L > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)]
> scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan
> res10: org.apache.spark.sql.execution.SparkPlan =
> Filter[23815] [args=(day_timestamp#86302.timestamp > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17636) Parquet predicate pushdown for nested fields

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761311#comment-16761311
 ] 

Mitesh commented on SPARK-17636:


Should this be closed, as a duplicate of SPARK-4502?

> Parquet predicate pushdown for nested fields
> 
>
> Key: SPARK-17636
> URL: https://issues.apache.org/jira/browse/SPARK-17636
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 1.6.2, 1.6.3, 2.0.2
>Reporter: Mitesh
>Assignee: DB Tsai
>Priority: Minor
> Fix For: 3.0.0
>
>
> There's a *PushedFilters* for a simple numeric field, but not for a numeric 
> field inside a struct. Not sure if this is a limitation caused by Parquet, or 
> a Spark-only limitation.
> {noformat}
> scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", 
> "sale_id")
> res5: org.apache.spark.sql.DataFrame = [day_timestamp: 
> struct, sale_id: bigint]
> scala> res5.filter("sale_id > 4").queryExecution.executedPlan
> res9: org.apache.spark.sql.execution.SparkPlan =
> Filter[23814] [args=(sale_id#86324L > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)]
> scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan
> res10: org.apache.spark.sql.execution.SparkPlan =
> Filter[23815] [args=(day_timestamp#86302.timestamp > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19468) Dataset slow because of unnecessary shuffles

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761309#comment-16761309
 ] 

Mitesh commented on SPARK-19468:


Also this may be a dupe of SPARK-19981

> Dataset slow because of unnecessary shuffles
> 
>
> Key: SPARK-19468
> URL: https://issues.apache.org/jira/browse/SPARK-19468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: koert kuipers
>Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly 
> slower, and the main reason seems to be more shuffles that we successfully 
> avoid for rdds by careful partitioning. this seems to be dataset specific as 
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get 
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   
> .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   
> .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(key#5, 4)
> : +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>   +- InMemoryRelation [key2#27, value2#28], true, 1, 
> StorageLevel(disk, 1 replicas)
> +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(key2#27, 4)
>   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before 
> being used in the join. however if i try to do the same with dataset i have 
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :+- InMemoryTableScan [_1#83, _2#84]
> :  +- InMemoryRelation [_1#83, _2#84], true, 1, 
> StorageLevel(disk, 1 replicas)
> :+- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :   +- Exchange hashpartitioning(_1#83, 4)
> :  +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(_2#106._1, 4)
>   +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>  +- InMemoryTableScan [_1#100, _2#101]
>+- InMemoryRelation [_1#100, _2#101], true, 1, 
> StorageLevel(disk, 1 replicas)
>  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(_1#83, 4)
>+- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the 
> issue seems to be in joinWith, which does some preprocessing that seems to 
> confuse the planner. if i change the joinWith to join (which returns a 
> dataframe) it looks a little better in that only one side gets shuffled 
> again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- InMemoryTableScan [_1#83, _2#84]
> : +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#83, 4)
> : +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_1#100 ASC NULLS 

[jira] [Commented] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-02-05 Thread Bruce Robbins (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761289#comment-16761289
 ] 

Bruce Robbins commented on SPARK-26708:
---

How does one hit this issue?

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiao Li
>Assignee: Maryann Xue
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on 
> the other cache entries which are dependent on the cache being invalidated. 
> It leads to the physical plans of those cache entries being re-compiled. 
> For those cache entries, if the cache RDD has already been persisted, chances 
> are there will be inconsistency between the data and the new plan. It can 
> cause a correctness issue if the new plan's {{outputPartitioning}} or 
> {{outputOrdering}} is different from that of the actual data, and 
> meanwhile the cache is used by another query that asks for specific 
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
> plan but not the actual data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19468) Dataset slow because of unnecessary shuffles

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761262#comment-16761262
 ] 

Mitesh commented on SPARK-19468:


Also curious why the fix for SPARK-19931 only covered HashPartitioning, instead 
of any kind of partitioning that extends Expression.

> Dataset slow because of unnecessary shuffles
> 
>
> Key: SPARK-19468
> URL: https://issues.apache.org/jira/browse/SPARK-19468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: koert kuipers
>Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly 
> slower, and the main reason seems to be more shuffles that we successfully 
> avoid for rdds by careful partitioning. this seems to be dataset specific as 
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get 
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   
> .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   
> .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(key#5, 4)
> : +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>   +- InMemoryRelation [key2#27, value2#28], true, 1, 
> StorageLevel(disk, 1 replicas)
> +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(key2#27, 4)
>   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before 
> being used in the join. however if i try to do the same with dataset i have 
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :+- InMemoryTableScan [_1#83, _2#84]
> :  +- InMemoryRelation [_1#83, _2#84], true, 1, 
> StorageLevel(disk, 1 replicas)
> :+- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :   +- Exchange hashpartitioning(_1#83, 4)
> :  +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(_2#106._1, 4)
>   +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>  +- InMemoryTableScan [_1#100, _2#101]
>+- InMemoryRelation [_1#100, _2#101], true, 1, 
> StorageLevel(disk, 1 replicas)
>  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(_1#83, 4)
>+- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the 
> issue seems to be in joinWith, which does some preprocessing that seems to 
> confuse the planner. if i change the joinWith to join (which returns a 
> dataframe) it looks a little better in that only one side gets shuffled 
> again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- InMemoryTableScan [_1#83, _2#84]
> : +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :  +- Exchange 

[jira] [Comment Edited] (SPARK-19468) Dataset slow because of unnecessary shuffles

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761207#comment-16761207
 ] 

Mitesh edited comment on SPARK-19468 at 2/5/19 8:59 PM:


+1, I'm seeing the same behavior. It seems like any physical operator that 
changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the 
same fix as https://issues.apache.org/jira/browse/SPARK-19931, to ensure that 
if the output aliases anything, that is reflected in the attributes inside the 
outputPartitioning/outputOrdering.
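
A rough sketch of that idea follows (the Catalyst type names are real, but the helper itself is hypothetical and is not the actual Spark patch): an operator whose project list merely aliases child attributes could rewrite the child's HashPartitioning in terms of the aliased attributes.

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

// Hypothetical helper: substitute aliased attributes into the child's partitioning so a
// Project that only renames columns still reports a compatible HashPartitioning.
def aliasAwareOutputPartitioning(
    projectList: Seq[NamedExpression],
    childPartitioning: Partitioning): Partitioning = {
  val aliasMap: Map[Expression, Attribute] = projectList.collect {
    case a @ Alias(ref: AttributeReference, _) => (ref: Expression) -> a.toAttribute
  }.toMap
  childPartitioning match {
    case hp: HashPartitioning =>
      hp.copy(expressions = hp.expressions.map(e => aliasMap.getOrElse(e, e)))
    case other => other // only HashPartitioning is handled here, mirroring SPARK-19931
  }
}
{code}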


was (Author: masterddt):
+1 I'm seeing the same behavior. It seems like any physical operator that 
changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the 
same fix from 
https://issues.apache.org/jira/browse/SPARK-19931, to ensure if the output is 
aliasing anything, that is reflected in the attributes inside the 
`outputPartitioning`/`outputOrdering`.

> Dataset slow because of unnecessary shuffles
> 
>
> Key: SPARK-19468
> URL: https://issues.apache.org/jira/browse/SPARK-19468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: koert kuipers
>Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly 
> slower, and the main reason seems to be more shuffles that we successfully 
> avoid for rdds by careful partitioning. this seems to be dataset specific as 
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get 
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   
> .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   
> .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(key#5, 4)
> : +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>   +- InMemoryRelation [key2#27, value2#28], true, 1, 
> StorageLevel(disk, 1 replicas)
> +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(key2#27, 4)
>   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before 
> being used in the join. however if i try to do the same with dataset i have 
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :+- InMemoryTableScan [_1#83, _2#84]
> :  +- InMemoryRelation [_1#83, _2#84], true, 1, 
> StorageLevel(disk, 1 replicas)
> :+- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :   +- Exchange hashpartitioning(_1#83, 4)
> :  +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(_2#106._1, 4)
>   +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>  +- InMemoryTableScan [_1#100, _2#101]
>+- InMemoryRelation [_1#100, _2#101], true, 1, 
> StorageLevel(disk, 1 replicas)
>  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(_1#83, 4)
>+- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the 
> issue seems to be in joinWith, which does some preprocessing that seems to 
> confuse the planner. if i change the joinWith to join (which returns a 
> dataframe) it looks a little better in that only one side gets shuffled 
> again, but still not optimal:
> 

[jira] [Comment Edited] (SPARK-19468) Dataset slow because of unnecessary shuffles

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761207#comment-16761207
 ] 

Mitesh edited comment on SPARK-19468 at 2/5/19 8:59 PM:


+1, I'm seeing the same behavior. It seems like any physical operator that 
changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the 
same fix as SPARK-19931, to ensure that if the output aliases anything, that 
is reflected in the attributes inside the outputPartitioning/outputOrdering.


was (Author: masterddt):
+1 I'm seeing the same behavior. It seems like any physical operator that 
changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the 
same fix from 
https://issues.apache.org/jira/browse/SPARK-19931, to ensure if the output is 
aliasing anything, that is reflected in the attributes inside the 
outputPartitioning/outputOrdering.

> Dataset slow because of unnecessary shuffles
> 
>
> Key: SPARK-19468
> URL: https://issues.apache.org/jira/browse/SPARK-19468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: koert kuipers
>Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly 
> slower, and the main reason seems to be more shuffles that we successfully 
> avoid for rdds by careful partitioning. this seems to be dataset specific as 
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get 
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   
> .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   
> .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(key#5, 4)
> : +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>   +- InMemoryRelation [key2#27, value2#28], true, 1, 
> StorageLevel(disk, 1 replicas)
> +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(key2#27, 4)
>   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before 
> being used in the join. however if i try to do the same with dataset i have 
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :+- InMemoryTableScan [_1#83, _2#84]
> :  +- InMemoryRelation [_1#83, _2#84], true, 1, 
> StorageLevel(disk, 1 replicas)
> :+- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :   +- Exchange hashpartitioning(_1#83, 4)
> :  +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(_2#106._1, 4)
>   +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>  +- InMemoryTableScan [_1#100, _2#101]
>+- InMemoryRelation [_1#100, _2#101], true, 1, 
> StorageLevel(disk, 1 replicas)
>  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(_1#83, 4)
>+- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the 
> issue seems to be in joinWith, which does some preprocessing that seems to 
> confuse the planner. if i change the joinWith to join (which returns a 
> dataframe) it looks a little better in that only one side gets shuffled 
> again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 

[jira] [Commented] (SPARK-19468) Dataset slow because of unnecessary shuffles

2019-02-05 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761207#comment-16761207
 ] 

Mitesh commented on SPARK-19468:


+1, I'm seeing the same behavior. It seems like any physical operator that 
changes output (ProjectExec.output, SortAggregateExec.output, ...) needs the 
same fix as https://issues.apache.org/jira/browse/SPARK-19931, to ensure that 
if the output aliases anything, that is reflected in the attributes inside the 
`outputPartitioning`/`outputOrdering`.

> Dataset slow because of unnecessary shuffles
> 
>
> Key: SPARK-19468
> URL: https://issues.apache.org/jira/browse/SPARK-19468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: koert kuipers
>Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly 
> slower, and the main reason seems to be more shuffles that we successfully 
> avoid for rdds by careful partitioning. this seems to be dataset specific as 
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get 
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   
> .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   
> .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> : +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 
> replicas)
> :   +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(key#5, 4)
> : +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>   +- InMemoryRelation [key2#27, value2#28], true, 1, 
> StorageLevel(disk, 1 replicas)
> +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(key2#27, 4)
>   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before 
> being used in the join. however if i try to do the same with dataset i have 
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> : +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :+- InMemoryTableScan [_1#83, _2#84]
> :  +- InMemoryRelation [_1#83, _2#84], true, 1, 
> StorageLevel(disk, 1 replicas)
> :+- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :   +- Exchange hashpartitioning(_1#83, 4)
> :  +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(_2#106._1, 4)
>   +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>  +- InMemoryTableScan [_1#100, _2#101]
>+- InMemoryRelation [_1#100, _2#101], true, 1, 
> StorageLevel(disk, 1 replicas)
>  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(_1#83, 4)
>+- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the 
> issue seems to be in joinWith, which does some preprocessing that seems to 
> confuse the planner. if i change the joinWith to join (which returns a 
> dataframe) it looks a little better in that only one side gets shuffled 
> again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- 

[jira] [Assigned] (SPARK-26768) Remove useless code in BlockManager

2019-02-05 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen reassigned SPARK-26768:
-

Assignee: liupengcheng

> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Assignee: liupengcheng
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, while reading the code of `BlockManager.getBlockData`, I found 
> some code that can never be reached. The related code is below:
>  
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
> 
> shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
> getLocalBytes(blockId) match {
>   case Some(blockData) =>
> new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
> true)
>   case None =>
> // If this block manager receives a request for a block that it 
> doesn't have then it's
> // likely that the master has outdated block statuses for this block. 
> Therefore, we send
> // an RPC so that this block is marked as being unavailable from this 
> block manager.
> reportBlockStatus(blockId, BlockStatus.empty)
> throw new BlockNotFoundException(blockId.toString)
> }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a 
> shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so 
> this should work
>   if (blockId.isShuffle) {
> val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
> // TODO: This should gracefully handle case where local block is not 
> available. Currently
> // downstream code will throw an exception.
> val buf = new ChunkedByteBuffer(
>   
> shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
> Some(new ByteBufferBlockData(buf, true))
>   } else {
> blockInfoManager.lockForReading(blockId).map { info => 
> doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the call hierarchy of 
> `BlockManager.getLocalBytes`, the only other callsite of 
> `BlockManager.getLocalBytes` is `TorrentBroadcast.readBlocks`, where the 
> blockId can never be a `ShuffleBlockId`.
>   !Selection_037.jpg!
> So I think we should remove this unreachable code to make it easier to read.
>  
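
A minimal sketch of what getLocalBytes could look like with the unreachable shuffle branch dropped, assuming (as argued above) that no caller passes a ShuffleBlockId; identifiers are the ones used in the snippet quoted above, and this is an illustration rather than the merged change.

{code:scala}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // Shuffle blocks are served by the ShuffleBlockResolver via getBlockData, never here.
  assert(!blockId.isShuffle, s"Unexpected shuffle block id $blockId")
  blockInfoManager.lockForReading(blockId).map { info =>
    doGetLocalBytes(blockId, info)
  }
}
{code}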



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26768) Remove useless code in BlockManager

2019-02-05 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-26768.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23693
[https://github.com/apache/spark/pull/23693]

> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Assignee: liupengcheng
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: Selection_037.jpg
>
>
> Recently, while reading the code of `BlockManager.getBlockData`, I found 
> some code that can never be reached. The related code is below:
>  
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
> 
> shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
> getLocalBytes(blockId) match {
>   case Some(blockData) =>
> new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
> true)
>   case None =>
> // If this block manager receives a request for a block that it 
> doesn't have then it's
> // likely that the master has outdated block statuses for this block. 
> Therefore, we send
> // an RPC so that this block is marked as being unavailable from this 
> block manager.
> reportBlockStatus(blockId, BlockStatus.empty)
> throw new BlockNotFoundException(blockId.toString)
> }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a 
> shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so 
> this should work
>   if (blockId.isShuffle) {
> val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
> // TODO: This should gracefully handle case where local block is not 
> available. Currently
> // downstream code will throw an exception.
> val buf = new ChunkedByteBuffer(
>   
> shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
> Some(new ByteBufferBlockData(buf, true))
>   } else {
> blockInfoManager.lockForReading(blockId).map { info => 
> doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the call hierarchy of 
> `BlockManager.getLocalBytes`, the only other callsite of 
> `BlockManager.getLocalBytes` is `TorrentBroadcast.readBlocks`, where the 
> blockId can never be a `ShuffleBlockId`.
>   !Selection_037.jpg!
> So I think we should remove this unreachable code to make it easier to read.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25692) Flaky test: ChunkFetchIntegrationSuite.fetchBothChunks

2019-02-05 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-25692.
---
Resolution: Fixed

Issue resolved by pull request 23700
[https://github.com/apache/spark/pull/23700]

> Flaky test: ChunkFetchIntegrationSuite.fetchBothChunks
> --
>
> Key: SPARK-25692
> URL: https://issues.apache.org/jira/browse/SPARK-25692
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Shixiong Zhu
>Assignee: Sanket Reddy
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: Screen Shot 2018-10-22 at 4.12.41 PM.png, Screen Shot 
> 2018-11-01 at 10.17.16 AM.png
>
>
> Looks like the whole test suite is pretty flaky. See: 
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.6/5490/testReport/junit/org.apache.spark.network/ChunkFetchIntegrationSuite/history/
> This may be a regression in 3.0, as this didn't happen in the 2.4 branch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25692) Flaky test: ChunkFetchIntegrationSuite.fetchBothChunks

2019-02-05 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen reassigned SPARK-25692:
-

Assignee: Sanket Reddy

> Flaky test: ChunkFetchIntegrationSuite.fetchBothChunks
> --
>
> Key: SPARK-25692
> URL: https://issues.apache.org/jira/browse/SPARK-25692
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Shixiong Zhu
>Assignee: Sanket Reddy
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: Screen Shot 2018-10-22 at 4.12.41 PM.png, Screen Shot 
> 2018-11-01 at 10.17.16 AM.png
>
>
> Looks like the whole test suite is pretty flaky. See: 
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.6/5490/testReport/junit/org.apache.spark.network/ChunkFetchIntegrationSuite/history/
> This may be a regression in 3.0, as this didn't happen in the 2.4 branch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26821) filters not working with char datatype when querying against hive table

2019-02-05 Thread Sujith (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761086#comment-16761086
 ] 

Sujith edited comment on SPARK-26821 at 2/5/19 6:27 PM:


Yeah, with spaces it will work fine. Shall we document this behavior? I will 
also try to check the behavior in a couple of other systems.


was (Author: s71955):
Yeah with spaces it will work fine, will try to check the behavior in couple of 
other systems also. 

> filters not working with char datatype when querying against hive table
> ---
>
> Key: SPARK-26821
> URL: https://issues.apache.org/jira/browse/SPARK-26821
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Sujith
>Priority: Major
>
> Create a table with a char-type field. When the inserted string is shorter 
> than the declared char length, Spark 2.x will not process filter queries on 
> that column properly, leading to incorrect results.
> 0: jdbc:hive2://10.19.89.222:22550/default> create table jj(id int, name 
> char(5));
>  +--+-+
> |Result|
> +--+-+
>  +--+-+
>  No rows selected (0.894 seconds)
>  0: jdbc:hive2://10.19.89.222:22550/default> insert into table jj 
> values(232,'ds');
>  +--+-+
> |Result|
> +--+-+
>  +--+-+
>  No rows selected (1.815 seconds)
>  0: jdbc:hive2://10.19.89.222:22550/default> select * from jj where name='ds';
> +-----+-------+
> | id  | name  |
> +-----+-------+
> +-----+-------+
>  
> The above query will not give any result.
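
For reference, a hedged sketch of the padding workaround in Scala (table and column names follow the example above; rtrim is the standard Spark SQL function):

{code:scala}
import org.apache.spark.sql.functions.{col, rtrim}

// CHAR(5) values are stored blank-padded, so either trim the column before comparing...
spark.table("jj").filter(rtrim(col("name")) === "ds").show()
// ...or pad the literal out to the declared length.
spark.sql("SELECT * FROM jj WHERE name = 'ds   '").show()
{code}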



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26821) filters not working with char datatype when querying against hive table

2019-02-05 Thread Sujith (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761086#comment-16761086
 ] 

Sujith commented on SPARK-26821:


Yeah, with spaces it will work fine. I will try to check the behavior in a 
couple of other systems as well.

> filters not working with char datatype when querying against hive table
> ---
>
> Key: SPARK-26821
> URL: https://issues.apache.org/jira/browse/SPARK-26821
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Sujith
>Priority: Major
>
> Create a table with a char-type field. When the inserted string is shorter 
> than the declared char length, Spark 2.x will not process filter queries on 
> that column properly, leading to incorrect results.
> 0: jdbc:hive2://10.19.89.222:22550/default> create table jj(id int, name 
> char(5));
>  +--+-+
> |Result|
> +--+-+
>  +--+-+
>  No rows selected (0.894 seconds)
>  0: jdbc:hive2://10.19.89.222:22550/default> insert into table jj 
> values(232,'ds');
>  +--+-+
> |Result|
> +--+-+
>  +--+-+
>  No rows selected (1.815 seconds)
>  0: jdbc:hive2://10.19.89.222:22550/default> select * from jj where name='ds';
> +-----+-------+
> | id  | name  |
> +-----+-------+
> +-----+-------+
>  
> The above query will not give any result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26829) In place standard scaler so the column remains same after transformation

2019-02-05 Thread Santokh Singh (JIRA)
Santokh Singh created SPARK-26829:
-

 Summary: In place standard scaler so the column remains same after 
transformation
 Key: SPARK-26829
 URL: https://issues.apache.org/jira/browse/SPARK-26829
 Project: Spark
  Issue Type: Improvement
  Components: ML
Affects Versions: 2.3.2
Reporter: Santokh Singh






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Dhruve Ashar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761072#comment-16761072
 ] 

Dhruve Ashar edited comment on SPARK-26827 at 2/5/19 6:08 PM:
--

Resolution: Pass the same archive with both the py-files and archives options.


was (Author: dhruve ashar):
Pass the same archive with py-files and archives option.

> Support importing python modules having shared objects(.so)
> ---
>
> Key: SPARK-26827
> URL: https://issues.apache.org/jira/browse/SPARK-26827
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Dhruve Ashar
>Priority: Major
>
> If a user wants to import dynamic modules, specifically ones containing .so 
> files, Python disallows importing them from a zip file 
> ([https://docs.python.org/3/library/zipimport.html]), and currently Spark 
> doesn't support this either. 
> Files passed with the py-files option are placed on the PYTHONPATH but are 
> not extracted, while files passed as archives are extracted but not placed 
> on the PYTHONPATH. Dynamic modules can only be loaded if they are both 
> extracted and added to the PYTHONPATH.
>  
> Has anyone encountered this issue before and what is the best way to go about 
> it?
>  
> Some possible solutions:
> 1 - Work around the issue by passing the archive with both the py-files and 
> archives options: this extracts the archive as well as adds it to the path. 
> Gotcha: both have to be named the same. I have tested this and it works, but 
> it's just a workaround.
> 2 - We add a new config like py-archives which takes all the files and 
> extracts them and also adds them to the PYTHONPATH. Or just examine the 
> contents of the zip file and if it has dynamic modules then do the same. I am 
> happy to work on the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26828) Coalesce to reduce partitions before writing to hive is not working

2019-02-05 Thread Anusha Buchireddygari (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anusha Buchireddygari updated SPARK-26828:
--
Priority: Minor  (was: Major)

> Coalesce to reduce partitions before writing to hive is not working
> ---
>
> Key: SPARK-26828
> URL: https://issues.apache.org/jira/browse/SPARK-26828
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Anusha Buchireddygari
>Priority: Minor
>
> final_store.coalesce(5).write.mode("overwrite").insertInto("database.tablename",
> overwrite=True): this statement is not merging partitions. I've set 
> .config("spark.default.parallelism", "2000") \
> .config("spark.sql.shuffle.partitions", "2000") \
> however, repartition works but takes 20-25 minutes to insert.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Dhruve Ashar (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruve Ashar resolved SPARK-26827.
--
Resolution: Workaround

Pass the same archive with py-files and archives option.
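
For illustration, the workaround amounts to passing the same zip to both options 
(file and application names below are hypothetical):

{code}
spark-submit \
  --master yarn --deploy-mode cluster \
  --py-files deps.zip \
  --archives deps.zip \
  my_app.py
{code}

Presumably this works because the archive is extracted into a directory with the 
same name that py-files already put on the PYTHONPATH, hence the requirement that 
both names match.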

> Support importing python modules having shared objects(.so)
> ---
>
> Key: SPARK-26827
> URL: https://issues.apache.org/jira/browse/SPARK-26827
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Dhruve Ashar
>Priority: Major
>
> If a user wants to import dynamic modules, specifically having .so files, 
> this is currently disallowed by python from a zip file. 
> ([https://docs.python.org/3/library/zipimport.html]) and currently spark 
> doesn't support this either. 
> Files which are passed using py-files options are placed on the PYTHONPATH, 
> but are not extracted. While files which are passed as archives, are 
> extracted but not placed on the PYTHONPATH. The dynamic modules can be loaded 
> if they are extracted and added to the PYTHONPATH.
>  
> Has anyone encountered this issue before and what is the best way to go about 
> it?
>  
> Some possible solutions:
> 1 - Get around this issue, by passing the archive with py-files and archives 
> option, this extracts the archive as well as adds it to the path. Gotcha - 
> both have to be named the same. I have tested this and it works, but its just 
> a workaround.
> 2 - We add a new config like py-archives which takes all the files and 
> extracts them and also adds them to the PYTHONPATH. Or just examine the 
> contents of the zip file and if it has dynamic modules then do the same. I am 
> happy to work on the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Dhruve Ashar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761069#comment-16761069
 ] 

Dhruve Ashar commented on SPARK-26827:
--

Thanks for the response [~irashid] and [~hyukjin.kwon]. Will close this one out.

> Support importing python modules having shared objects(.so)
> ---
>
> Key: SPARK-26827
> URL: https://issues.apache.org/jira/browse/SPARK-26827
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Dhruve Ashar
>Priority: Major
>
> If a user wants to import dynamic modules, specifically having .so files, 
> this is currently disallowed by python from a zip file. 
> ([https://docs.python.org/3/library/zipimport.html]) and currently spark 
> doesn't support this either. 
> Files which are passed using py-files options are placed on the PYTHONPATH, 
> but are not extracted. While files which are passed as archives, are 
> extracted but not placed on the PYTHONPATH. The dynamic modules can be loaded 
> if they are extracted and added to the PYTHONPATH.
>  
> Has anyone encountered this issue before and what is the best way to go about 
> it?
>  
> Some possible solutions:
> 1 - Get around this issue, by passing the archive with py-files and archives 
> option, this extracts the archive as well as adds it to the path. Gotcha - 
> both have to be named the same. I have tested this and it works, but its just 
> a workaround.
> 2 - We add a new config like py-archives which takes all the files and 
> extracts them and also adds them to the PYTHONPATH. Or just examine the 
> contents of the zip file and if it has dynamic modules then do the same. I am 
> happy to work on the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26828) Coalesce to reduce partitions before writing to hive is not working

2019-02-05 Thread Anusha Buchireddygari (JIRA)
Anusha Buchireddygari created SPARK-26828:
-

 Summary: Coalesce to reduce partitions before writing to hive is 
not working
 Key: SPARK-26828
 URL: https://issues.apache.org/jira/browse/SPARK-26828
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Anusha Buchireddygari


final_store.coalesce(5).write.mode("overwrite").insertInto("database.tablename",
overwrite=True): this statement is not merging partitions. I've set 

.config("spark.default.parallelism", "2000") \
.config("spark.sql.shuffle.partitions", "2000") \

however, repartition works but takes 20-25 minutes to insert.
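
For reference, the usual explanation of the difference, with a minimal Scala 
sketch (table and variable names are illustrative, not a confirmed fix):

{code:scala}
// coalesce(n) narrows partitions without a shuffle, so the upstream computation
// may also run with only n tasks; repartition(n) inserts a shuffle instead.
// spark.sql.shuffle.partitions only affects plans that actually shuffle.
finalStore
  .repartition(5)                   // forces a shuffle down to 5 output partitions
  .write
  .mode("overwrite")
  .insertInto("database.tablename")
{code}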



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761050#comment-16761050
 ] 

Hyukjin Kwon commented on SPARK-26827:
--

The workaround sounds pretty okay as is. 

> Support importing python modules having shared objects(.so)
> ---
>
> Key: SPARK-26827
> URL: https://issues.apache.org/jira/browse/SPARK-26827
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Dhruve Ashar
>Priority: Major
>
> If a user wants to import dynamic modules, specifically having .so files, 
> this is currently disallowed by python from a zip file. 
> ([https://docs.python.org/3/library/zipimport.html]) and currently spark 
> doesn't support this either. 
> Files which are passed using py-files options are placed on the PYTHONPATH, 
> but are not extracted. While files which are passed as archives, are 
> extracted but not placed on the PYTHONPATH. The dynamic modules can be loaded 
> if they are extracted and added to the PYTHONPATH.
>  
> Has anyone encountered this issue before and what is the best way to go about 
> it?
>  
> Some possible solutions:
> 1 - Get around this issue, by passing the archive with py-files and archives 
> option, this extracts the archive as well as adds it to the path. Gotcha - 
> both have to be named the same. I have tested this and it works, but its just 
> a workaround.
> 2 - We add a new config like py-archives which takes all the files and 
> extracts them and also adds them to the PYTHONPATH. Or just examine the 
> contents of the zip file and if it has dynamic modules then do the same. I am 
> happy to work on the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Dhruve Ashar (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761034#comment-16761034
 ] 

Dhruve Ashar commented on SPARK-26827:
--

[~holden.ka...@gmail.com] , [~irashid] any thoughts on this one?

> Support importing python modules having shared objects(.so)
> ---
>
> Key: SPARK-26827
> URL: https://issues.apache.org/jira/browse/SPARK-26827
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Dhruve Ashar
>Priority: Major
>
> If a user wants to import dynamic modules, specifically having .so files, 
> this is currently disallowed by python from a zip file. 
> ([https://docs.python.org/3/library/zipimport.html]) and currently spark 
> doesn't support this either. 
> Files which are passed using py-files options are placed on the PYTHONPATH, 
> but are not extracted. While files which are passed as archives, are 
> extracted but not placed on the PYTHONPATH. The dynamic modules can be loaded 
> if they are extracted and added to the PYTHONPATH.
>  
> Has anyone encountered this issue before and what is the best way to go about 
> it?
>  
> Some possible solutions:
> 1 - Get around this issue, by passing the archive with py-files and archives 
> option, this extracts the archive as well as adds it to the path. Gotcha - 
> both have to be named the same. I have tested this and it works, but its just 
> a workaround.
> 2 - We add a new config like py-archives which takes all the files and 
> extracts them and also adds them to the PYTHONPATH. Or just examine the 
> contents of the zip file and if it has dynamic modules then do the same. I am 
> happy to work on the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761039#comment-16761039
 ] 

Imran Rashid commented on SPARK-26827:
--

I don't know enough about python use cases to have an opinion here, sorry, but 
maybe [~hyukjin.kwon] or [~bryanc] do

> Support importing python modules having shared objects(.so)
> ---
>
> Key: SPARK-26827
> URL: https://issues.apache.org/jira/browse/SPARK-26827
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Dhruve Ashar
>Priority: Major
>
> If a user wants to import dynamic modules, specifically having .so files, 
> this is currently disallowed by python from a zip file. 
> ([https://docs.python.org/3/library/zipimport.html]) and currently spark 
> doesn't support this either. 
> Files which are passed using py-files options are placed on the PYTHONPATH, 
> but are not extracted. While files which are passed as archives, are 
> extracted but not placed on the PYTHONPATH. The dynamic modules can be loaded 
> if they are extracted and added to the PYTHONPATH.
>  
> Has anyone encountered this issue before and what is the best way to go about 
> it?
>  
> Some possible solutions:
> 1 - Get around this issue, by passing the archive with py-files and archives 
> option, this extracts the archive as well as adds it to the path. Gotcha - 
> both have to be named the same. I have tested this and it works, but its just 
> a workaround.
> 2 - We add a new config like py-archives which takes all the files and 
> extracts them and also adds them to the PYTHONPATH. Or just examine the 
> contents of the zip file and if it has dynamic modules then do the same. I am 
> happy to work on the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26827) Support importing python modules having shared objects(.so)

2019-02-05 Thread Dhruve Ashar (JIRA)
Dhruve Ashar created SPARK-26827:


 Summary: Support importing python modules having shared 
objects(.so)
 Key: SPARK-26827
 URL: https://issues.apache.org/jira/browse/SPARK-26827
 Project: Spark
  Issue Type: New Feature
  Components: PySpark
Affects Versions: 2.4.0, 2.3.2
Reporter: Dhruve Ashar


If a user wants to import dynamic modules, specifically having .so files, this 
is currently disallowed by python from a zip file. 
([https://docs.python.org/3/library/zipimport.html]) and currently spark 
doesn't support this either. 

Files which are passed using py-files options are placed on the PYTHONPATH, but 
are not extracted. While files which are passed as archives, are extracted but 
not placed on the PYTHONPATH. The dynamic modules can be loaded if they are 
extracted and added to the PYTHONPATH.

 

Has anyone encountered this issue before and what is the best way to go about 
it?

 

Some possible solutions:

1 - Get around this issue, by passing the archive with py-files and archives 
option, this extracts the archive as well as adds it to the path. Gotcha - both 
have to be named the same. I have tested this and it works, but its just a 
workaround.

2 - We add a new config like py-archives which takes all the files and extracts 
them and also adds them to the PYTHONPATH. Or just examine the contents of the 
zip file and if it has dynamic modules then do the same. I am happy to work on 
the fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26826) Array indexing functions array_allpositions and array_select

2019-02-05 Thread Petar Zecevic (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Petar Zecevic updated SPARK-26826:
--
Description: 
This ticket proposes two extra array functions: {{array_allpositions}} (named 
after {{array_position}}) and {{array_select}}. These functions should make it 
easier to:
* get an array of indices of all occurrences of a value in an array 
({{array_allpositions}})
* select all elements of an array based on an array of indices 
({{array_select}})

Although higher-order functions, such as {{aggregate}} and {{transform}}, have 
been recently added, performing the tasks above is still not simple, hence this 
addition.

  was:
This ticket proposes two extra array functions: `array_allpositions` (named 
after `array_position`) and `array_select`. These functions should make it 
easier to:
* get an array of indices of all occurrences of a value in an array 
(`array_allpositions`)
* select all elements of an array based on an array of indices (`array_select`)

Although higher-order functions, such as `aggregate` and `transform`, have been 
recently added, performing the tasks above is still not simple, hence this addition.


> Array indexing functions array_allpositions and array_select
> 
>
> Key: SPARK-26826
> URL: https://issues.apache.org/jira/browse/SPARK-26826
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Petar Zecevic
>Priority: Major
>
> This ticket proposes two extra array functions: {{array_allpositions}} (named 
> after {{array_position}}) and {{array_select}}. These functions should make 
> it easier to:
> * get an array of indices of all occurrences of a value in an array 
> ({{array_allpositions}})
> * select all elements of an array based on an array of indices 
> ({{array_select}})
> Although higher-order functions, such as {{aggregate}} and {{transform}}, 
> have been recently added, performing the tasks above is still not simple, hence 
> this addition.
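
For reference, a rough emulation of the two proposed functions with functions 
already available in 2.4, which shows why a dedicated pair would be simpler (the 
array literals are just an illustration):

{code:scala}
// positions of value 3 in arr, and selection of the elements at indices 1 and 3
spark.sql("""
  SELECT filter(sequence(1, size(arr)), i -> element_at(arr, i) = 3) AS positions,
         transform(array(1, 3), i -> element_at(arr, i))             AS selected
  FROM (SELECT array(3, 1, 3) AS arr)
""").show()
{code}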



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26826) Array indexing functions array_allpositions and array_select

2019-02-05 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26826:


Assignee: Apache Spark

> Array indexing functions array_allpositions and array_select
> 
>
> Key: SPARK-26826
> URL: https://issues.apache.org/jira/browse/SPARK-26826
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Petar Zecevic
>Assignee: Apache Spark
>Priority: Major
>
> This ticket proposes two extra array functions: `array_allpositions` (named 
> after `array_position`) and `array_select`. These functions should make it 
> easier to:
> * get an array of indices of all occurrences of a value in an array 
> (`array_allpositions`)
> * select all elements of an array based on an array of indices 
> (`array_select`)
> Although higher-order functions, such as `aggregate` and `transform`, have 
> been recently added, performing the tasks above is still not simple, hence this 
> addition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26826) Array indexing functions array_allpositions and array_select

2019-02-05 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26826:


Assignee: (was: Apache Spark)

> Array indexing functions array_allpositions and array_select
> 
>
> Key: SPARK-26826
> URL: https://issues.apache.org/jira/browse/SPARK-26826
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Petar Zecevic
>Priority: Major
>
> This ticket proposes two extra array functions: `array_allpositions` (named 
> after `array_position`) and `array_select`. These functions should make it 
> easier to:
> * get an array of indices of all occurrences of a value in an array 
> (`array_allpositions`)
> * select all elements of an array based on an array of indices 
> (`array_select`)
> Although higher-order functions, such as `aggregate` and `transform`, have 
> been recently added, performing the tasks above is still not simple, hence this 
> addition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26826) Array indexing functions array_allpositions and array_select

2019-02-05 Thread Petar Zecevic (JIRA)
Petar Zecevic created SPARK-26826:
-

 Summary: Array indexing functions array_allpositions and 
array_select
 Key: SPARK-26826
 URL: https://issues.apache.org/jira/browse/SPARK-26826
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Petar Zecevic


This ticket proposes two extra array functions: `array_allpositions` (named 
after `array_position`) and `array_select`. These functions should make it 
easier to:
* get an array of indices of all occurrences of a value in an array 
(`array_allpositions`)
* select all elements of an array based on an array of indices (`array_select`)

Although higher-order functions, such as `aggregate` and `transform`, have been 
recently added, performing the tasks above is still not simple, hence this addition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-05 Thread Andre Araujo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760901#comment-16760901
 ] 

Andre Araujo commented on SPARK-26825:
--

Thanks a lot, [~gsomogyi] 

> Spark Structure Streaming job failing when submitted in cluster mode
> 
>
> Key: SPARK-26825
> URL: https://issues.apache.org/jira/browse/SPARK-26825
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Andre Araujo
>Priority: Major
>
> I have a structured streaming job that runs successfully when launched in 
> "client" mode. However, when launched in "cluster" mode it fails with the 
> following weird messages on the error log. Note that the path in the error 
> message is actually a local filesystem path that has been mistakenly prefixed 
> with a {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream 
> metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to 
> hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_01/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadataorg.apache.hadoop.security.AccessControlException:
>  Permission denied: user=root, access=WRITE, 
> inode="/":hdfs:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} 
> determines the checkpoint location 
> [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216].
>  If neither the user nor the Spark conf specify a checkpoint location, the 
> location is returned by a call to {{Utils.createTempDir(namePrefix = 
> s"temporary").getCanonicalPath}}. 
>Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a 
> scheme ({{hdfs://}} or {{file://}}), so, it's ambiguous as to what type of 
> file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a 
> local path, not a HDFS path, as the paths returned by the other two 
> conditions. I executed {{Utils.createTempDir}} in a test job, both in cluster 
> and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => 
> /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/
> createTempDir(namePrefix = s"temporary") => 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the 
> constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276]
>  of the {{MicroBatchExecution}} instance
> # This is the point where [{{resolvedCheckpointRoot}} is 
> calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
>  Here is where things start to break: since the path returned by 
> {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default 
> filesystem, the code resolves the path as being a HDFS path, rather than a 
> local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = 
> "/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> val checkpointPath = new Path(checkpointRoot)
> checkpointPath: org.apache.hadoop.fs.Path = 
> 

[jira] [Commented] (SPARK-26821) filters not working with char datatype when querying against hive table

2019-02-05 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760897#comment-16760897
 ] 

Sean Owen commented on SPARK-26821:
---

Can you find the result when filtering for "ds   " with spaces? 
I find that result odd from MySQL, but unless there's harder evidence that this 
is correct behavior (e.g. ANSI SQL) or behavior we want to match (Hive), I don't 
think this is incorrect.
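
For reference, two queries that may help narrow it down (assuming the {{jj}} table 
from the report; the padded literal and the {{rtrim}} call are illustrations, not 
a confirmed fix):

{code:scala}
// CHAR(5) values are stored space-padded by Hive, so either compare against the
// padded literal or strip the padding before comparing.
spark.sql("SELECT * FROM jj WHERE name = 'ds   '").show()
spark.sql("SELECT * FROM jj WHERE rtrim(name) = 'ds'").show()
{code}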

> filters not working with char datatype when querying against hive table
> ---
>
> Key: SPARK-26821
> URL: https://issues.apache.org/jira/browse/SPARK-26821
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Sujith
>Priority: Major
>
> Create a table with a char type field. While inserting data into the char data 
> type column, if the data string length is less than the specified datatype 
> length, Spark 2.x will not process the filter query properly, leading to an 
> incorrect result.
> 0: jdbc:hive2://10.19.89.222:22550/default> create table jj(id int, name 
> char(5));
> +---------+
> | Result  |
> +---------+
> +---------+
> No rows selected (0.894 seconds)
> 0: jdbc:hive2://10.19.89.222:22550/default> insert into table jj 
> values(232,'ds');
> +---------+
> | Result  |
> +---------+
> +---------+
> No rows selected (1.815 seconds)
> 0: jdbc:hive2://10.19.89.222:22550/default> select * from jj where name='ds';
> +-----+-------+
> | id  | name  |
> +-----+-------+
> +-----+-------+
>  
> The above query will not give any result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26819) ArrayIndexOutOfBoundsException while loading a CSV to a Dataset with dependencies spark-core_2.12 and spark-sql_2.12 (with spark-core_2.11 and spark-sql_2.11 : wo

2019-02-05 Thread M. Le Bihan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760799#comment-16760799
 ] 

M. Le Bihan edited comment on SPARK-26819 at 2/5/19 1:30 PM:
-

You made me search a lot...

 

In the part of the stack trace that ends with the bug, the classic Jackson package 
"com.fasterxml.jackson.databind.introspect" hands off mostly to methods of 
"com.fasterxml.jackson.module.scala.introspect.BeanIntrospector" and of the 
"scala.collection" package:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 10582
{{ at 
com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.accept(BytecodeReadingParanamer.java:563)}}
{{ at 
com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.access$200(BytecodeReadingParanamer.java:338)}}
{{ at 
com.thoughtworks.paranamer.BytecodeReadingParanamer.lookupParameterNames(BytecodeReadingParanamer.java:103)}}
{{ at 
com.thoughtworks.paranamer.CachingParanamer.lookupParameterNames(CachingParanamer.java:90)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.getCtorParams(BeanIntrospector.scala:44)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1(BeanIntrospector.scala:58)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1$adapted(BeanIntrospector.scala:58)}}
{{ at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)}}
{{ at scala.collection.Iterator.foreach(Iterator.scala:937)}}
{{ at scala.collection.Iterator.foreach$(Iterator.scala:937)}}
{{ at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)}}
{{ at scala.collection.IterableLike.foreach(IterableLike.scala:70)}}
{{ at scala.collection.IterableLike.foreach$(IterableLike.scala:69)}}
{{ at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
{{ at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)}}
{{ at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)}}
{{ at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.findConstructorParam$1(BeanIntrospector.scala:58)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$19(BeanIntrospector.scala:176)}}
{{ at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)}}
{{ at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)}}
{{ at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)}}
{{ at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)}}
{{ at scala.collection.TraversableLike.map(TraversableLike.scala:233)}}
{{ at scala.collection.TraversableLike.map$(TraversableLike.scala:226)}}
{{ at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14(BeanIntrospector.scala:170)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14$adapted(BeanIntrospector.scala:169)}}
{{ at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)}}
{{ at scala.collection.immutable.List.foreach(List.scala:388)}}
{{ at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)}}
{{ at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)}}
{{ at scala.collection.immutable.List.flatMap(List.scala:351)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.apply(BeanIntrospector.scala:169)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$._descriptorFor(ScalaAnnotationIntrospectorModule.scala:21)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.fieldName(ScalaAnnotationIntrospectorModule.scala:29)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.findImplicitPropertyName(ScalaAnnotationIntrospectorModule.scala:77)}}
{{ at
 
com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findImplicitPropertyName(AnnotationIntrospectorPair.java:490)}}
 
{code}
 

_BeanIntrospector.scala_ seems to be the same in 
[2.9.7:2_11|https://jar-download.com/artifacts/com.fasterxml.jackson.module/jackson-module-scala_2.11/2.9.7/source-code/com/fasterxml/jackson/module/scala/introspect/BeanIntrospector.scala]
 and 
[2.9.7:2_12|https://jar-download.com/artifacts/com.fasterxml.jackson.module/jackson-module-scala_2.12/2.9.7/source-code/com/fasterxml/jackson/module/scala/introspect/BeanIntrospector.scala]

[com.thoughtworks.paranamer|https://mvnrepository.com/artifact/com.thoughtworks.paranamer/paranamer]
 seems to be stable at 2.8, hasn't changed since 2015, and that version 2.8 was 
picked up by Spark 2.3.0 in 2017: 
[https://jira.apache.org/jira/browse/SPARK-22128]

I'd rather put the 

[jira] [Commented] (SPARK-26819) ArrayIndexOutOfBoundsException while loading a CSV to a Dataset with dependencies spark-core_2.12 and spark-sql_2.12 (with spark-core_2.11 and spark-sql_2.11 : working

2019-02-05 Thread M. Le Bihan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760799#comment-16760799
 ] 

M. Le Bihan commented on SPARK-26819:
-

You made me search a lot...

 

In the part of the stack trace that ends with the bug, the classic Jackson package 
"com.fasterxml.jackson.databind.introspect" hands off mostly to methods of 
"com.fasterxml.jackson.module.scala.introspect.BeanIntrospector" and of the 
"scala.collection" package:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 10582
{{ at 
com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.accept(BytecodeReadingParanamer.java:563)}}
{{ at 
com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.access$200(BytecodeReadingParanamer.java:338)}}
{{ at 
com.thoughtworks.paranamer.BytecodeReadingParanamer.lookupParameterNames(BytecodeReadingParanamer.java:103)}}
{{ at 
com.thoughtworks.paranamer.CachingParanamer.lookupParameterNames(CachingParanamer.java:90)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.getCtorParams(BeanIntrospector.scala:44)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1(BeanIntrospector.scala:58)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1$adapted(BeanIntrospector.scala:58)}}
{{ at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)}}
{{ at scala.collection.Iterator.foreach(Iterator.scala:937)}}
{{ at scala.collection.Iterator.foreach$(Iterator.scala:937)}}
{{ at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)}}
{{ at scala.collection.IterableLike.foreach(IterableLike.scala:70)}}
{{ at scala.collection.IterableLike.foreach$(IterableLike.scala:69)}}
{{ at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
{{ at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)}}
{{ at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)}}
{{ at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.findConstructorParam$1(BeanIntrospector.scala:58)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$19(BeanIntrospector.scala:176)}}
{{ at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)}}
{{ at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)}}
{{ at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)}}
{{ at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)}}
{{ at scala.collection.TraversableLike.map(TraversableLike.scala:233)}}
{{ at scala.collection.TraversableLike.map$(TraversableLike.scala:226)}}
{{ at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14(BeanIntrospector.scala:170)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14$adapted(BeanIntrospector.scala:169)}}
{{ at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)}}
{{ at scala.collection.immutable.List.foreach(List.scala:388)}}
{{ at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)}}
{{ at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)}}
{{ at scala.collection.immutable.List.flatMap(List.scala:351)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.apply(BeanIntrospector.scala:169)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$._descriptorFor(ScalaAnnotationIntrospectorModule.scala:21)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.fieldName(ScalaAnnotationIntrospectorModule.scala:29)}}
{{ at 
com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.findImplicitPropertyName(ScalaAnnotationIntrospectorModule.scala:77)}}
{{ at
 
com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findImplicitPropertyName(AnnotationIntrospectorPair.java:490)}}
 
{code}
 

_BeanIntrospector.scala_ seems to be the same in 
[2.9.7:2_11|https://jar-download.com/artifacts/com.fasterxml.jackson.module/jackson-module-scala_2.11/2.9.7/source-code/com/fasterxml/jackson/module/scala/introspect/BeanIntrospector.scala]
 and 
[2.9.7:2_12|https://jar-download.com/artifacts/com.fasterxml.jackson.module/jackson-module-scala_2.12/2.9.7/source-code/com/fasterxml/jackson/module/scala/introspect/BeanIntrospector.scala]

[com.thoughtworks.paranamer|https://mvnrepository.com/artifact/com.thoughtworks.paranamer/paranamer]
 seems to be stable at 2.8, hasn't changed since 2015, and that version 2.8 was 
picked up by Spark 2.3.0 in 2017: 
[https://jira.apache.org/jira/browse/SPARK-22128]

I'd rather put the source of the problem inside "scala" packages.
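
A quick check that may help pin down which artifacts are actually in play at 
runtime (class names are taken from the stack trace above; the snippet is just a 
diagnostic sketch):

{code:scala}
// Print the jar each suspect class was loaded from. BeanIntrospector is a Scala
// object, hence the trailing $ on its runtime class name.
Seq("com.thoughtworks.paranamer.BytecodeReadingParanamer",
    "com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$")
  .foreach { name =>
    val cls = Class.forName(name)
    println(name + " -> " + cls.getProtectionDomain.getCodeSource.getLocation)
  }
{code}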

[jira] [Commented] (SPARK-24284) java.util.NoSuchElementException in Spark Streaming with Kafka

2019-02-05 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760769#comment-16760769
 ] 

Gabor Somogyi commented on SPARK-24284:
---

[~ujjalsatpa...@gmail.com] On 1.6.3 CachedKafkaConsumer doesn't exist, so the 
stacktrace doesn't match the code. So what is the version exactly?
A little more info would be beneficial, like driver + executor logs, etc.

> java.util.NoSuchElementException in Spark Streaming with Kafka
> --
>
> Key: SPARK-24284
> URL: https://issues.apache.org/jira/browse/SPARK-24284
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.3
> Environment: Spark Streaming 1.6.3
>  
>Reporter: Ujjal Satpathy
>Priority: Major
>
> Hi,
> I am getting below error while running Spark streaming with Kafka. Though the 
> issue is not consistent but causing many of the batches failed.
> Job aborted due to stage failure: Task 85 in stage 5914.0 failed 4 times, 
> most recent failure: Lost task 85.3 in stage 5914.0 : 
> java.util.NoSuchElementException
>  at java.util.ArrayDeque.getLast(ArrayDeque.java:328)
>  at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.(MemoryRecords.java:275)
>  at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:318)
>  at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:223)
>  at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
>  at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:545)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>  at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
>  at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:72)
>  at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>  at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1196)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1195)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1195)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1277)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1203)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1183)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-05 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760777#comment-16760777
 ] 

Gabor Somogyi commented on SPARK-26825:
---

There is another PR from me which modifies this part, but after it is merged I 
intend to solve this as well.

> Spark Structure Streaming job failing when submitted in cluster mode
> 
>
> Key: SPARK-26825
> URL: https://issues.apache.org/jira/browse/SPARK-26825
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Andre Araujo
>Priority: Major
>
> I have a structured streaming job that runs successfully when launched in 
> "client" mode. However, when launched in "cluster" mode it fails with the 
> following weird messages on the error log. Note that the path in the error 
> message is actually a local filesystem path that has been mistakenly prefixed 
> with a {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream 
> metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to 
> hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_01/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadataorg.apache.hadoop.security.AccessControlException:
>  Permission denied: user=root, access=WRITE, 
> inode="/":hdfs:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} 
> determines the checkpoint location 
> [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216].
>  If neither the user nor the Spark conf specify a checkpoint location, the 
> location is returned by a call to {{Utils.createTempDir(namePrefix = 
> s"temporary").getCanonicalPath}}. 
>Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a 
> scheme ({{hdfs://}} or {{file://}}), so, it's ambiguous as to what type of 
> file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a 
> local path, not a HDFS path, as the paths returned by the other two 
> conditions. I executed {{Utils.createTempDir}} in a test job, both in cluster 
> and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => 
> /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/
> createTempDir(namePrefix = s"temporary") => 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the 
> constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276]
>  of the {{MicroBatchExecution}} instance
> # This is the point where [{{resolvedCheckpointRoot}} is 
> calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
>  Here is where things start to break: since the path returned by 
> {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default 
> filesystem, the code resolves the path as being a HDFS path, rather than a 
> local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = 
> "/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> val checkpointPath = new Path(checkpointRoot)
> 

[jira] [Commented] (SPARK-24284) java.util.NoSuchElementException in Spark Streaming with Kafka

2019-02-05 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760775#comment-16760775
 ] 

Gabor Somogyi commented on SPARK-24284:
---

This code part has been rewritten in 2.4. Is it possible to retest with it?


> java.util.NoSuchElementException in Spark Streaming with Kafka
> --
>
> Key: SPARK-24284
> URL: https://issues.apache.org/jira/browse/SPARK-24284
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.3
> Environment: Spark Streaming 1.6.3
>  
>Reporter: Ujjal Satpathy
>Priority: Major
>
> Hi,
> I am getting below error while running Spark streaming with Kafka. Though the 
> issue is not consistent but causing many of the batches failed.
> Job aborted due to stage failure: Task 85 in stage 5914.0 failed 4 times, 
> most recent failure: Lost task 85.3 in stage 5914.0 : 
> java.util.NoSuchElementException
>  at java.util.ArrayDeque.getLast(ArrayDeque.java:328)
>  at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.(MemoryRecords.java:275)
>  at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:318)
>  at 
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:223)
>  at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
>  at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:545)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>  at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
>  at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:72)
>  at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>  at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1196)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1195)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1195)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1277)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1203)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1183)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-05 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760702#comment-16760702
 ] 

Gabor Somogyi commented on SPARK-26825:
---

[~asdaraujo] excellent analysis! One minor correction:
 * Replacing the call to Utils.createTempDir with something that creates a temp 
dir on default FS, rather than local filesystem

This is correct:
{quote}Ensuring this method returns a path qualified with a scheme (hdfs://, to 
avoid later fs resolution mistakes.{quote}
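
A minimal sketch of why the scheme matters, assuming fs.defaultFS points at HDFS 
(paths below are illustrative):

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()                    // picks up fs.defaultFS, e.g. hdfs://ns1

val bare = new Path("/tmp/temporary-1234")        // no scheme
bare.getFileSystem(conf).makeQualified(bare)      // resolves to hdfs://ns1/tmp/temporary-1234

val local = new Path("file:///tmp/temporary-1234")
local.getFileSystem(conf).makeQualified(local)    // stays on the local filesystem
{code}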


> Spark Structure Streaming job failing when submitted in cluster mode
> 
>
> Key: SPARK-26825
> URL: https://issues.apache.org/jira/browse/SPARK-26825
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Andre Araujo
>Priority: Major
>
> I have a structured streaming job that runs successfully when launched in 
> "client" mode. However, when launched in "cluster" mode it fails with the 
> following weird messages on the error log. Note that the path in the error 
> message is actually a local filesystem path that has been mistakenly prefixed 
> with a {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream 
> metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to 
> hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_01/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadataorg.apache.hadoop.security.AccessControlException:
>  Permission denied: user=root, access=WRITE, 
> inode="/":hdfs:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} 
> determines the checkpoint location 
> [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216].
>  If neither the user nor the Spark conf specify a checkpoint location, the 
> location is returned by a call to {{Utils.createTempDir(namePrefix = 
> s"temporary").getCanonicalPath}}. 
>Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a 
> scheme ({{hdfs://}} or {{file://}}), so, it's ambiguous as to what type of 
> file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a 
> local path, not a HDFS path, as the paths returned by the other two 
> conditions. I executed {{Utils.createTempDir}} in a test job, both in cluster 
> and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => 
> /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/
> createTempDir(namePrefix = s"temporary") => 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the 
> constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276]
>  of the {{MicroBatchExecution}} instance
> # This is the point where [{{resolvedCheckpointRoot}} is 
> calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
>  Here is where things start to break: since the path returned by 
> {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default 
> filesystem, the code resolves the path as being a HDFS path, rather than a 
> local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = 
> "/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = 
> 

[jira] [Commented] (SPARK-26819) ArrayIndexOutOfBoundsException while loading a CSV to a Dataset with dependencies spark-core_2.12 and spark-sql_2.12 (with spark-core_2.11 and spark-sql_2.11 : working

2019-02-05 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760670#comment-16760670
 ] 

Hyukjin Kwon commented on SPARK-26819:
--

Is it basically a dependency management issue on your side? From a cursory 
look, a different Jackson version is being picked up.
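
If that is the case, one way to check and pin it on the application side is an sbt 
override; the version below is an assumption and should be taken from the 
spark-core pom of the Spark release actually in use:

{code:scala}
// Force the Jackson artifacts to the version Spark was built against.
dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core"    %  "jackson-databind"      % "2.6.7.1",
  "com.fasterxml.jackson.module" %% "jackson-module-scala"   % "2.6.7.1"
)
{code}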

> ArrayIndexOutOfBoundsException while loading a CSV to a Dataset with 
> dependencies spark-core_2.12 and spark-sql_2.12 (with spark-core_2.11 and 
> spark-sql_2.11 : working fine)
> -
>
> Key: SPARK-26819
> URL: https://issues.apache.org/jira/browse/SPARK-26819
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.4.0
> Environment: Java 8, Windows 7.
>Reporter: M. Le Bihan
>Priority: Major
> Attachments: CompteResultatCSV.java, ComptesResultatsIT.java, 
> comptes-communes-Entr'Allier.csv
>
>
> A simple CSV reading to a Dataset fails if Spark 2.4.0 is associated to 
> dependencies spark-spark-core_2.12 and spark-sql_2.12, but works fine with 
> spark-core_2.11 and spark-sql_2.11.
>  
> With _2.12, I encounter this stacktrace :
>  
> {{java.lang.ArrayIndexOutOfBoundsException: 10582}}
> {{ at 
> com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.accept(BytecodeReadingParanamer.java:563)}}
> {{ at 
> com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.access$200(BytecodeReadingParanamer.java:338)}}
> {{ at 
> com.thoughtworks.paranamer.BytecodeReadingParanamer.lookupParameterNames(BytecodeReadingParanamer.java:103)}}
> {{ at 
> com.thoughtworks.paranamer.CachingParanamer.lookupParameterNames(CachingParanamer.java:90)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.getCtorParams(BeanIntrospector.scala:44)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1(BeanIntrospector.scala:58)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1$adapted(BeanIntrospector.scala:58)}}
> {{ at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)}}
> {{ at scala.collection.Iterator.foreach(Iterator.scala:937)}}
> {{ at scala.collection.Iterator.foreach$(Iterator.scala:937)}}
> {{ at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)}}
> {{ at scala.collection.IterableLike.foreach(IterableLike.scala:70)}}
> {{ at scala.collection.IterableLike.foreach$(IterableLike.scala:69)}}
> {{ at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
> {{ at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)}}
> {{ at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)}}
> {{ at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.findConstructorParam$1(BeanIntrospector.scala:58)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$19(BeanIntrospector.scala:176)}}
> {{ at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)}}
> {{ at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)}}
> {{ at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)}}
> {{ at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)}}
> {{ at scala.collection.TraversableLike.map(TraversableLike.scala:233)}}
> {{ at scala.collection.TraversableLike.map$(TraversableLike.scala:226)}}
> {{ at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14(BeanIntrospector.scala:170)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14$adapted(BeanIntrospector.scala:169)}}
> {{ at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)}}
> {{ at scala.collection.immutable.List.foreach(List.scala:388)}}
> {{ at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)}}
> {{ at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)}}
> {{ at scala.collection.immutable.List.flatMap(List.scala:351)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.apply(BeanIntrospector.scala:169)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$._descriptorFor(ScalaAnnotationIntrospectorModule.scala:21)}}
> {{ at 
> com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.fieldName(ScalaAnnotationIntrospectorModule.scala:29)}}
> {{ at 
> 

[jira] [Updated] (SPARK-26823) SBT Build Warnings

2019-02-05 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26823:
-
Target Version/s:   (was: 2.4.0)

> SBT Build Warnings 
> ---
>
> Key: SPARK-26823
> URL: https://issues.apache.org/jira/browse/SPARK-26823
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Ahshan
>Priority: Major
>
> Recently installed Spark 2.4.0 with Scala 2.11 and encountered the warnings 
> below during the sbt build phase.
> The same warning messages also appear in the Jenkins build:
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/lastBuild/consoleFull
>  
>  
> {noformat}
> java -version
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
>  
> {noformat}
>  
> {noformat}
> [warn] There may be incompatibilities among your library dependencies; run 
> 'evicted' to see detailed eviction warnings.
>  
> [warn] Found version conflict(s) in library dependencies; some are suspected 
> to be binary incompatible:
> [warn] * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final, 
> 3.7.0.Final}
> [warn]     +- org.apache.spark:spark-core_2.11:2.4.0             (depends on 
> 3.9.9.Final)
> [warn]     +- org.apache.zookeeper:zookeeper:3.4.6               (depends on 
> 3.6.2.Final)
> [warn]     +- org.apache.hadoop:hadoop-hdfs:2.6.5                (depends on 
> 3.6.2.Final)
> [warn] * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
> [warn]     +- org.apache.hadoop:hadoop-yarn-client:2.6.5         (depends on 
> 11.0.2)
> [warn]     +- org.apache.hadoop:hadoop-yarn-api:2.6.5            (depends on 
> 11.0.2)
> [warn]     +- org.apache.hadoop:hadoop-yarn-common:2.6.5         (depends on 
> 11.0.2)
> [warn]     +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5 (depends 
> on 11.0.2)
> [warn]     +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5  (depends on 
> 11.0.2)
> [warn]     +- org.apache.hadoop:hadoop-hdfs:2.6.5                (depends on 
> 11.0.2)
> [warn]     +- org.apache.curator:curator-framework:2.6.0         (depends on 
> 16.0.1)
> [warn]     +- org.apache.curator:curator-client:2.6.0            (depends on 
> 16.0.1)
> [warn]     +- org.apache.curator:curator-recipes:2.6.0           (depends on 
> 16.0.1)
> [warn]     +- org.apache.hadoop:hadoop-common:2.6.5              (depends on 
> 16.0.1)
> [warn]     +- org.htrace:htrace-core:3.0.4                       (depends on 
> 12.0.1)
> {noformat}
>  
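
These eviction warnings come from sbt's conflict resolution rather than from Spark 
itself. A minimal build.sbt sketch (hypothetical, not Spark's own build definition) 
that produces the same kind of warning and makes the chosen version explicit:
{code:scala}
// build.sbt (sbt 1.x): two dependencies pull in different Guava versions, so sbt
// keeps one and evicts the others, printing a warning like the one above.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "2.6.5",
  "org.apache.hadoop" % "hadoop-yarn-client" % "2.6.5"
)

// Stating the version to keep makes the choice explicit; `sbt evicted` then prints
// the detailed eviction report the warning refers to.
dependencyOverrides += "com.google.guava" % "guava" % "11.0.2"
{code}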






[jira] [Comment Edited] (SPARK-26727) CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException

2019-02-05 Thread Laszlo Rigo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760604#comment-16760604
 ] 

Laszlo Rigo edited comment on SPARK-26727 at 2/5/19 10:05 AM:
--

For me these calls still seem to be asynchronous:
{noformat}
var i = 0
try {
  while (true) {
    println(i)
    spark.sql("DROP VIEW IF EXISTS testSparkReplace")
    spark.sql("CREATE VIEW testSparkReplace as SELECT dummy FROM ae_dual")
    while (!spark.catalog.tableExists("testSparkReplace")) {}
    i = i + 1
  }
} catch {
  case e: Exception => e.printStackTrace()
}
{noformat}
This script failed with the same exception when the value of 'i' was 183:
{noformat}
org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
AlreadyExistsException(message:Table testsparkreplace already exists);
at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at 
org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:236)
at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:319)
at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:175)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:195)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:30)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:43)
at $line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)
at $line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:47)
at $line17.$read$$iw$$iw$$iw.<init>(<console>:49)
at $line17.$read$$iw$$iw.<init>(<console>:51)
at $line17.$read$$iw.<init>(<console>:53)
at $line17.$read.<init>(<console>:55)
at $line17.$read$.<init>(<console>:59)
at $line17.$read$.<clinit>()
at $line17.$eval$.$print$lzycompute(<console>:7)
at $line17.$eval$.$print(<console>:6)
at $line17.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at 
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at 
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at 
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at 
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at 
scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:819)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:691)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
at 

[jira] [Comment Edited] (SPARK-26727) CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException

2019-02-05 Thread Laszlo Rigo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755846#comment-16755846
 ] 

Laszlo Rigo edited comment on SPARK-26727 at 2/5/19 9:29 AM:
-

[~dongjoon], We have this hive-metastore rpm installed:

rpm -qi hive-metastore
 Name : hive-metastore Relocations: (not relocatable)
 Version : 1.1.0+cdh5.4.8+277 Vendor: (none)
 Release : 1.cdh5.4.8.p1373.1769.el6 Build Date: Fri 25 Mar 2016 05:46:42 PM CET
 Install Date: Tue 29 Jan 2019 07:48:54 PM CET Build Host: 
ec2-pkg-centos-6-1226.vpc.cloudera.com
 Group : System/Daemons Source RPM: 
hive-1.1.0+cdh5.4.8+277-1.cdh5.4.8.p1373.1769.el6.src.rpm
 Size : 5458 License: ASL 2.0
 Signature : DSA/SHA1, Fri 25 Mar 2016 06:12:22 PM CET, Key ID f90c0d8fe8f86acd
 URL : [http://hive.apache.org/]
 Summary : Shared metadata repository for Hive.
 Description :
 This optional package hosts a metadata server for Hive clients across a 
network to use.


hive --version
 Hive 1.1.0-cdh5.4.8
 Subversion 
[file:///data/jenkins/workspace/generic-package-rhel64-6-0/topdir/BUILD/hive-1.1.0-cdh5.4.8]
 -r Unknown
 Compiled by jenkins on Fri Mar 25 09:38:39 PDT 2016
 From source with checksum e4569745d1b7e0b2785263766d99cffc


was (Author: rigolaszlo):
[~dongjoon], We have this hive-metastore rpm installed:

rpm -qi hive-metastore
 Name : hive-metastore Relocations: (not relocatable)
 Version : 1.1.0+cdh5.4.8+277 Vendor: (none)
 Release : 1.cdh5.4.8.p1373.1769.el6 Build Date: Fri 25 Mar 2016 05:46:42 PM CET
 Install Date: Tue 29 Jan 2019 07:48:54 PM CET Build Host: 
ec2-pkg-centos-6-1226.vpc.cloudera.com
 Group : System/Daemons Source RPM: 
hive-1.1.0+cdh5.4.8+277-1.cdh5.4.8.p1373.1769.el6.src.rpm
 Size : 5458 License: ASL 2.0
 Signature : DSA/SHA1, Fri 25 Mar 2016 06:12:22 PM CET, Key ID f90c0d8fe8f86acd
 URL : [http://hive.apache.org/]
 Summary : Shared metadata repository for Hive.
 Description :
 This optional package hosts a metadata server for Hive clients across a 
network to use.

 

# hive --version
Hive 1.1.0-cdh5.4.8
Subversion 
file:///data/jenkins/workspace/generic-package-rhel64-6-0/topdir/BUILD/hive-1.1.0-cdh5.4.8
 -r Unknown
Compiled by jenkins on Fri Mar 25 09:38:39 PDT 2016
From source with checksum e4569745d1b7e0b2785263766d99cffc

> CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException
> ---
>
> Key: SPARK-26727
> URL: https://issues.apache.org/jira/browse/SPARK-26727
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Srinivas Yarra
>Priority: Major
>
> We experienced that sometimes the Hive query "CREATE OR REPLACE VIEW <view name> AS SELECT <...> FROM <...>" fails with the following exception:
> {code:java}
> // code placeholder
> org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or 
> view '' already exists in database 'default'; at 
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:314)
>  at 
> org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:165) 
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365) at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364) at 
> org.apache.spark.sql.Dataset.<init>(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80) at 
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642) ... 49 elided
> {code}
> {code}
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res1: org.apache.spark.sql.DataFrame = []
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res2: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res3: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res4: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as 

[jira] [Comment Edited] (SPARK-26727) CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException

2019-02-05 Thread Laszlo Rigo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755846#comment-16755846
 ] 

Laszlo Rigo edited comment on SPARK-26727 at 2/5/19 9:28 AM:
-

[~dongjoon], We have this hive-metastore rpm installed:

rpm -qi hive-metastore
 Name : hive-metastore Relocations: (not relocatable)
 Version : 1.1.0+cdh5.4.8+277 Vendor: (none)
 Release : 1.cdh5.4.8.p1373.1769.el6 Build Date: Fri 25 Mar 2016 05:46:42 PM CET
 Install Date: Tue 29 Jan 2019 07:48:54 PM CET Build Host: 
ec2-pkg-centos-6-1226.vpc.cloudera.com
 Group : System/Daemons Source RPM: 
hive-1.1.0+cdh5.4.8+277-1.cdh5.4.8.p1373.1769.el6.src.rpm
 Size : 5458 License: ASL 2.0
 Signature : DSA/SHA1, Fri 25 Mar 2016 06:12:22 PM CET, Key ID f90c0d8fe8f86acd
 URL : [http://hive.apache.org/]
 Summary : Shared metadata repository for Hive.
 Description :
 This optional package hosts a metadata server for Hive clients across a 
network to use.

 

# hive --version
Hive 1.1.0-cdh5.4.8
Subversion 
file:///data/jenkins/workspace/generic-package-rhel64-6-0/topdir/BUILD/hive-1.1.0-cdh5.4.8
 -r Unknown
Compiled by jenkins on Fri Mar 25 09:38:39 PDT 2016
From source with checksum e4569745d1b7e0b2785263766d99cffc


was (Author: rigolaszlo):
[~dongjoon], We have this hive-metastore rpm installed:

rpm -qi hive-metastore
Name : hive-metastore Relocations: (not relocatable)
Version : 1.1.0+cdh5.4.8+277 Vendor: (none)
Release : 1.cdh5.4.8.p1373.1769.el6 Build Date: Fri 25 Mar 2016 05:46:42 PM CET
Install Date: Tue 29 Jan 2019 07:48:54 PM CET Build Host: 
ec2-pkg-centos-6-1226.vpc.cloudera.com
Group : System/Daemons Source RPM: 
hive-1.1.0+cdh5.4.8+277-1.cdh5.4.8.p1373.1769.el6.src.rpm
Size : 5458 License: ASL 2.0
Signature : DSA/SHA1, Fri 25 Mar 2016 06:12:22 PM CET, Key ID f90c0d8fe8f86acd
URL : http://hive.apache.org/
Summary : Shared metadata repository for Hive.
Description :
This optional package hosts a metadata server for Hive clients across a network 
to use.

> CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException
> ---
>
> Key: SPARK-26727
> URL: https://issues.apache.org/jira/browse/SPARK-26727
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Srinivas Yarra
>Priority: Major
>
> We experienced that sometimes the Hive query "CREATE OR REPLACE VIEW <view name> AS SELECT <...> FROM <...>" fails with the following exception:
> {code:java}
> // code placeholder
> org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or 
> view '' already exists in database 'default'; at 
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:314)
>  at 
> org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:165) 
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365) at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364) at 
> org.apache.spark.sql.Dataset.<init>(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80) at 
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642) ... 49 elided
> {code}
> {code}
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res1: org.apache.spark.sql.DataFrame = []
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res2: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res3: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res4: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res5: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res6: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 

[jira] [Commented] (SPARK-26727) CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException

2019-02-05 Thread Laszlo Rigo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760604#comment-16760604
 ] 

Laszlo Rigo commented on SPARK-26727:
-

For me these calls still seem to be asynchronous:
{noformat}
var i = 0
try {
  while (true) {
    println(i)
    spark.sql("DROP VIEW IF EXISTS testSparkReplace")
    spark.sql("CREATE VIEW testSparkReplace as SELECT dummy FROM ae_dual")
    while (!spark.catalog.tableExists("testSparkReplace")) {}
    i = i + 1
  }
} catch {
  case e: Exception => e.printStackTrace()
}
{noformat}
This script failed with the same exception when the value of 'i' was 183:
{noformat}
org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
AlreadyExistsException(message:Table testsparkreplace already exists);
at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at 
org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:236)
at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:319)
at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:175)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:195)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:30)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:43)
at $line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)
at $line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:47)
at $line17.$read$$iw$$iw$$iw.<init>(<console>:49)
at $line17.$read$$iw$$iw.<init>(<console>:51)
at $line17.$read$$iw.<init>(<console>:53)
at $line17.$read.<init>(<console>:55)
at $line17.$read$.<init>(<console>:59)
at $line17.$read$.<clinit>()
at $line17.$eval$.$print$lzycompute(<console>:7)
at $line17.$eval$.$print(<console>:6)
at $line17.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at 
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at 
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at 
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at 
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at 
scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:819)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:691)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
at