Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8509#discussion_r38812386
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
 ---
    @@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
       val SPARK_ROW_REQUESTED_SCHEMA = 
"org.apache.spark.sql.parquet.row.requested_schema"
     
       val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
    +
    +  /**
    +   * Tailors `parquetSchema` according to `catalystSchema` by removing 
column paths don't exist
    +   * in `catalystSchema`, and adding those only exist in `catalystSchema`.
    +   */
    +  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: 
StructType): MessageType = {
    +    val clippedParquetFields = 
clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
    +    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
    +  }
    +
    +  private def clipParquetType(parquetType: Type, catalystType: DataType): 
Type = {
    +    catalystType match {
    +      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
    +        // Only clips array types with nested type as element type.
    +        clipParquetListType(parquetType.asGroupType(), t.elementType)
    +
    +      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
    +        // Only clips map types with nested type as value type.
    +        clipParquetMapType(parquetType.asGroupType(), t.keyType, 
t.valueType)
    +
    +      case t: StructType =>
    +        clipParquetGroup(parquetType.asGroupType(), t)
    +
    +      case _ =>
    +        parquetType
    +    }
    +  }
    +
    +  /**
    +   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] 
is not equivalent to
    +   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, 
but it's not an
    +   * [[AtomicType]].
    +   */
    +  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
    +    dataType match {
    +      case _: ArrayType | _: MapType | _: StructType => false
    +      case _ => true
    +    }
    +  }
    +
    +  /**
    +   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[ArrayType]].  The element type
    +   * of the [[ArrayType]] should also be a nested type, namely an 
[[ArrayType]], a [[MapType]], or a
    +   * [[StructType]].
    +   */
    +  private def clipParquetListType(parquetList: GroupType, elementType: 
DataType): Type = {
    +    // Precondition of this method, should only be called for lists with 
nested element types.
    +    assert(!isPrimitiveCatalystType(elementType))
    +
    +    // Unannotated repeated group should be interpreted as required list 
of required element, so
    +    // list element type is just the group itself.  Clip it.
    +    if (parquetList.getOriginalType == null && 
parquetList.isRepetition(Repetition.REPEATED)) {
    +      clipParquetType(parquetList, elementType)
    +    } else {
    +      assert(
    +        parquetList.getOriginalType == OriginalType.LIST,
    +        "Invalid Parquet schema. " +
    +          "Original type of annotated Parquet lists must be LIST: " +
    +          parquetList.toString)
    +
    +      assert(
    +        parquetList.getFieldCount == 1 && 
parquetList.getType(0).isRepetition(Repetition.REPEATED),
    +        "Invalid Parquet schema. " +
    +          "LIST-annotated group should only have exactly one repeated 
field: " +
    +          parquetList)
    +
    +      // Precondition of this method, should only be called for lists with 
nested element types.
    +      assert(!parquetList.getType(0).isPrimitive)
    +
    +      val repeatedGroup = parquetList.getType(0).asGroupType()
    +
    +      // If the repeated field is a group with multiple fields, or the 
repeated field is a group
    +      // with one field and is named either "array" or uses the 
LIST-annotated group's name with
    +      // "_tuple" appended then the repeated type is the element type and 
elements are required.
    +      // Build a new LIST-annotated group with clipped `repeatedGroup` as 
element type and the
    +      // only field.
    +      if (
    +        repeatedGroup.getFieldCount > 1 ||
    +        repeatedGroup.getName == "array" ||
    +        repeatedGroup.getName == parquetList.getName + "_tuple"
    +      ) {
    +        Types
    +          .buildGroup(parquetList.getRepetition)
    +          .as(OriginalType.LIST)
    +          .addField(clipParquetType(repeatedGroup, elementType))
    +          .named(parquetList.getName)
    +      } else {
    +        // Otherwise, the repeated field's type is the element type with 
the repeated field's
    +        // repetition.
    +        Types
    +          .buildGroup(parquetList.getRepetition)
    +          .as(OriginalType.LIST)
    +          .addField(
    +            Types
    +              .repeatedGroup()
    +              .addField(clipParquetType(repeatedGroup.getType(0), 
elementType))
    +              .named(repeatedGroup.getName))
    +          .named(parquetList.getName)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[MapType]].  The value type
    +   * of the [[MapType]] should also be a nested type, namely an 
[[ArrayType]], a [[MapType]], or a
    +   * [[StructType]].  Note that key type of any [[MapType]] is always a 
primitive type.
    +   */
    +  private def clipParquetMapType(
    +      parquetMap: GroupType, keyType: DataType, valueType: DataType): 
GroupType = {
    +    // Precondition of this method, should only be called for maps with 
nested value types.
    +    assert(!isPrimitiveCatalystType(valueType))
    +
    +    val repeatedGroup = parquetMap.getType(0).asGroupType()
    +    val parquetKeyType = repeatedGroup.getType(0)
    +    val parquetValueType = repeatedGroup.getType(1)
    +
    +    val clippedRepeatedGroup =
    +      Types
    +        .repeatedGroup()
    +        .as(repeatedGroup.getOriginalType)
    +        .addField(parquetKeyType)
    --- End diff --
    
    Actually, although complex map keys are not allowed when using HiveQL in 
Spark SQL, they are allowed otherwise, and we can read/write them from/to 
Parquet successfully. So we do need to handle complex map keys here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to