Hi, Kim

> Hi Leonard,
> 
> Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately the answer is no. The YAML you defined will be parsed by the 
Table API and then executed, and the root cause of the error in your post is 
that the Table API does not support computed columns yet. As far as I can 
tell, the from-field rowtime extractor expects the name of an existing field, 
so the TO_TIMESTAMP(...) expression in your YAML is never evaluated as an 
expression, which is why the watermark assigner hits a NullPointerException.

There is a FLIP under discussion [1]; this should be ready in 1.12.0. BTW, I 
think DDL is the recommended way since Flink 1.11.0.
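
If you want to stay programmatic instead of going through the YAML environment 
file, a workaround in 1.11 is to run the same DDL with executeSql(), which does 
accept computed columns. A rough sketch (the connector options are placeholders 
mirroring your YAML, not a verified config; the table and column names match 
your example):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    // org.apache.flink.table.api.bridge.java.StreamTableEnvironment in 1.11
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    // DDL supports computed columns, unlike the descriptor/YAML path
    tEnv.executeSql(
        "CREATE TABLE test (" +
        "  `type` STRING," +
        "  `location` ROW<`id` STRING, lastUpdateTime BIGINT>," +
        "  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(" +
        "    `location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
        "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = '...'," +
        "  'properties.bootstrap.servers' = '...'," +
        "  'properties.group.id' = '...'," +
        "  'format' = 'json'" +
        ")");

    // quick sanity check of the computed column
    tEnv.executeSql("SELECT timestampCol FROM test").print();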

Best,
Leonard Xu
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API

> On Jul 20, 2020, at 14:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> 
> I tried the following (Flink 1.11.0) but got an error:
> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000
> 
> The SQL client doesn't complain about the file, but when I execute "SELECT 
> timestampCol from test", the job fails with the following error message:
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> 
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi Leonard,
> 
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
> 
> Best,
> Dongwon
> 
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:
> Hi, Kim
> 
> The reason your attempts (2) and (3) failed is that the JSON format does not 
> support converting a BIGINT to TIMESTAMP. You can first define the field as 
> BIGINT and then use a computed column to derive the TIMESTAMP field; you can 
> also define the time attribute on that TIMESTAMP field so that time-based 
> operations are available in Flink 1.10.1. (FROM_UNIXTIME takes seconds, which 
> is why the millisecond value is divided by 1000 below.) But computed columns 
> are only supported in pure DDL; the Table API lacks this support and should 
> be aligned in 1.12 as far as I know. The DDL looks like the following (see 
> [1] for the CREATE TABLE syntax):
> 
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>   timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> ) with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
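> 
> Once the time attribute is defined, time-based operations become available on 
> timestampCol. Just as an illustrative sketch (the 1-minute window and the 
> variable name are arbitrary, assuming the table above is registered in tEnv):
> 
>     // group-window aggregates on a rowtime attribute emit append-only results
>     Table windowed = tEnv.sqlQuery(
>         "SELECT TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS windowStart, " +
>         "COUNT(*) AS cnt " +
>         "FROM test " +
>         "GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE)");
>     tEnv.toAppendStream(windowed, Row.class).print();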
> 
> 
> Best,
> Leonard Xu
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
> 
> 
>> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
>> 
>> Hi,
>> I use Flink 1.10.1 and I want to use the Table API to read JSON messages. 
>> The messages look like the one below.
>>     {
>>        "type":"Update",
>>        "location":{
>>           "id":"123e4567-e89b-12d3-a456-426652340000",
>>           "lastUpdateTime":1593866161436
>>        }
>>     }
>> 
>> I wrote the following program just to see whether the JSON messages are 
>> correctly parsed by the Table API:
>>     StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     EnvironmentSettings envSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>> envSettings);
>>     tEnv
>>       .connect(
>>         new Kafka()
>>           .version("universal")
>>           .topic(consumerTopic)
>>           .startFromLatest()
>>           .properties(consumerProperties)
>>       )
>>       .withFormat(new Json())
>>       .withSchema(new Schema().schema(
>>         TableSchema.builder()
>>           .field("type", STRING())
>>           .field("location",
>>             ROW(
>>               FIELD("id", STRING()),
>>               // (1)
>>               FIELD("lastUpdateTime", BIGINT())
>>               // (2)
>>               FIELD("lastUpdateTime", TIMESTAMP())
>>               // (3)
>>               FIELD("lastUpdateTime", 
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>             ))
>>           .build()
>>       ))
>>       .createTemporaryTable("message");
>>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>       .print();
>> 
>> Note that I tried (1) BIGINT(), (2) TIMESTAMP(), and (3) 
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>> (1) works fine, but later I can't use time-based operations like windowing.
>> 
>> (2) causes the following exception:
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 
>> 'location' does not match with the physical type ROW<`id` STRING, 
>> `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource 
>> return type.
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> at 
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>> at 
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>> at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at 
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>> at 
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>> at 
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` 
>> STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not 
>> match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> 
>> of the 'location' field of the TableSource return type.
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at 
>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>> ... 38 more
>> 
>> (3) causes the following exception:
>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814' 
>> could not be parsed at index 0
>> at 
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> ... 7 more
>> 
>> Can I read such JSON messages with time information in Flink 1.10.1?
>> 
>> Thanks
>> 
>> Dongwon
> 
