Hi Leonard,

Wow, that's great! It works like a charm.
I've never considered this approach at all.
Thanks a lot.

Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Kim
>
> The reason your attempts (2) and (3) failed is that the json format does
> not support convert a BIGINT to TIMESTAMP, you can first define the BIGINT
> field and then use a computed column to extract TIMESTAMP field, you can
> also define the time attribute on TIMESTAMP filed for using time-based
> operations in Flink 1.10.1. But the computed column only support in pure
> DDL, the Table API lacks the support and should be aligned in 1.12 as I
> know.
> The DDL syntax  as following:
>
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>    timestampCol as
> TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd
> HH:mm:ss')), —computed column
>    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> )   with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>
>
> 在 2020年7月4日,21:21,Dongwon Kim <eastcirc...@gmail.com> 写道:
>
> Hi,
> I use Flink 1.10.1 and I want to use Table API to read JSON messages. The
> message looks like below.
>
>>     {
>>        "type":"Update",
>>        "location":{
>>           "id":"123e4567-e89b-12d3-a456-426652340000",
>>           "lastUpdateTime":1593866161436
>>        }
>>     }
>
>
> I wrote the following program just to see whether json messages are
> correctly parsed by Table API:
>
>>     StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     EnvironmentSettings envSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>> envSettings);
>>     tEnv
>>       .connect(
>>         new Kafka()
>>           .version("universal")
>>           .topic(consumerTopic)
>>           .startFromLatest()
>>           .properties(consumerProperties)
>>       )
>>       .withFormat(new Json())
>>       .withSchema(new Schema().schema(
>>         TableSchema.builder()
>>           .field("type", STRING())
>>           .field("location",
>>             ROW(
>>               FIELD("id", STRING()),
>>               // (1)
>>               FIELD("lastUpdateTime", BIGINT())
>>               // (2)
>>               FIELD("lastUpdateTime", TIMESTAMP())
>>               // (3)
>>               FIELD("lastUpdateTime",
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>             ))
>>           .build()
>>       ))
>>       .createTemporaryTable("message");
>>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>       .print();
>
>
> Note that I tried BIGINT(), TIMESTAMP(), and
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
> (1) it works fine but later I can't use time-based operations like
> windowing.
>
> (2) it causes the following exception
>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>> the 'location' field of the TableSource return type.
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> at
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>> at
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>> at
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id`
>> STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not
>> match with the physical type ROW<`id` STRING, `lastUpdateTime`
>> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>> ... 38 more
>
>
> (3) it causes the following exception
>
>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814'
>> could not be parsed at index 0
>> at
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> ... 7 more
>
>
> Can I read such json messages with time information in Flink 1.10.1?
>
> Thanks
>
> Dongwon
>
>
>

Reply via email to