Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14862289
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: 
TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
             val hconf = broadcastedHiveConf.value.value
             val rowWithPartArr = new Array[Object](2)
    -        // Map each tuple to a row object
    -        iter.map { value =>
    -          val deserializer = localDeserializer.newInstance()
    -          deserializer.initialize(hconf, partProps)
    -          val deserializedRow = deserializer.deserialize(value)
    -          rowWithPartArr.update(0, deserializedRow)
    -          rowWithPartArr.update(1, partValues)
    -          rowWithPartArr.asInstanceOf[Object]
    +
    +        val partSerDe = localDeserializer.newInstance()
    +        val tableSerDe = tableSerDeClass.newInstance()
    +        partSerDe.initialize(hconf, partProps)
    +        tableSerDe.initialize(hconf,  tableDesc.getProperties)
    +
    +        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
    +          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, 
true)
    +          .asInstanceOf[StructObjectInspector]
    +        val partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(
    +          partSerDe.getObjectInspector, tblConvertedOI)
    +
    +        // This is done per partition, and unnecessary to put it in the 
iterations (in iter.map).
    +        rowWithPartArr.update(1, partValues)
    +
    +        // Map each tuple to a row object.
    +        if 
(partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
    +          iter.map { case value =>
    +            rowWithPartArr.update(0, partSerDe.deserialize(value))
    +            rowWithPartArr.asInstanceOf[Object]
    +          }
    +        } else {
    +          iter.map { case value =>
    +            val deserializedRow = {
    +              // If partition schema does not match table schema, update 
the row to match.
    +              val convertedRow =
    +                
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
    +
    +              // If conversion was performed, convertedRow will be a 
standard Object, but if
    +              // conversion wasn't necessary, it will still be lazy. We 
can't have both across
    +              // partitions, so we serialize and deserialize again to make 
it lazy.
    +              if (tableSerDe.isInstanceOf[OrcSerde]) {
    +                convertedRow
    +              } else {
    +                convertedRow match {
    +                  case _: LazyStruct => convertedRow
    +                  case _: HiveColumnarStruct => convertedRow
    +                  case _ => tableSerDe.deserialize(
    +                    
tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
    --- End diff --
    
    As mentioned by @chenghao-intel, can we avoid it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to