This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 07caebf  [SPARK-17398][SQL] Fix ClassCastException when querying 
partitioned JSON table
07caebf is described below

commit 07caebf2194fc34f1b4aacf2aa6c2d6961587482
Author: Wing Yew Poon <wyp...@cloudera.com>
AuthorDate: Fri Dec 20 10:39:26 2019 -0800

    [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON 
table
    
    When querying a partitioned table with format 
`org.apache.hive.hcatalog.data.JsonSerDe` and more than one task runs in each 
executor concurrently, the following exception is encountered:
    
    `java.lang.ClassCastException: java.util.ArrayList cannot be cast to 
org.apache.hive.hcatalog.data.HCatRecord`
    
    The exception occurs in `HadoopTableReader.fillObject`.
    
    `org.apache.hive.hcatalog.data.JsonSerDe#initialize` populates a 
`cachedObjectInspector` field by calling 
`HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector`, which is not 
thread-safe; this `cachedObjectInspector` is returned by 
`JsonSerDe#getObjectInspector`.
    
    We protect against this Hive bug by synchronizing on an object when we need 
to call `initialize` on `org.apache.hadoop.hive.serde2.Deserializer` instances 
(which may be `JsonSerDe` instances). By doing so, the `ObjectInspector` for 
the `Deserializer` of the partitions of the JSON table and that of the table 
`SerDe` are the same cached `ObjectInspector` and 
`HadoopTableReader.fillObject` then works correctly. (If the `ObjectInspector`s 
are different, then a bug in `HCatRecordObjectInspectorFactory` is triggered, producing the `ClassCastException` described above.)
    
    This change works around the Hive bugs HIVE-15773 / HIVE-21752 (the non-thread-safe object-inspector cache described above).
    
    This introduces no user-facing change.
    
    Tested manually on a cluster with a partitioned JSON table and running a 
query using more than one core per executor. Before this change, the 
ClassCastException happens consistently. With this change it does not happen.
    
    Closes #26895 from wypoon/SPARK-17398.
    
    Authored-by: Wing Yew Poon <wyp...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
    (cherry picked from commit c72f88b0ba20727e831ba9755d9628d0347ee3cb)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/sql/hive/TableReader.scala    | 23 +++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 7d57389..6631073 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -133,7 +133,9 @@ class HadoopTableReader(
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
       val hconf = broadcastedHadoopConf.value.value
       val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, localTableDesc.getProperties)
+      DeserializerLock.synchronized {
+        deserializer.initialize(hconf, localTableDesc.getProperties)
+      }
       HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, 
mutableRow, deserializer)
     }
 
@@ -255,10 +257,14 @@ class HadoopTableReader(
         partProps.asScala.foreach {
           case (key, value) => props.setProperty(key, value)
         }
-        deserializer.initialize(hconf, props)
+        DeserializerLock.synchronized {
+          deserializer.initialize(hconf, props)
+        }
         // get the table deserializer
         val tableSerDe = localTableDesc.getDeserializerClass.newInstance()
-        tableSerDe.initialize(hconf, localTableDesc.getProperties)
+        DeserializerLock.synchronized {
+          tableSerDe.initialize(hconf, localTableDesc.getProperties)
+        }
 
         // fill the non partition key attributes
         HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
@@ -337,6 +343,17 @@ private[hive] object HiveTableUtil {
   }
 }
 
+/**
+ * Object to synchronize on when calling 
org.apache.hadoop.hive.serde2.Deserializer#initialize.
+ *
+ * [SPARK-17398] org.apache.hive.hcatalog.data.JsonSerDe#initialize calls the 
non-thread-safe
+ * HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector, the results 
of which are
+ * returned by JsonSerDe#getObjectInspector.
+ * To protect against this bug in Hive (HIVE-15773/HIVE-21752), we synchronize 
on this object
+ * when calling initialize on Deserializer instances that could be JsonSerDe 
instances.
+ */
+private[hive] object DeserializerLock
+
 private[hive] object HadoopTableReader extends HiveInspectors with Logging {
   /**
    * Curried. After given an argument for 'path', the resulting JobConf => 
Unit closure is used to


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to