[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380880#comment-16380880
 ] 

ASF GitHub Bot commented on FLINK-8538:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r171354820
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +
    +  // utilities
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): 
Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +
    +    val mapping = mutable.Map[String, String]()
    +
    +    val schema = properties.getTableSchema(SCHEMA)
    +
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) 
match {
     
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
     
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
    +              s"from source. Please specify the source field from which it 
can be derived.")
    +          }
    +      }
    +    }
     
    +    mapping.toMap.asJava
    +  }
    +
    +  /**
    +    * Finds the fields that can be used for a format schema (without time 
attributes).
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    
    Thanks for your explanation @twalthr. I totally agree that we should avoid 
letting the users define schemas multi-times. As the names and definitions are 
still confusing me, I'd share my understanding to see if it's correct. Let's 
take the KafkaJsonTableSource as an example. Briefly, the schema mapping can be 
illustrated with
    
    ```
    json-format-schema(physical, optional) <-- mapping --> 
result-schema(physical)
    result-schema(physical) + timestamp fields(logical) = table-schema(logical, 
required)
    ```
    
    The `JSON-format-schema` could be either defined with a JSON-schema 
string(FORMAT_JSON_SCHEMA) or a `TypeInformation`(FORMAT_SCHEMA). When the 
JSON-format-schema is not provided, we use the `deriveFormatFields()` method to 
generate it from the result-schema and add something like a "self-mapping". 
IMO, if we don't pass the `jsonSchema` to the builder, there's no need to 
define the mapping, right?
    
    The timestamp mechanism may even be complicated for Kafka since it supports 
`withKafkaTimestampAsRowtimeAttribute` (though according to FLINK-8500, this 
method seems to not work for now). I suppose we need a new 
`RowtimeDescriptor`for it? 


> Add a Kafka table source factory with JSON format support
> ---------------------------------------------------------
>
>                 Key: FLINK-8538
>                 URL: https://issues.apache.org/jira/browse/FLINK-8538
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>             Fix For: 1.5.0, 1.6.0
>
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to