[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772815#comment-15772815
 ] 

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761548
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
     
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given 
[[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and 
positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and 
corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    --- End diff --
    
    Ok, this makes sense.


> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to