Hi Niels,
If you are coming from the DataStream API, all you need to do is write a
timestamp extractor.
When you call:
tableEnv.registerDataStream("TestStream", letterStream,
"EventTime.rowtime, letter, counter");
The ".rowtime" means that the framework will extract the rowtime from
the stream record timestamp. You don't need to name all fields again but
could simply construct a string from
letterStream.getTypeInfo().getFieldNames(). I hope we can improve this
further in the future as part of FLIP-37.
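A minimal sketch of that string construction, assuming the field names are already available as a list (for example, read from the stream's type information). The class RowtimeFields and its build method are hypothetical helpers for illustration, not part of Flink. The sketch also assumes that tagging an existing field name with ".rowtime" makes the framework use the record timestamp for that field, so a timestamp extractor still has to be assigned on the stream beforehand.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RowtimeFields {

    /**
     * Builds a field expression string such as
     * "timestamp.rowtime, letter, pageviews".
     * The field named by rowtimeField gets the ".rowtime" suffix;
     * all other field names are passed through unchanged.
     */
    public static String build(List<String> fieldNames, String rowtimeField) {
        if (!fieldNames.contains(rowtimeField)) {
            throw new IllegalArgumentException("Unknown field: " + rowtimeField);
        }
        return fieldNames.stream()
                .map(name -> name.equals(rowtimeField) ? name + ".rowtime" : name)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // Field names of the Measurement record from the original question.
        List<String> names = Arrays.asList("timestamp", "letter", "pageviews");
        System.out.println(build(names, "timestamp"));
    }
}
```

The resulting string could then be passed as the third argument of tableEnv.registerDataStream(...), so the schema does not have to be spelled out by hand when it has a few hundred columns.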
Regards,
Timo
On 14.08.19 at 17:00, Niels Basjes wrote:
Hi,
Experimenting with the StreamTableEnvironment I built something like this:
DataStream<Tuple3<Long, String, Long>> letterStream = ...
tableEnv.registerDataStream("TestStream", letterStream,
"EventTime.rowtime, letter, counter");
Because "EventTime" was tagged with ".rowtime", it is now being
used as the rowtime and has the DATETIME type, so I can do this:
TUMBLE_START(EventTime, INTERVAL '1' MINUTE)
So far so good.
Working towards a more realistic scenario I have a source that
produces a stream of records that have been defined using Apache Avro.
So I have a Measurement.avdl that (among other things) contains
something like this:
record Measurement {
/** The time (epoch in milliseconds since 1970-01-01 UTC) when
the event occurred */
long timestamp;
string letter;
long pageviews;
}
Now because the registerDataStream call can also derive the schema
from the provided data I can do this:
DataStream<Measurement> inputStream = ...
tableEnv.registerDataStream("DataStream", inputStream);
This is very nice because any real schema is big (a few hundred columns)
and changes over time.
Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a
consequence I get this error:
Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>,
<INTERVAL MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>,
<DATETIME_INTERVAL>)'
So far I have not yet figured out how to make the system understand that
the timestamp column should be treated as the rowtime.
How do I do that?
--
Best regards / Met vriendelijke groeten,
Niels Basjes