[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489129#comment-16489129 ]

Andreas Weise commented on SPARK-24373:
---

We are also seeing increased runtimes for our SQL jobs after upgrading from 2.2.1 to 2.3.0, but we haven't traced them down to the root cause. This issue sounds plausible to me, as we also use cache() + count() quite often.

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Wenbo Zhao
> Priority: Major
>
> Here is the code to reproduce in local mode:
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>
> In Spark 2.2, you could see it print "".
> In the above example, when you run explain(), you can see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>    +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
>    +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
>       +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>          +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
>    +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
>             +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>                +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing from the following explain() output:
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
>    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
>       +- *(1) Project
>          +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>             +- Scan ExternalRDDScan[obj#1L]
> {code}
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
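In the SPARK-24373 report, the analyzed plan of `df1.groupBy().count()` carries the UDF null check twice (`if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L)`), while the plan registered by `df1.cache` carries it once. If the cache lookup matches plans structurally, that difference alone would make it miss and recompute from the source, which would fit the missing InMemoryTableScan. The Python sketch below is a toy model of that hypothesis, not Spark code: `Col`, `Udf`, `IfNull`, `handle_null_inputs`, `show` and the dict-based cache are all hypothetical, and Spark's real cache lookup is more involved than plain equality.

```python
from dataclasses import dataclass

# Toy expression nodes: hypothetical stand-ins, not Spark's Catalyst classes.
@dataclass(frozen=True)
class Col:
    name: str

@dataclass(frozen=True)
class Udf:
    child: object

@dataclass(frozen=True)
class IfNull:
    """Models `if (isnull(col)) null else expr`."""
    col: Col
    expr: object

def handle_null_inputs(expr):
    """Toy, non-idempotent rule: wraps a bare Udf in a null check and recurses
    into existing checks, so a second pass nests a second check."""
    if isinstance(expr, Udf):
        return IfNull(expr.child, expr)
    if isinstance(expr, IfNull):
        return IfNull(expr.col, handle_null_inputs(expr.expr))
    return expr

def show(expr):
    """Renders a toy expression in the style of the explain() output."""
    if isinstance(expr, Col):
        return expr.name
    if isinstance(expr, Udf):
        return f"UDF({show(expr.child)})"
    return f"if (isnull({show(expr.col)})) null else {show(expr.expr)}"

# Hypothetical cache keyed by the analyzed expression (frozen dataclasses are hashable).
cache = {}

def lookup(plan):
    return cache.get(plan, "recompute from source")

parsed = Udf(Col("value"))
analyzed_once = handle_null_inputs(parsed)          # what cache() registers
cache[analyzed_once] = "InMemoryRelation"
analyzed_twice = handle_null_inputs(analyzed_once)  # analysis applied a second time

print(show(analyzed_once))     # if (isnull(value)) null else UDF(value)
print(show(analyzed_twice))    # if (isnull(value)) null else if (isnull(value)) null else UDF(value)
print(lookup(analyzed_once))   # InMemoryRelation
print(lookup(analyzed_twice))  # recompute from source
```

The design point the toy isolates is idempotence: a rewrite rule that changes an already-rewritten plan makes two logically equivalent plans compare unequal, and any equality-based cache then misses.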
[jira] [Comment Edited] (SPARK-21063) Spark return an empty result from remote hadoop cluster
[ https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196354#comment-16196354 ]

Andreas Weise edited comment on SPARK-21063 at 10/8/17 10:47 PM:
-

[~sowen] This seems like a bug, IMHO. I have the same problem with Spark 2.2.0, Hive on MySQL, and the default HiveThriftServer2.

{code:title=Creating Table}
spark.range(10).write.mode('overwrite').format("parquet").saveAsTable("test1")
{code}

{code:title=beeline is fine}
$ bin/beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://localhost:1
Connecting to jdbc:hive2://localhost:1
Connected to: Spark SQL (version 2.2.0)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:1> select * from test1;
+-----+--+
| id  |
+-----+--+
| 5   |
| 6   |
| 7   |
| 8   |
| 9   |
| 0   |
| 1   |
| 2   |
| 3   |
| 4   |
+-----+--+
10 rows selected (0.213 seconds)
{code}

{code:title=spark+jdbc gives 0 results}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1", properties={"driver": "org.apache.hive.jdbc.HiveDriver"}).collect()
[]
{code}

What's interesting now are the SQL statements fired during the JDBC call from Spark, in contrast to the one from beeline. They are all logged in HiveThriftServer2:

{code:title=SQL from Spark+JDBC}
SELECT * FROM test1 WHERE 1=0
SELECT "id" FROM test1
{code}

{code:title=SQL from beeline}
select * from test1
{code}

Next, I tried Spark+JDBC with the fetchsize option:

{code:title=Spark+JDBC with fetchsize}
spark.read.jdbc("jdbc:hive2://localhost:1/default","test1", properties={"driver": "org.apache.hive.jdbc.HiveDriver", "fetchsize": "10"}).collect()
{code}

This results in an error:

{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 4 times, most recent failure: Lost task 0.3 in stage 28.0 (TID 52, 192.168.0.210, executor 10): java.sql.SQLException: Cannot convert column 1 to long: java.lang.NumberFormatException: For input string: "id"
	at org.apache.hive.jdbc.HiveBaseResultSet.getLong(HiveBaseResultSet.java:372)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:409)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:408)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "id"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.valueOf(Long.java:803)
	at org.apache.hive.jdbc.HiveBaseResultSet.getLong(HiveBaseResultSet.java:368)
	... 23 more
{noformat}

was (Author: aweise):
Seems like a bug IMHO. Same problem here with Spark 2.2.0, Hive on MySQL and default HiveThriftServer2.
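The statements logged for SPARK-21063 suggest one mechanism for the fetchsize error: Spark's generic JDBC dialect wraps column names in double quotes (`SELECT "id" FROM test1`), and if the Thrift server parses a double-quoted token as a string literal rather than a column name, every row comes back as the text `id`, which `getLong` then fails to parse (`For input string: "id"`). The Python sketch below is a toy model under that assumption only; `build_select`, `evaluate_item` and `read_as_long` are hypothetical helpers, not Spark or Hive APIs, and the sketch says nothing about why the read without fetchsize returns an empty result.

```python
# Hypothetical helpers modelling the statements logged by HiveThriftServer2;
# they are not Spark or Hive APIs.

def build_select(table, columns, quote):
    """Builds a column-list query, wrapping each column name in `quote`."""
    cols = ", ".join(f"{quote}{c}{quote}" for c in columns)
    return f"SELECT {cols} FROM {table}"

def evaluate_item(item, row):
    """Toy evaluation of one select item, assuming the server treats a double- or
    single-quoted token as a string literal and a backtick-quoted or bare token
    as a column reference."""
    if len(item) >= 2 and item[0] in "\"'" and item[-1] == item[0]:
        return item[1:-1]          # string literal: the same text for every row
    return row[item.strip("`")]    # column reference: the row's own value

def read_as_long(query, rows):
    """Mimics reading one bigint column: evaluate the item, then parse it as an
    integer. Only handles a single-column `SELECT ... FROM ...` query."""
    item = query[len("SELECT "):query.index(" FROM ")]
    return [int(evaluate_item(item, row)) for row in rows]

rows = [{"id": i} for i in range(10)]  # stands in for table test1

double_quoted = build_select("test1", ["id"], '"')
backticked = build_select("test1", ["id"], "`")
print(double_quoted)                   # SELECT "id" FROM test1
print(read_as_long(backticked, rows))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
try:
    read_as_long(double_quoted, rows)
except ValueError as err:
    print(err)                         # invalid literal for int() with base 10: 'id'
```

If this reading is right, the backtick form corresponds to what a custom `org.apache.spark.sql.jdbc.JdbcDialect` overriding `quoteIdentifier` for `jdbc:hive2` URLs (registered with `JdbcDialects.registerDialect`) would emit; that remedy is an assumption here, not something the comment above tested.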
[jira] [Commented] (SPARK-21063) Spark return an empty result from remote hadoop cluster
[ https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196354#comment-16196354 ]

Andreas Weise commented on SPARK-21063:
---

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: