[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-25 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r251188250
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 ##
 @@ -260,6 +260,65 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   assert(err.contains("Cannot recognize hive type string:"))
}
   }
+
+  test("SPARK-26630: Fix ClassCastException in TableReader while creating HadoopRDD") {
+withTable("table_old", "table_pt_old", "table_new", "table_pt_new",
+  "table_ctas_old", "table_ctas_pt_old", "table_ctas_new", "table_ctas_pt_new") {
+  spark.sql(
+s"""
+   |CREATE TABLE table_old (id int)
 
 Review comment:
   Done, four columns with different data types.
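
   The revised DDL is not quoted in this thread; a minimal sketch of a four-column table with mixed types (the added column names and types are illustrative assumptions, not the PR's exact schema):

       spark.sql(
         s"""
            |CREATE TABLE table_old (id int, name string, score double, ts timestamp)
            |STORED AS
            |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
          """.stripMargin)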





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-25 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r251188255
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 ##
 @@ -260,6 +260,65 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   assert(err.contains("Cannot recognize hive type string:"))
}
   }
+
+  test("SPARK-26630: Fix ClassCastException in TableReader while creating HadoopRDD") {
+withTable("table_old", "table_pt_old", "table_new", "table_pt_new",
+  "table_ctas_old", "table_ctas_pt_old", "table_ctas_new", "table_ctas_pt_new") {
+  spark.sql(
+s"""
+   |CREATE TABLE table_old (id int)
+   |STORED AS
+   |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+   |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+  spark.sql("INSERT INTO table_old VALUES (1), (2), (3), (4), (5)")
+  assert(spark.sql("SELECT COUNT(1) FROM table_old").collect() === Array(Row(5)))
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-25 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r251188223
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 ##
 @@ -260,6 +260,65 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   assert(err.contains("Cannot recognize hive type string:"))
}
   }
+
+  test("SPARK-26630: Fix ClassCastException in TableReader while creating HadoopRDD") {
+withTable("table_old", "table_pt_old", "table_new", "table_pt_new",
+  "table_ctas_old", "table_ctas_pt_old", "table_ctas_new", "table_ctas_pt_new") {
 
 Review comment:
   Done, split it into 4 test cases.
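
   The individual cases are not quoted here; assuming the split follows the four table flavors above (old vs. new input format, with and without partitions), one of the four could look like this sketch, condensed from the combined test (the test name is illustrative):

       test("SPARK-26630: non-partitioned table with the old input format") {
         withTable("table_old") {
           spark.sql(
             s"""
                |CREATE TABLE table_old (id int)
                |STORED AS
                |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
                |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
              """.stripMargin)
           spark.sql("INSERT INTO table_old VALUES (1), (2), (3), (4), (5)")
           // Reading the table back through TableReader is what used to hit the ClassCastException.
           assert(spark.sql("SELECT COUNT(1) FROM table_old").collect() === Array(Row(5)))
         }
       }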





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-23 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r250445471
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,16 +285,26 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
 
 Review comment:
   Yes, added more comments.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-22 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r250037620
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +317,29 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = {
 
 Review comment:
   Yes, renamed these classes and changed `java.lang.Class` to `Class`.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-21 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249622891
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +317,29 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = {
 
 Review comment:
   NewHadoopRDD needs a JobConf, while HadoopRDD needs a function (initializeJobConfFunc). The inputFormatClass parameters of the two constructors also have different types.
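
   For context, a condensed side-by-side of the two construction paths, assembled from the diffs quoted elsewhere in this thread (`oldInputFormatClass`/`newInputFormatClass` are placeholder names; the other parameter values are as they appear there):

       // Old mapred API: HadoopRDD takes a broadcast conf plus an initialization
       // function that is applied to the JobConf lazily on each executor.
       val oldRdd = new HadoopRDD(
         sparkSession.sparkContext,
         _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
         Some(HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _),
         oldInputFormatClass,  // Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]
         classOf[Writable],
         classOf[Writable],
         _minSplitsPerRDD)

       // New mapreduce API: NewHadoopRDD takes a fully built configuration up front,
       // so the initialization function must be applied eagerly on the driver.
       val newJobConf = new JobConf(hadoopConf)
       HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
       val newRdd = new NewHadoopRDD(
         sparkSession.sparkContext,
         newInputFormatClass,  // Class[org.apache.hadoop.mapreduce.InputFormat[Writable, Writable]]
         classOf[Writable],
         classOf[Writable],
         newJobConf)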





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-21 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249622532
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +317,29 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = {
 
 Review comment:
   There seems to be no duplication between these two methods; I tried merging them into createHadoopRdd.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-21 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249430434
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +318,30 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = {
+
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-21 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249429295
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
 ##
 @@ -192,4 +192,44 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
   case p: HiveTableScanExec => p
 }.get
   }
+
+  test("[SPARK-26630] Fix ClassCastException in TableReader while creating HadoopRDD") {
 
 Review comment:
   Done, moved to `HiveCatalogedDDLSuite` and added several test cases.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249348817
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -243,7 +239,8 @@ class HadoopTableReader(
 
   // Create local references so that the outer object isn't serialized.
   val localTableDesc = tableDesc
-  createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter =>
+  createHadoopRDD(localTableDesc, inputPathStr)
+.mapPartitions { iter =>
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249348806
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,16 +285,28 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+localTableDesc.getInputFileFormatClass match {
+  case c: Class[_]
 
 Review comment:
   Reverted.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249324697
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +322,31 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
+
+val newJobConf = new JobConf(hadoopConf)
+HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
+val inputFormatClass = Utils.classForName(inputClassName)
+  .asInstanceOf[java.lang.Class[org.apache.hadoop.mapreduce.InputFormat[Writable, Writable]]]
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249324564
 
 

 ##
 File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
 ##
 @@ -192,4 +192,44 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
   case p: HiveTableScanExec => p
 }.get
   }
+
+  test("[SPARK-26630] Fix ClassCastException in TableReader while creating HadoopRDD") {
+withTable("table_old", "table_pt_old", "table_new", "table_pt_new") {
+  sql(
+s"""
+   |CREATE TABLE table_old (id int)
+   |STORED AS
+   |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+   |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+  sql(
+s"""
+   |CREATE TABLE table_pt_old (id int)
+   |PARTITIONED BY (a int, b int)
+   |STORED AS
+   |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+   |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+  sql(
+s"""
+   |CREATE TABLE table_new (id int)
+   |STORED AS
+   |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+   |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+   """.stripMargin)
+  sql(
+s"""
+   |CREATE TABLE table_pt_new (id int)
+   |PARTITIONED BY (a int, b int)
+   |STORED AS
+   |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+   |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+   """.stripMargin)
+
+  sql("SELECT count(1) FROM table_old").show()
+  sql("SELECT count(1) FROM table_pt_old").show()
+  sql("SELECT count(1) FROM table_new").show()
+  sql("SELECT count(1) FROM table_pt_new").show()
+}
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249324469
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -289,15 +287,28 @@ class HadoopTableReader(
   }
 
   /**
-   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+  inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
 
 Review comment:
   I get it, done!





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249324516
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,16 +286,29 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+  inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
+  .isAssignableFrom(Utils.classForName(inputClassName))) {
+  createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+} else {
+  createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
-  private def createHadoopRdd(
-tableDesc: TableDesc,
-path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+  private def createOldHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+val inputFormatClass = Utils.classForName(inputClassName)
+  .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]]
 
 Review comment:
   Done, so I could remove `inputClassName: String`.
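
   In other words, the input format class can be read off the `TableDesc` itself; a sketch of the resulting shape, assembled from the surrounding diffs (an assumed reconstruction, not a verbatim quote of the final patch):

       private def createOldHadoopRdd(tableDesc: TableDesc, path: String): RDD[Writable] = {
         val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
         // Derive the input format class from the table descriptor instead of a String parameter.
         val inputFormatClass = tableDesc.getInputFormatClass
           .asInstanceOf[Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]]
         val rdd = new HadoopRDD(
           sparkSession.sparkContext,
           _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
           Some(initializeJobConfFunc),
           inputFormatClass,
           classOf[Writable],
           classOf[Writable],
           _minSplitsPerRDD)
         // Only take the value (skip the key) because Hive works only with values.
         rdd.map(_._2)
       }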





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249324488
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +322,31 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
 
 Review comment:
   Done, as a result of removing `inputClassName: String`.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249318001
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -311,6 +322,31 @@ class HadoopTableReader(
 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
 
 Review comment:
   Maybe it's also okay when a method declaration's parameters fit on two lines?





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249313928
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -31,12 +31,12 @@ import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
+import org.apache.spark.rdd._
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r24933
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -300,20 +300,21 @@ class HadoopTableReader(
   }
 
   /**
-   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * Creates a OldHadoopRDD based on the broadcasted HiveConf and other job properties that will be
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249310923
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -289,15 +287,28 @@ class HadoopTableReader(
   }
 
   /**
-   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+  inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
 
 Review comment:
   I think we need that because the result of `hiveTable.getInputFormatClass` can vary.
   It's difficult to list all of the input format classes, and similar usage can be found in `org.apache.spark.scheduler.InputFormatInfo` (lines 71 and 76):

   ![image](https://user-images.githubusercontent.com/25916266/51449050-45b67a80-1d65-11e9-931d-826b0aba1ab9.png)
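
   For reference, a condensed sketch of the check being discussed (the same dispatch quoted in the diffs of this thread; nothing here goes beyond it):

       val clazz = Utils.classForName(inputClassName)
       if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(clazz)) {
         // New MapReduce API (org.apache.hadoop.mapreduce): dispatch to NewHadoopRDD.
         createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
       } else {
         // Old MapRed API (org.apache.hadoop.mapred): dispatch to the classic HadoopRDD.
         createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
       }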
   
   





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249277289
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +286,65 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+  inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
+  .isAssignableFrom(Utils.classForName(inputClassName))) {
+  createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+} else {
+  createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
-  private def createHadoopRdd(
-tableDesc: TableDesc,
-path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+  private def createOldHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  Utils.classForName(inputClassName)
+.asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]],
 
 Review comment:
   I get it, done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249277179
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +286,65 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+  inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
+  .isAssignableFrom(Utils.classForName(inputClassName))) {
+  createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+} else {
+  createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
-  private def createHadoopRdd(
-tableDesc: TableDesc,
-path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+  private def createOldHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  Utils.classForName(inputClassName)
+.asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]],
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {
+
+val newJobConf = new JobConf(hadoopConf)
+(HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _).apply(newJobConf)
 
 Review comment:
   Oops, done.
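
   For the record, the simplified call is just the curried function applied directly (this form appears verbatim in the later revision quoted earlier in this archive):

       HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)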





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249277162
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,65 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def createHadoopRDD(
+  inputClassName: String, localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
+if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]]
+  .isAssignableFrom(Utils.classForName(inputClassName))) {
+  createNewHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+} else {
+  createOldHadoopRdd(localTableDesc, inputPathStr, inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
-  private def createHadoopRdd(
-tableDesc: TableDesc,
-path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+  private def createOldHadoopRdd(
+  tableDesc: TableDesc, path: String, inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  Utils.classForName(inputClassName)
+.asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]],
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
 
 Review comment:
   Done, the same for OldHadoopRDD.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276700
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,105 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+inputClassName: String,
+localTableDesc: TableDesc,
+inputPathStr: String): RDD[Writable] = {
+if (isCreateNewHadoopRDD(inputClassName)) {
+  createNewHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+} else {
+  createHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
   private def createHadoopRdd(
 tableDesc: TableDesc,
 path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  getInputFormat(inputClassName),
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+tableDesc: TableDesc,
+path: String,
+inputClassName: String): RDD[Writable] = {
 
 Review comment:
   Sorry for making this kind of mistake, done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276704
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,105 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+inputClassName: String,
+localTableDesc: TableDesc,
+inputPathStr: String): RDD[Writable] = {
+if (isCreateNewHadoopRDD(inputClassName)) {
+  createNewHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+} else {
+  createHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276636
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,105 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+inputClassName: String,
+localTableDesc: TableDesc,
+inputPathStr: String): RDD[Writable] = {
+if (isCreateNewHadoopRDD(inputClassName)) {
+  createNewHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+} else {
+  createHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
   private def createHadoopRdd(
 tableDesc: TableDesc,
 path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  getInputFormat(inputClassName),
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+tableDesc: TableDesc,
+path: String,
+inputClassName: String): RDD[Writable] = {
+
+val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+val newJobConf = new JobConf(hadoopConf)
+initializeJobConfFunc.apply(newJobConf)
+val rdd = new NewHadoopRDD(
+  sparkSession.sparkContext,
+  getNewInputFormat(inputClassName),
+  classOf[Writable],
+  classOf[Writable],
+  newJobConf
+)
+
+// Only take the value (skip the key) because Hive works only with values.
+rdd.map(_._2)
+  }
+
+  /**
+   * If `spark.sql.hive.fileInputFormat.enabled` is true, this function will optimize the input
+   * method while reading Hive tables.
+   * For old input format `org.apache.hadoop.mapred.InputFormat`.
+   */
+  private def getInputFormat(
 
 Review comment:
   Makes sense, done!





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276638
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,105 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276602
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,105 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+inputClassName: String,
+localTableDesc: TableDesc,
+inputPathStr: String): RDD[Writable] = {
+if (isCreateNewHadoopRDD(inputClassName)) {
+  createNewHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+} else {
+  createHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
   private def createHadoopRdd(
 tableDesc: TableDesc,
 path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  getInputFormat(inputClassName),
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+tableDesc: TableDesc,
+path: String,
+inputClassName: String): RDD[Writable] = {
+
+val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
 
 Review comment:
   Yes, optimized.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-20 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r249276580
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -123,9 +123,7 @@ class HadoopTableReader(
 val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
 // logDebug("Table input: %s".format(tablePath))
-val ifc = hiveTable.getInputFormatClass
-  .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
+val hadoopRDD = getRDD(hiveTable.getInputFormatClass.getName, localTableDesc, inputPathStr)
 
 Review comment:
   Done.





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-17 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r248572552
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,119 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+inputClassName: String,
+localTableDesc: TableDesc,
+inputPathStr: String): RDD[Writable] = {
+if (isCreateNewHadoopRDD(inputClassName)) {
+  createNewHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+} else {
+  createHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
   private def createHadoopRdd(
 tableDesc: TableDesc,
 path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  getInputFormat(inputClassName),
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+tableDesc: TableDesc,
+path: String,
+inputClassName: String): RDD[Writable] = {
+
+val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+val newJobConf = new JobConf(hadoopConf)
+initializeJobConfFunc.apply(newJobConf)
+val rdd = new NewHadoopRDD(
+  sparkSession.sparkContext,
+  getNewInputFormat(inputClassName),
+  classOf[Writable],
+  classOf[Writable],
+  newJobConf
+)
+
+// Only take the value (skip the key) because Hive works only with values.
+rdd.map(_._2)
+  }
+
+  /**
+   * If `spark.sql.hive.fileInputFormat.enabled` is true, this function will optimize the input
+   * method while reading Hive tables.
+   * For old input format `org.apache.hadoop.mapred.InputFormat`.
+   */
+  private def getInputFormat(
+inputClassName: String): Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]] = {
+
+var ifc = Utils.classForName(inputClassName)
+  .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]]
+if (conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED) &&
+  "org.apache.hadoop.mapred.TextInputFormat".equals(inputClassName)) {
+ifc = Utils.classForName("org.apache.hadoop.mapred.lib.CombineTextInputFormat")
 
 Review comment:
   I've added the newline character. Could you please help restart the test?





[GitHub] Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix ClassCastException in TableReader while creating HadoopRDD

2019-01-17 Thread GitBox
Deegue commented on a change in pull request #23559: [SPARK-26630][SQL] Fix 
ClassCastException in TableReader while creating HadoopRDD
URL: https://github.com/apache/spark/pull/23559#discussion_r248571444
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
 ##
 @@ -288,29 +285,119 @@ class HadoopTableReader(
 }
   }
 
+  /**
+   * The entry of creating a RDD.
+   */
+  private def getRDD(
+inputClassName: String,
+localTableDesc: TableDesc,
+inputPathStr: String): RDD[Writable] = {
+if (isCreateNewHadoopRDD(inputClassName)) {
+  createNewHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+} else {
+  createHadoopRdd(
+localTableDesc,
+inputPathStr,
+inputClassName)
+}
+  }
+
   /**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
   private def createHadoopRdd(
 tableDesc: TableDesc,
 path: String,
-inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
+inputClassName: String): RDD[Writable] = {

 val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

 val rdd = new HadoopRDD(
   sparkSession.sparkContext,
   _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
   Some(initializeJobConfFunc),
-  inputFormatClass,
+  getInputFormat(inputClassName),
   classOf[Writable],
   classOf[Writable],
   _minSplitsPerRDD)

 // Only take the value (skip the key) because Hive works only with values.
 rdd.map(_._2)
   }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRdd(
+tableDesc: TableDesc,
+path: String,
+inputClassName: String): RDD[Writable] = {
+
+val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+val newJobConf = new JobConf(hadoopConf)
+initializeJobConfFunc.apply(newJobConf)
+val rdd = new NewHadoopRDD(
+  sparkSession.sparkContext,
+  getNewInputFormat(inputClassName),
+  classOf[Writable],
+  classOf[Writable],
+  newJobConf
+)
+
+// Only take the value (skip the key) because Hive works only with values.
+rdd.map(_._2)
+  }
+
+  /**
+   * If `spark.sql.hive.fileInputFormat.enabled` is true, this function will optimize the input
+   * method while reading Hive tables.
+   * For old input format `org.apache.hadoop.mapred.InputFormat`.
+   */
+  private def getInputFormat(
+inputClassName: String): Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]] = {
+
+var ifc = Utils.classForName(inputClassName)
+  .asInstanceOf[java.lang.Class[org.apache.hadoop.mapred.InputFormat[Writable, Writable]]]
+if (conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED) &&
+  "org.apache.hadoop.mapred.TextInputFormat".equals(inputClassName)) {
+ifc = Utils.classForName("org.apache.hadoop.mapred.lib.CombineTextInputFormat")
 
 Review comment:
   You're right, I've removed this.

