Fwd: Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Danny Chan

Best,
Danny Chan
-- Forwarded message --
From: Danny Chan 
Date: Jul 20, 2020, 4:51 PM +0800
To: Dongwon Kim 
Subject: Re: [Table API] how to configure a nested timestamp field

> Or is it possible to pre-define a catalog there and register it through the SQL
> CLI YAML?
>
> Best,
> Danny Chan
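
For reference, the SQL Client YAML also supports a catalogs section, and the same catalog can be pre-defined programmatically. Below is a minimal, hedged sketch of that idea, assuming Flink 1.11 with the flink-connector-hive dependency on the classpath and a reachable Hive Metastore; the catalog name, database, conf dir, and Hive version are placeholders, not values from this thread.

    // Sketch only: pre-define a persistent (Hive) catalog so tables created via DDL
    // are stored in the Metastore and do not need to be re-declared per SQL Client session.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class CatalogSketch {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Catalog name, default database, conf dir, and Hive version are placeholders.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.6");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Tables created here with CREATE TABLE DDL end up in the Metastore and can later
        // be referenced from the SQL Client (with the same catalog configured) without YAML.
      }
    }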
> On Jul 20, 2020, 3:23 PM +0800, Dongwon Kim wrote:
> > Hi Leonard,
> >
> > > Unfortunately the answer is no. The YAML you defined is parsed by the Table
> > > API and then executed; the root cause of the error you posted is that the
> > > Table API does not support computed columns yet.
> > > There is a FLIP under discussion [1]; this should be ready in 1.12.0. BTW,
> > > I think DDL is the recommended way since Flink 1.11.0.
> > Okay, thanks a lot for your input.
> >
> > I just tried out the Flink SQL Client and wanted to store pre-defined YAML
> > files, each declaring a source table from a Kafka topic.
> > As you advised, I have to manually enter the DDL in the SQL Client on Flink
> > 1.11.x.
> >
> > Best,
> >
> > Dongwon
> >
> >
> > > On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu  wrote:
> > > > Hi, Kim
> > > >
> > > > > Hi Leonard,
> > > > >
> > > > > Can I have a YAML definition corresponding to the DDL you suggested?
> > > >
> > > > Unfortunately the answer is no. The YAML you defined is parsed by the
> > > > Table API and then executed; the root cause of the error you posted is
> > > > that the Table API does not support computed columns yet.
> > > >
> > > > There is a FLIP under discussion [1]; this should be ready in 1.12.0.
> > > > BTW, I think DDL is the recommended way since Flink 1.11.0.
> > > >
> > > > Best,
> > > > Leonard Xu
> > > > [1] 
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> > > >
> > > > > On Jul 20, 2020, at 14:30, Dongwon Kim wrote:
> > > > >
> > > > >
> > > > > I tried below (Flink 1.11.0) but got some error:
> > > > > > tables:
> > > > > >   - name: test
> > > > > >     type: source-table
> > > > > >     update-mode: append
> > > > > >     connector:
> > > > > >       property-version: 1
> > > > > >       type: kafka
> > > > > >       version: universal
> > > > > >       topic: ...
> > > > > >       properties:
> > > > > >         bootstrap.servers: ...
> > > > > >         group.id: ...
> > > > > >     format:
> > > > > >       property-version: 1
> > > > > >       type: json
> > > > > >     schema:
> > > > > >       - name: type
> > > > > >         data-type: STRING
> > > > > >       - name: location
> > > > > >         data-type: >
> > > > > >           ROW<
> > > > > >             id STRING,
> > > > > >             lastUpdateTime BIGINT
> > > > > >           >
> > > > > >       - name: timestampCol
> > > > > >         data-type: TIMESTAMP(3)
> > > > > >         rowtime:
> > > > > >           timestamps:
> > > > > >             type: from-field
> > > > > >             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
> > > > > >           watermarks:
> > > > > >             type: periodic-bounded
> > > > > >             delay: 5000
> > > > >
> > > > > SQL client doesn't complain about the file but, when I execute 
> > > > > "SELECT timestampCol from test", the job fails with the following 
> > > > > error message:
> > > > > > Caused by: java.lang.NullPointerException
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> > > > > > at 
> > > >

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
Hi Leonard,

> Unfortunately the answer is no. The YAML you defined is parsed by the Table
> API and then executed; the root cause of the error you posted is that the
> Table API does not support computed columns yet.
> There is a FLIP under discussion [1]; this should be ready in 1.12.0. BTW,
> I think DDL is the recommended way since Flink 1.11.0.

Okay, thanks a lot for your input.

I just tried out the Flink SQL Client and wanted to store pre-defined YAML
files, each declaring a source table from a Kafka topic.
As you advised, I have to manually enter the DDL in the SQL Client on Flink
1.11.x.

Best,

Dongwon
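
If entering the DDL by hand in the SQL Client becomes tedious, the same DDL can also be submitted programmatically in Flink 1.11 via executeSql. A minimal sketch, assuming the Kafka connector and JSON format dependencies are on the classpath; the topic and Kafka properties ('...') are placeholders, as in the thread:

    // Sketch: register the source table with DDL from the Table API (Flink 1.11),
    // instead of a YAML file or manual entry in the SQL Client.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RegisterTableSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // DDL with the computed column and watermark suggested earlier in the thread;
        // topic and Kafka properties ('...') are placeholders.
        tEnv.executeSql(
            "CREATE TABLE test (\n"
                + "  `type` STRING,\n"
                + "  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,\n"
                + "  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),\n"
                + "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = '...',\n"
                + "  'properties.bootstrap.servers' = '...',\n"
                + "  'properties.group.id' = '...',\n"
                + "  'format' = 'json'\n"
                + ")");

        // The rowtime attribute is now available for time-based operations.
        tEnv.toAppendStream(tEnv.sqlQuery("SELECT timestampCol FROM test"), Row.class).print();
        env.execute("register table via DDL");
      }
    }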


On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu  wrote:

> Hi, Kim
>
> Hi Leonard,
>
> Can I have a YAML definition corresponding to the DDL you suggested?
>
>
> Unfortunately the answer is no. The YAML you defined is parsed by the Table
> API and then executed; the root cause of the error you posted is that the
> Table API does not support computed columns yet.
>
> There is a FLIP under discussion [1]; this should be ready in 1.12.0. BTW,
> I think DDL is the recommended way since Flink 1.11.0.
>
> Best,
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> 
>
> On Jul 20, 2020, at 14:30, Dongwon Kim wrote:
>
>
> I tried below (Flink 1.11.0) but got some error:
>
>> tables:
>>   - name: test
>>     type: source-table
>>     update-mode: append
>>     connector:
>>       property-version: 1
>>       type: kafka
>>       version: universal
>>       topic: ...
>>       properties:
>>         bootstrap.servers: ...
>>         group.id: ...
>>     format:
>>       property-version: 1
>>       type: json
>>     schema:
>>       - name: type
>>         data-type: STRING
>>       - name: location
>>         data-type: >
>>           ROW<
>>             id STRING,
>>             lastUpdateTime BIGINT
>>           >
>>       - name: timestampCol
>>         data-type: TIMESTAMP(3)
>>         rowtime:
>>           timestamps:
>>             type: from-field
>>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>>           watermarks:
>>             type: periodic-bounded
>>             delay: 5000
>>
>
> SQL client doesn't complain about the file but, when I execute "SELECT
> timestampCol from test", the job fails with the following error message:
>
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
>> at
>> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at SourceConversion$4.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> 

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Leonard Xu
Hi, Kim

> Hi Leonard,
> 
> Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately the answer is no. The YAML you defined is parsed by the Table API
and then executed; the root cause of the error you posted is that the Table API
does not support computed columns yet.

There is a FLIP under discussion [1]; this should be ready in 1.12.0. BTW, I
think DDL is the recommended way since Flink 1.11.0.

Best,
Leonard Xu
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
 


> On Jul 20, 2020, at 14:30, Dongwon Kim wrote:
> 
> 
> I tried below (Flink 1.11.0) but got some error:
> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000
> 
> SQL client doesn't complain about the file but, when I execute "SELECT 
> timestampCol from test", the job fails with the following error message:
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> 
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim wrote:
> Hi Leonard,
> 
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
> 
> Best,
> Dongwon
> 
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu wrote:
> Hi, Kim
> 
> The reason your attempts (2) and (3) failed is that the json format does not
> support converting a BIGINT to TIMESTAMP; you can 

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

I tried the following (Flink 1.11.0) but got an error:

> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000
>

The SQL Client doesn't complain about the file, but when I execute "SELECT
timestampCol from test", the job fails with the following error message:

> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim  wrote:

> Hi Leonard,
>
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
>
> Best,
> Dongwon
>
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu  wrote:
>
>> Hi, Kim
>>
>> The reason your attempts (2) and (3) failed is that the json format does
>> not support converting a BIGINT to TIMESTAMP. You can first define the BIGINT
>> field and then use a computed column to extract a TIMESTAMP field; you can
>> also define the time attribute on the TIMESTAMP field for using time-based
>> operations in Flink 1.10.1. But computed columns are only supported in pure
>> DDL; the Table API lacks the support and should be aligned in 1.12 as far as
>> I know.
>> The DDL syntax is as follows:
>>
>> create table test (
>>   `type` STRING,
>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>   timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>> ) with (
>>   'connector' = '...',
>>   'format' = 'json',
>>   ...
>> );
>>

Re: [Table API] how to configure a nested timestamp field

2020-07-06 Thread Dongwon Kim
Hi Leonard,

Wow, that's great! It works like a charm.
I've never considered this approach at all.
Thanks a lot.

Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu  wrote:

> Hi, Kim
>
> The reason your attempts (2) and (3) failed is that the json format does
> not support converting a BIGINT to TIMESTAMP. You can first define the BIGINT
> field and then use a computed column to extract a TIMESTAMP field; you can
> also define the time attribute on the TIMESTAMP field for using time-based
> operations in Flink 1.10.1. But computed columns are only supported in pure
> DDL; the Table API lacks the support and should be aligned in 1.12 as far as
> I know.
> The DDL syntax is as follows:
>
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>   timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> ) with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>
>
> On Jul 4, 2020, at 21:21, Dongwon Kim wrote:
>
> Hi,
> I use Flink 1.10.1 and I want to use the Table API to read JSON messages. The
> message looks like the following.
>
>> {
>>"type":"Update",
>>"location":{
>>   "id":"123e4567-e89b-12d3-a456-42665234",
>>   "lastUpdateTime":1593866161436
>>}
>> }
>
>
> I wrote the following program just to see whether json messages are
> correctly parsed by Table API:
>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> EnvironmentSettings envSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>> envSettings);
>> tEnv
>>   .connect(
>> new Kafka()
>>   .version("universal")
>>   .topic(consumerTopic)
>>   .startFromLatest()
>>   .properties(consumerProperties)
>>   )
>>   .withFormat(new Json())
>>   .withSchema(new Schema().schema(
>> TableSchema.builder()
>>   .field("type", STRING())
>>   .field("location",
>> ROW(
>>   FIELD("id", STRING()),
>>   // (1)
>>   FIELD("lastUpdateTime", BIGINT())
>>   // (2)
>>   FIELD("lastUpdateTime", TIMESTAMP())
>>   // (3)
>>   FIELD("lastUpdateTime",
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>> ))
>>   .build()
>>   ))
>>   .createTemporaryTable("message");
>> tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>   .print();
>
>
> Note that I tried BIGINT(), TIMESTAMP(), and
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
> (1) it works fine but later I can't use time-based operations like
> windowing.
>
> (2) it causes the following exception
>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>> the 'location' field of the TableSource return type.
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> at
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>> at
>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>> at
>> 

Re: [Table API] how to configure a nested timestamp field

2020-07-05 Thread Leonard Xu
Hi, Kim

The reason your attempts (2) and (3) failed is that the json format does not
support converting a BIGINT to TIMESTAMP. You can first define the BIGINT field
and then use a computed column to extract a TIMESTAMP field; you can also define
the time attribute on the TIMESTAMP field for using time-based operations in Flink
1.10.1. But computed columns are only supported in pure DDL; the Table API lacks
the support and should be aligned in 1.12 as far as I know.
The DDL syntax is as follows:

create table test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
) with (
  'connector' = '...',
  'format' = 'json',
  ...
);
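
A minimal sketch of how this DDL could be wired into a 1.10.1 program, and of a windowed query using the resulting rowtime attribute; in 1.10 the DDL can be submitted via sqlUpdate, and the WITH clause ('...') is a placeholder for the real Kafka connector properties, as in the thread:

    // Sketch: submit the CREATE TABLE DDL above via sqlUpdate() (Flink 1.10.1) and run
    // an event-time tumbling-window query on the computed rowtime column timestampCol.
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class WindowedQuerySketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The DDL shown above; the WITH clause must contain the real Kafka connector
        // properties for the 1.10 connector ('...' is a placeholder here, as in the thread).
        tEnv.sqlUpdate(
            "CREATE TABLE test (\n"
                + "  `type` STRING,\n"
                + "  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,\n"
                + "  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),\n"
                + "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND\n"
                + ") WITH ( ... )");

        // Count records per one-minute event-time window; this only works because
        // timestampCol is a rowtime attribute defined by the DDL.
        Table counts = tEnv.sqlQuery(
            "SELECT TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS wStart, COUNT(*) AS cnt\n"
                + "FROM test\n"
                + "GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE)");

        tEnv.toAppendStream(counts, Row.class).print();
        env.execute("windowed query sketch");
      }
    }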


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
 



> On Jul 4, 2020, at 21:21, Dongwon Kim wrote:
> 
> Hi,
> I use Flink 1.10.1 and I want to use the Table API to read JSON messages. The
> message looks like the following.
> {
>"type":"Update",
>"location":{
>   "id":"123e4567-e89b-12d3-a456-42665234",
>   "lastUpdateTime":1593866161436
>}
> }
> 
> I wrote the following program just to see whether json messages are correctly 
> parsed by Table API:
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> EnvironmentSettings envSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> tEnv
>   .connect(
> new Kafka()
>   .version("universal")
>   .topic(consumerTopic)
>   .startFromLatest()
>   .properties(consumerProperties)
>   )
>   .withFormat(new Json())
>   .withSchema(new Schema().schema(
> TableSchema.builder()
>   .field("type", STRING())
>   .field("location",
> ROW(
>   FIELD("id", STRING()),
>   // (1)
>   FIELD("lastUpdateTime", BIGINT())
>   // (2)
>   FIELD("lastUpdateTime", TIMESTAMP())
>   // (3)
>   FIELD("lastUpdateTime", 
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
> ))
>   .build()
>   ))
>   .createTemporaryTable("message");
> tEnv.toAppendStream(tEnv.from("message"), Row.class)
>   .print();
> 
> Note that I tried BIGINT(), TIMESTAMP(), and 
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
> (1) it works fine but later I can't use time-based operations like windowing.
> 
> (2) it causes the following exception
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 
> 'location' does not match with the physical type ROW<`id` STRING, 
> `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource 
> return type.
> at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
> at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at 
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
> at 
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
> at 
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at 
> 

[Table API] how to configure a nested timestamp field

2020-07-04 Thread Dongwon Kim
Hi,
I use Flink 1.10.1 and I want to use the Table API to read JSON messages. The
message looks like the following.

> {
>"type":"Update",
>"location":{
>   "id":"123e4567-e89b-12d3-a456-42665234",
>   "lastUpdateTime":1593866161436
>}
> }


I wrote the following program just to see whether json messages are
correctly parsed by Table API:

> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> EnvironmentSettings envSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> envSettings);
> tEnv
>   .connect(
> new Kafka()
>   .version("universal")
>   .topic(consumerTopic)
>   .startFromLatest()
>   .properties(consumerProperties)
>   )
>   .withFormat(new Json())
>   .withSchema(new Schema().schema(
> TableSchema.builder()
>   .field("type", STRING())
>   .field("location",
> ROW(
>   FIELD("id", STRING()),
>   // (1)
>   FIELD("lastUpdateTime", BIGINT())
>   // (2)
>   FIELD("lastUpdateTime", TIMESTAMP())
>   // (3)
>   FIELD("lastUpdateTime",
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
> ))
>   .build()
>   ))
>   .createTemporaryTable("message");
> tEnv.toAppendStream(tEnv.from("message"), Row.class)
>   .print();


Note that I tried BIGINT(), TIMESTAMP(), and
TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
(1) it works fine but later I can't use time-based operations like
windowing.

(2) it causes the following exception

> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field
> 'location' does not match with the physical type ROW<`id` STRING,
> `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource
> return type.
> at
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> at
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
> at
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
> at
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
> at
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
>