Hi Enrico, that's (for now) the right approach. I agree, that the KafkaTableSource should implement both DefinedXTimeAttribute interfaces.
Best, Fabian 2017-05-25 3:20 GMT+02:00 enrico canzonieri <ecanzoni...@gmail.com>: > I solved this implementing a new Kafka09TableSource in my application. The > class I implemented extends both DefinedRowTimeAttribute and > DefinedProcTimeAttribute and it exposes the consumer so that I can assign > the timestamp extractor. > > I'm not sure if this is the right approach, but if that's the case I > wonder if we could make those changes into KafkaTableSource to make it more > generic. > > On Wed, May 24, 2017 at 12:23 PM, enrico canzonieri <ecanzoni...@gmail.com > > wrote: > >> Hi Timo, thanks for your help! >> >> I tried to follow the examples in the tests but I still have the same >> issue. >> I changed my schema and added an additional field "rowtime". My schema >> now is: >> root >> |-- rowtime: org.apache.flink.table.expressions.RowtimeAttribute(expr: >> GenericType<org.apache.flink.table.expressions.Expression>) >> |-- time: Long >> |-- host: String >> >> If I run the code: >> table.select('rowtime).toDataStream[Row].print() >> I get: >> RowtimeAttribute(1495580133000) >> RowtimeAttribute(1495580143000) >> RowtimeAttribute(1495580153000) >> >> But If I run: >> table.window(Tumble over 1.minutes on 'rowtime as 'w).groupBy('host, >> 'w).select('host) >> I still get the previous error: >> TumblingGroupWindow('w, 'rowtime, 60000.millis) is invalid: Tumbling >> window expects a time attribute for grouping in a stream environment. >> >> I'm using a Kafka09TableSource as data source, but it doesn't allow me to >> specify the timestamp assigner. I think the actual consumer is not exposed >> to the user so I cannot really call assignTimestampsAndWatermarks. May >> that be the problem? Should we expose that function so that we can assign >> timestamp and watermark to a TableSource? >> >> The time characteristic in the execution environment is set to EventTime >> in my code. >> >> Cheers, >> Enrico >> >> On Wed, May 24, 2017 at 2:08 AM, Timo Walther <twal...@apache.org> wrote: >> >>> Hi Enrico, >>> >>> the docs of the 1.3-SNAPSHOT are a bit out of sync right now, but they >>> will be updated in the next days/1-2 weeks. >>> >>> We recently introduced so-called "time indicators". These are attributes >>> that correspond to Flink's time and watermarks. You declare a logical field >>> that represents Flink's internal time in a table program. >>> >>> In your example you need to append a "time.rowtime" or "time.proctime" >>> to your table schema definition. >>> >>> You can find some examples here: >>> https://github.com/apache/flink/blob/master/flink-libraries/ >>> flink-table/src/test/scala/org/apache/flink/table/runtime/ >>> datastream/TimeAttributesITCase.scala >>> >>> If you have further question, feel free to ask them. It helps us to >>> improve the documenation. >>> >>> Regards, >>> Timo >>> >>> >>> >>> Am 24.05.17 um 04:15 schrieb enrico canzonieri: >>> >>> Hi, >>> I'm trying to window and groupBy a stream using the table api, but I get >>> ValidationException in the windowing function. >>> Here is the relevant code: >>> >>> tableEnv.registerTableSource(schema.getName, src) >>> val table = tableEnv.scan(schema.getName) >>> val t = table.window(Tumble over 1.minutes on 'time as >>> 'w).groupBy('host, 'w).select('host) >>> >>> "time" is defined as Long in my schema. The error I get is: >>> Exception in thread "main" org.apache.flink.table.api.ValidationException: >>> TumblingGroupWindow('w, 'time, 60000.millis) is invalid: Tumbling window >>> expects a time attribute for grouping in a stream environment. >>> >>> I also tried to define a window that was using processing time, but what >>> described in the documentation "Tumble over 1.minutes as 'w" doesn't >>> seem to work anymore. Specifically it seems that a window now always >>> expects the "on" call. >>> >>> Has anybody encountered this issue? I'm using Flink 1.3-SNAPSHOT. >>> >>> thanks >>> >>> >>> >> >