This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 07caebf [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON table 07caebf is described below commit 07caebf2194fc34f1b4aacf2aa6c2d6961587482 Author: Wing Yew Poon <wyp...@cloudera.com> AuthorDate: Fri Dec 20 10:39:26 2019 -0800 [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON table. What changes were proposed in this pull request? When querying a partitioned table with format `org.apache.hive.hcatalog.data.JsonSerDe` while more than one task runs concurrently in each executor, the following exception is encountered: `java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.apache.hive.hcatalog.data.HCatRecord`. The exception occurs in `HadoopTableReader.fillObject`. `org.apache.hive.hcatalog.data.JsonSerDe#initialize` populates a `cachedObjectInspector` field by calling `HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector`, which is not thread-safe; this `cachedObjectInspector` is what `JsonSerDe#getObjectInspector` returns. We protect against this Hive bug by synchronizing on a single shared lock object whenever we call `initialize` on `org.apache.hadoop.hive.serde2.Deserializer` instances (which may be `JsonSerDe` instances). As a result, the `ObjectInspector` for the `Deserializer` of the JSON table's partitions and the `ObjectInspector` of the table `SerDe` are the same cached `ObjectInspector`, and `HadoopTableReader.fillObject` then works correctly. (If the `ObjectInspector`s are different, then a bug in `HCatRecordObjectInsp [...] [the remainder of this paragraph is truncated in this email] Why are the changes needed? To avoid HIVE-15773 / HIVE-21752. Does this PR introduce any user-facing change? No. How was this patch tested? Tested manually on a cluster with a partitioned JSON table, by running a query with more than one core per executor. Before this change, the ClassCastException happens consistently; with this change, it does not happen. Closes #26895 from wypoon/SPARK-17398.
Authored-by: Wing Yew Poon <wyp...@cloudera.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> (cherry picked from commit c72f88b0ba20727e831ba9755d9628d0347ee3cb) Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../org/apache/spark/sql/hive/TableReader.scala | 23 +++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 7d57389..6631073 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -133,7 +133,9 @@ class HadoopTableReader( val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, localTableDesc.getProperties) + DeserializerLock.synchronized { + deserializer.initialize(hconf, localTableDesc.getProperties) + } HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) } @@ -255,10 +257,14 @@ class HadoopTableReader( partProps.asScala.foreach { case (key, value) => props.setProperty(key, value) } - deserializer.initialize(hconf, props) + DeserializerLock.synchronized { + deserializer.initialize(hconf, props) + } // get the table deserializer val tableSerDe = localTableDesc.getDeserializerClass.newInstance() - tableSerDe.initialize(hconf, localTableDesc.getProperties) + DeserializerLock.synchronized { + tableSerDe.initialize(hconf, localTableDesc.getProperties) + } // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, @@ -337,6 +343,17 @@ private[hive] object HiveTableUtil { } } +/** + * Object to synchronize on when calling org.apache.hadoop.hive.serde2.Deserializer#initialize. 
+ * + * [SPARK-17398] org.apache.hive.hcatalog.data.JsonSerDe#initialize calls the non-thread-safe + * HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector, the results of which are + * returned by JsonSerDe#getObjectInspector. + * To protect against this bug in Hive (HIVE-15773/HIVE-21752), we synchronize on this object + * when calling initialize on Deserializer instances that could be JsonSerDe instances. + */ +private[hive] object DeserializerLock + private[hive] object HadoopTableReader extends HiveInspectors with Logging { /** * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org