[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r251188250 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ## @@ -260,6 +260,65 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA assert(err.contains("Cannot recognize hive type string:")) } } + + test("SPARK-26630: Fix ClassCastException in TableReader while creating HadoopRDD") { +withTable("table_old", "table_pt_old", "table_new", "table_pt_new", + "table_ctas_old", "table_ctas_pt_old", "table_ctas_new", "table_ctas_pt_new") { + spark.sql( +s""" + |CREATE TABLE table_old (id int) Review comment: Done, four columns with different data types. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r251188255 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ## @@ -260,6 +260,65 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA assert(err.contains("Cannot recognize hive type string:")) } } + + test("SPARK-26630: Fix ClassCastException in TableReader while creating HadoopRDD") { +withTable("table_old", "table_pt_old", "table_new", "table_pt_new", + "table_ctas_old", "table_ctas_pt_old", "table_ctas_new", "table_ctas_pt_new") { + spark.sql( +s""" + |CREATE TABLE table_old (id int) + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin) + spark.sql("INSERT INTO table_old VALUES (1), (2), (3), (4), (5)") + assert(spark.sql("SELECT COUNT(1) FROM table_old").collect() === Array(Row(5))) Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r251188223 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ## @@ -260,6 +260,65 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA assert(err.contains("Cannot recognize hive type string:")) } } + + test("SPARK-26630: Fix ClassCastException in TableReader while creating HadoopRDD") { +withTable("table_old", "table_pt_old", "table_new", "table_pt_new", + "table_ctas_old", "table_ctas_pt_old", "table_ctas_new", "table_ctas_pt_new") { Review comment: Done, split it into four test cases.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r250445471 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -288,16 +285,26 @@ class HadoopTableReader( } } + /** + * The entry of creating a RDD. Review comment: Yes, added more comments.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r250037620 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +317,29 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = { Review comment: Yes, renamed these classes and changed `java.lang.Class` to `Class`.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249622891 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +317,29 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = { Review comment: NewHadoopRDD needs a JobConf, while HadoopRDD needs a function (initializeJobConfFunc). The inputFormatClass types of the two constructors are also different.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249622532 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +317,29 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = { Review comment: There seems to be no real duplication between these two methods; I found that out when I tried moving their logic into createHadoopRdd.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249430434 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +318,30 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = { + Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249429295 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala ## @@ -192,4 +192,44 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH case p: HiveTableScanExec => p }.get } + + test("[SPARK-26630] Fix ClassCastException in TableReader while creating HadoopRDD") { Review comment: Done, moved to `HiveCatalogedDDLSuite` and added several test cases.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249348817 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -243,7 +239,8 @@ class HadoopTableReader( // Create local references so that the outer object isn't serialized. val localTableDesc = tableDesc - createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter => + createHadoopRDD(localTableDesc, inputPathStr) +.mapPartitions { iter => Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249348806 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -288,16 +285,28 @@ class HadoopTableReader( } } + /** + * The entry of creating a RDD. + */ + private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { +localTableDesc.getInputFileFormatClass match { + case c: Class[_] Review comment: Reverted.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249324697 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +322,31 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd( + tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = { + +val newJobConf = new JobConf(hadoopConf) +HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf) +val inputFormatClass = Utils.classForName(inputClassName) + .asInstanceOf[java.lang.Class[org.apache.hadoop.mapreduce.InputFormat[Writable, Writable]]] Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249324564 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala ## @@ -192,4 +192,44 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH case p: HiveTableScanExec => p }.get } + + test("[SPARK-26630] Fix ClassCastException in TableReader while creating HadoopRDD") { +withTable("table_old", "table_pt_old", "table_new", "table_pt_new") { + sql( +s""" + |CREATE TABLE table_old (id int) + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin) + sql( +s""" + |CREATE TABLE table_pt_old (id int) + |PARTITIONED BY (a int, b int) + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin) + sql( +s""" + |CREATE TABLE table_new (id int) + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin) + sql( +s""" + |CREATE TABLE table_pt_new (id int) + |PARTITIONED BY (a int, b int) + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin) + + sql("SELECT count(1) FROM table_old").show() + sql("SELECT count(1) FROM table_pt_old").show() + sql("SELECT count(1) FROM table_new").show() + sql("SELECT count(1) FROM table_pt_new").show() +} Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249324469 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -289,15 +287,28 @@ class HadoopTableReader( } /** - * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be + * The entry of creating a RDD. + */ + private def createHadoopRDD( + inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { +if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]] Review comment: I get it, done!
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249324516 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -288,16 +286,29 @@ class HadoopTableReader( } } + /** + * The entry of creating a RDD. + */ + private def createHadoopRDD( + inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { +if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]] + .isAssignableFrom(Utils.classForName(inputClassName))) { + createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName) +} else { + createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName) +} + } + /** * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be * applied locally on each slave. */ - private def createHadoopRdd( -tableDesc: TableDesc, -path: String, -inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { + private def createOldHadoopRdd( + tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = { val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ +val inputFormatClass = Utils.classForName(inputClassName) + .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]] Review comment: Done, so I could remove the `inputClassName: String` parameter.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249324488 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +322,31 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd( + tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = { Review comment: Done, as a consequence of removing the `inputClassName: String` parameter.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249318001 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -311,6 +322,31 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + /** + * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createNewHadoopRdd( + tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = { Review comment: Maybe it's also okay when a method declaration's parameters fit on two lines?
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249313928 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -31,12 +31,12 @@ import org.apache.hadoop.hive.serde2.Deserializer import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector} import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} +import org.apache.spark.rdd._ Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r24933 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -300,20 +300,21 @@ class HadoopTableReader( } /** - * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be + * Creates a OldHadoopRDD based on the broadcasted HiveConf and other job properties that will be Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249310923 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -289,15 +287,28 @@ class HadoopTableReader( } /** - * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be + * The entry of creating a RDD. + */ + private def createHadoopRDD( + inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { +if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]] Review comment: I think we need that because the result of `hiveTable.getInputFormatClass` can vary. It's difficult to list all of the input format classes, and a similar usage can be found in `org.apache.spark.scheduler.InputFormatInfo` (lines 71 and 76). ![image](https://user-images.githubusercontent.com/25916266/51449050-45b67a80-1d65-11e9-931d-826b0aba1ab9.png)
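The dispatch discussed above, choosing the RDD flavor from whichever `InputFormat` hierarchy the class belongs to, comes down to a plain `isAssignableFrom` check. A minimal sketch, using hypothetical stand-in traits instead of the real Hadoop `org.apache.hadoop.mapred.InputFormat` and `org.apache.hadoop.mapreduce.InputFormat` hierarchies:

```scala
// Hypothetical stand-ins for the two Hadoop InputFormat hierarchies;
// the real code tests against org.apache.hadoop.mapreduce.InputFormat.
trait OldApiInputFormat
trait NewApiInputFormat
class OldTextInputFormat extends OldApiInputFormat
class NewTextInputFormat extends NewApiInputFormat

// Mirror of the createHadoopRDD dispatch: route on assignability rather
// than enumerating concrete input-format class names, which can vary.
def pickRdd(inputClass: Class[_]): String =
  if (classOf[NewApiInputFormat].isAssignableFrom(inputClass)) "NewHadoopRDD"
  else "HadoopRDD"
```

Because the check is on the class hierarchy, any user-defined subclass of the new-API `InputFormat` is routed correctly without the dispatcher knowing its name.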
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD URL: https://github.com/apache/spark/pull/23559#discussion_r249277289 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ## @@ -288,29 +286,65 @@ class HadoopTableReader( } } + /** + * The entry of creating a RDD. + */ + private def createHadoopRDD( + inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { +if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]] + .isAssignableFrom(Utils.classForName(inputClassName))) { + createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName) +} else { + createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName) +} + } + /** * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be * applied locally on each slave. */ - private def createHadoopRdd( -tableDesc: TableDesc, -path: String, -inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { + private def createOldHadoopRdd( + tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = { val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ val rdd = new HadoopRDD( sparkSession.sparkContext, _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]], Some(initializeJobConfFunc), - inputFormatClass, + Utils.classForName(inputClassName) + .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]], Review comment: I get it, done.
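The cast under review, loading a class by its fully qualified name and then viewing it as a typed `Class`, is ordinary JVM reflection. A stdlib-only sketch, with `java.util.ArrayList` as a stand-in for the table's input format class:

```scala
// Load a class reflectively by name, as Utils.classForName does for the
// table's configured input format.
val loaded: Class[_] = Class.forName("java.util.ArrayList")

// asInstanceOf only refines the static type; it is unchecked at runtime
// due to erasure, which is why reviewers ask for a short, explicit target
// type rather than the fully spelled-out java.lang.Class[...] form.
val typed = loaded.asInstanceOf[Class[_ <: java.util.List[_]]]
```

The runtime object is the same either way; only the compile-time view changes.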
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249277179

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +286,65 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+      inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+    if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
+        .isAssignableFrom(Utils.classForName(inputClassName))) {
+      createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+    } else {
+      createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
-  private def createHadoopRdd(
-      tableDesc: TableDesc,
-      path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+  private def createOldHadoopRdd(
+      tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      Utils.classForName(inputClassName)
+        .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]],
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+      tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
+
+    val newJobConf = new JobConf(hadoopConf)
+    (HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _).apply(newJobConf)

Review comment: Oops, done.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
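The heart of the fix in the hunk above is dispatching on which Hadoop InputFormat API the table's declared class implements: new-API (`org.apache.hadoop.mapreduce`) formats go to `NewHadoopRDD`, old-API (`org.apache.hadoop.mapred`) formats keep the original `HadoopRDD` path. A minimal self-contained sketch of that reflection check, where the traits are stand-ins for the real Hadoop interfaces and are not Spark or Hadoop code:

```scala
// Stand-ins for org.apache.hadoop.mapreduce.InputFormat (new API)
// and org.apache.hadoop.mapred.InputFormat (old API).
trait NewApiInputFormat
trait OldApiInputFormat
class NewStyleFormat extends NewApiInputFormat
class OldStyleFormat extends OldApiInputFormat

object InputFormatDispatch {
  // Mirrors the isAssignableFrom check in createHadoopRDD: load the class
  // by name, then test whether it implements the new-API interface.
  def usesNewApi(inputClassName: String): Boolean =
    classOf[NewApiInputFormat].isAssignableFrom(Class.forName(inputClassName))

  def main(args: Array[String]): Unit = {
    assert(usesNewApi(classOf[NewStyleFormat].getName))   // would build a NewHadoopRDD
    assert(!usesNewApi(classOf[OldStyleFormat].getName))  // falls back to HadoopRDD
  }
}
```

Dispatching on the interface rather than on a hard-coded class list is what avoids the original ClassCastException: any table whose input format only implements the new API never reaches the old-API cast.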
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249277162

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,65 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+      inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+    if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
+        .isAssignableFrom(Utils.classForName(inputClassName))) {
+      createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+    } else {
+      createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
-  private def createHadoopRdd(
-      tableDesc: TableDesc,
-      path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+  private def createOldHadoopRdd(
+      tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      Utils.classForName(inputClassName)
+        .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]],
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be

Review comment: Done, the same for OldHadoopRDD.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276700

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,105 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+      inputClassName: String,
+      localTableDesc: TableDesc,
+      inputPathStr: String): RDD[Writable] = {
+    if (isCreateNewHadoopRDD(inputClassName)) {
+      createNewHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    } else {
+      createHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
   private def createHadoopRdd(
       tableDesc: TableDesc,
       path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+      inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      getInputFormat(inputClassName),
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+      tableDesc: TableDesc,
+      path: String,
+      inputClassName: String): RDD[Writable] = {

Review comment: Sorry for making this kind of mistake, done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276704

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,105 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+      inputClassName: String,
+      localTableDesc: TableDesc,
+      inputPathStr: String): RDD[Writable] = {
+    if (isCreateNewHadoopRDD(inputClassName)) {
+      createNewHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    } else {
+      createHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)

Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276636

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,105 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+      inputClassName: String,
+      localTableDesc: TableDesc,
+      inputPathStr: String): RDD[Writable] = {
+    if (isCreateNewHadoopRDD(inputClassName)) {
+      createNewHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    } else {
+      createHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
   private def createHadoopRdd(
       tableDesc: TableDesc,
       path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+      inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      getInputFormat(inputClassName),
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+      tableDesc: TableDesc,
+      path: String,
+      inputClassName: String): RDD[Writable] = {
+
+    val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+    val newJobConf = new JobConf(hadoopConf)
+    initializeJobConfFunc.apply(newJobConf)
+    val rdd = new NewHadoopRDD(
+      sparkSession.sparkContext,
+      getNewInputFormat(inputClassName),
+      classOf[Writable],
+      classOf[Writable],
+      newJobConf
+    )
+
+    // Only take the value (skip the key) because Hive works only with values.
+    rdd.map(_._2)
+  }
+
+  /**
+   * If `spark.sql.hive.fileInputFormat.enabled` is true, this function will optimize the input
+   * method while reading Hive tables.
+   * For old input format `org.apache.hadoop.mapred.InputFormat`.
+   */
+  private def getInputFormat(

Review comment: Makes sense, done!
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276638

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,105 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(

Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276602

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,105 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+      inputClassName: String,
+      localTableDesc: TableDesc,
+      inputPathStr: String): RDD[Writable] = {
+    if (isCreateNewHadoopRDD(inputClassName)) {
+      createNewHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    } else {
+      createHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
   private def createHadoopRdd(
       tableDesc: TableDesc,
       path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+      inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      getInputFormat(inputClassName),
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+      tableDesc: TableDesc,
+      path: String,
+      inputClassName: String): RDD[Writable] = {
+
+    val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

Review comment: Yes, optimized.
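The review point here concerns `HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _`: the trailing underscore eta-expands the method into a `JobConf => Unit` function, which the old path hands to `HadoopRDD` lazily but the new path must apply eagerly to a fresh `JobConf`. A small self-contained sketch of that currying pattern, where `JobConfLike` is a placeholder for Hadoop's `JobConf` and not the real class:

```scala
// Placeholder for org.apache.hadoop.mapred.JobConf.
class JobConfLike {
  var inputDir: String = ""
}

object JobConfInit {
  // Shaped like HadoopTableReader.initializeLocalJobConfFunc: fix the path
  // and table metadata up front, configure the JobConf later.
  def initializeLocalJobConfFunc(path: String, table: String)(jobConf: JobConfLike): Unit = {
    jobConf.inputDir = path
  }

  def main(args: Array[String]): Unit = {
    // Eta-expand with `_` to get a JobConfLike => Unit, as the PR does.
    val init = initializeLocalJobConfFunc("/warehouse/t1", "t1") _
    val conf = new JobConfLike
    init(conf)  // equivalent to init.apply(conf)
    assert(conf.inputDir == "/warehouse/t1")
  }
}
```

This is why the two RDD paths differ: `HadoopRDD` accepts the unapplied function via `Some(initializeJobConfFunc)` and runs it per-partition, while `NewHadoopRDD` takes an already-configured `JobConf`, so the function must be applied before construction.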
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276580

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -123,9 +123,7 @@ class HadoopTableReader(
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
     // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
+    val hadoopRDD = getRDD(hiveTable.getInputFormatClass.getName, localTableDesc, inputPathStr)

Review comment: Done.
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r248572552

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,119 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+      inputClassName: String,
+      localTableDesc: TableDesc,
+      inputPathStr: String): RDD[Writable] = {
+    if (isCreateNewHadoopRDD(inputClassName)) {
+      createNewHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    } else {
+      createHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
   private def createHadoopRdd(
       tableDesc: TableDesc,
       path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+      inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      getInputFormat(inputClassName),
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+      tableDesc: TableDesc,
+      path: String,
+      inputClassName: String): RDD[Writable] = {
+
+    val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+    val newJobConf = new JobConf(hadoopConf)
+    initializeJobConfFunc.apply(newJobConf)
+    val rdd = new NewHadoopRDD(
+      sparkSession.sparkContext,
+      getNewInputFormat(inputClassName),
+      classOf[Writable],
+      classOf[Writable],
+      newJobConf
+    )
+
+    // Only take the value (skip the key) because Hive works only with values.
+    rdd.map(_._2)
+  }
+
+  /**
+   * If `spark.sql.hive.fileInputFormat.enabled` is true, this function will optimize the input
+   * method while reading Hive tables.
+   * For old input format `org.apache.hadoop.mapred.InputFormat`.
+   */
+  private def getInputFormat(
+      inputClassName: String): Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]] = {
+
+    var ifc = Utils.classForName(inputClassName)
+      .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]]
+    if (conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED) &&
+        "org.apache.hadoop.mapred.TextInputFormat".equals(inputClassName)) {
+      ifc = Utils.classForName("org.apache.hadoop.mapred.lib.CombineTextInputFormat")

Review comment: I've added the newline character. Could you please help restart the test?
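The `getInputFormat` hunk above conditionally swaps `TextInputFormat` for `CombineTextInputFormat` when the optimizer flag is enabled, so many small text files are combined into fewer splits. A self-contained sketch of that class-substitution decision, with the boolean flag standing in for `conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED)`:

```scala
object InputFormatChooser {
  // Decide which input format class name to load. `optimizerEnabled` is a
  // stand-in for the HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED conf lookup.
  def chooseInputFormat(inputClassName: String, optimizerEnabled: Boolean): String = {
    if (optimizerEnabled &&
        "org.apache.hadoop.mapred.TextInputFormat" == inputClassName) {
      // Combine many small text files into fewer, larger splits.
      "org.apache.hadoop.mapred.lib.CombineTextInputFormat"
    } else {
      inputClassName
    }
  }

  def main(args: Array[String]): Unit = {
    assert(chooseInputFormat("org.apache.hadoop.mapred.TextInputFormat", true)
      == "org.apache.hadoop.mapred.lib.CombineTextInputFormat")
    // Any other format, or a disabled flag, is passed through untouched.
    assert(chooseInputFormat("org.apache.hadoop.mapred.TextInputFormat", false)
      == "org.apache.hadoop.mapred.TextInputFormat")
  }
}
```

Guarding the substitution on an exact class-name match matters: subclasses of `TextInputFormat` may rely on their own record readers, so only the plain format is safe to replace.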
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r248571444

File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

@@ -288,29 +285,119 @@ class HadoopTableReader(
     }
   }

+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+      inputClassName: String,
+      localTableDesc: TableDesc,
+      inputPathStr: String): RDD[Writable] = {
+    if (isCreateNewHadoopRDD(inputClassName)) {
+      createNewHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    } else {
+      createHadoopRdd(
+        localTableDesc,
+        inputPathStr,
+        inputClassName)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
    */
   private def createHadoopRdd(
       tableDesc: TableDesc,
       path: String,
-      inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+      inputClassName: String): RDD[Writable] = {
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
-      inputFormatClass,
+      getInputFormat(inputClassName),
       classOf[Writable],
       classOf[Writable],
       _minSplitsPerRDD)

     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+      tableDesc: TableDesc,
+      path: String,
+      inputClassName: String): RDD[Writable] = {
+
+    val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+    val newJobConf = new JobConf(hadoopConf)
+    initializeJobConfFunc.apply(newJobConf)
+    val rdd = new NewHadoopRDD(
+      sparkSession.sparkContext,
+      getNewInputFormat(inputClassName),
+      classOf[Writable],
+      classOf[Writable],
+      newJobConf
+    )
+
+    // Only take the value (skip the key) because Hive works only with values.
+    rdd.map(_._2)
+  }
+
+  /**
+   * If `spark.sql.hive.fileInputFormat.enabled` is true, this function will optimize the input
+   * method while reading Hive tables.
+   * For old input format `org.apache.hadoop.mapred.InputFormat`.
+   */
+  private def getInputFormat(
+      inputClassName: String): Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]] = {
+
+    var ifc = Utils.classForName(inputClassName)
+      .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]]
+    if (conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED) &&
+        "org.apache.hadoop.mapred.TextInputFormat".equals(inputClassName)) {
+      ifc = Utils.classForName("org.apache.hadoop.mapred.lib.CombineTextInputFormat")

Review comment: You're right, I've removed this.