[ 
https://issues.apache.org/jira/browse/FLINK-15801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-15801:
----------------------------
    Fix Version/s:     (was: 1.11.2)
                   1.11.3

> Timestamp extractor created from properties does not work for some physical 
> fields
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-15801
>                 URL: https://issues.apache.org/jira/browse/FLINK-15801
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.10.3, 1.11.3
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If a timestamp extractor is created from properties, it cannot use a physical 
> field whose name is the same as the logical name of the rowtime field.
> The code below fails:
> {code}
> StreamExecutionEnvironment streamExecutionEnvironment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment fsTableEnv =
>         StreamTableEnvironment.create(streamExecutionEnvironment);
> String allEventsTable = "allEventsTable";
> fsTableEnv.connect(new Kafka()
>                 .version("universal")
>                 .topic("events")
>                 .property("zookeeper.connect", "")
>                 .property("bootstrap.servers", "localhost:9092")
>                 .property("group.id", "dummyquery")
>                 .startFromLatest())
>         .withSchema(new Schema()
>                 .field("rule_id", Types.INT)
>                 .field("sourceAddress", Types.STRING)
>                 .field("deviceProduct", Types.STRING)
>                 .field("destHost", Types.STRING)
>                 .field("extra", Types.STRING)
>                 .field("rowtime", Types.SQL_TIMESTAMP)
>                 .rowtime(new Rowtime()
>                         .timestampsFromField("rowtime")
>                         .watermarksPeriodicBounded(2000)))
>         .withFormat(new Json().failOnMissingField(false).deriveSchema())
>         .inAppendMode()
>         .registerTableSource(allEventsTable);
> Table result = fsTableEnv.sqlQuery(
>         "select * from allEventsTable where sourceAddress='12345431'");
> DataStream alert = fsTableEnv.toAppendStream(result, Row.class);
> alert.print();
> {code}
> with exception:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'rowtime' could not be resolved by the field mapping.
>     at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
>     at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> {code}
> The problem is that the field is removed from the {{FieldMapping}} in 
> {{org.apache.flink.table.descriptors.SchemaValidator#deriveFieldMapping}}. 
> One possible solution could be to add: 
> {code}
> if (isRowtime) {
>     Optional<String> timestampSource = properties.getOptionalString(
>             SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_FROM);
>     timestampSource.ifPresent(s -> mapping.put(s, s));
> }
> {code}
> We should also consider what happens if generated columns are computed on 
> fields that were pruned in a similar way.
> Reported by a user: 
> https://stackoverflow.com/questions/59857057/how-to-define-an-apache-flink-table-with-row-time-attribute
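
The pruning described in the quoted report can be modeled with plain Java maps. This is only a minimal sketch under stated assumptions, not the Flink implementation: the class FieldMappingSketch, the method derive and all of its parameters are hypothetical names invented for illustration, and the real deriveFieldMapping handles more cases (explicit "from" mappings, proctime, validation). It shows why removing the logical rowtime attribute also removes a physical field of the same name, and how re-adding the timestamp source, as the report suggests, makes the field resolvable again.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Simplified, hypothetical model of the field-mapping derivation from the report.
 * Not Flink code: all names here are invented for illustration.
 */
public class FieldMappingSketch {

    /**
     * Builds a logical-to-physical field mapping.
     *
     * @param schemaFields     logical field names declared in the schema
     * @param rowtimeField     logical name of the rowtime attribute
     * @param timestampsFrom   physical field the timestamp extractor reads, if any
     * @param applyProposedFix whether to re-add the timestamp source (the suggested fix)
     */
    public static Map<String, String> derive(
            List<String> schemaFields,
            String rowtimeField,
            Optional<String> timestampsFrom,
            boolean applyProposedFix) {
        Map<String, String> mapping = new LinkedHashMap<>();
        // Implicit mapping: each schema field maps to a physical field of the same name.
        for (String name : schemaFields) {
            mapping.put(name, name);
        }
        // The logical rowtime attribute is removed from the mapping. If a physical
        // field shares its name, that physical field disappears with it.
        mapping.remove(rowtimeField);
        if (applyProposedFix) {
            // Suggested fix: register the physical field the extractor reads from.
            timestampsFrom.ifPresent(s -> mapping.put(s, s));
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("rule_id", "sourceAddress", "rowtime");
        Optional<String> source = Optional.of("rowtime");

        Map<String, String> before = derive(fields, "rowtime", source, false);
        Map<String, String> after = derive(fields, "rowtime", source, true);

        System.out.println("without fix, 'rowtime' resolvable: " + before.containsKey("rowtime"));
        System.out.println("with fix, 'rowtime' resolvable: " + after.containsKey("rowtime"));
    }
}
```

In this model the first call leaves no entry for "rowtime", which corresponds to the "could not be resolved by the field mapping" error above, while the second call restores it.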



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
