Hi Jungtaek,

If it is "only" about the missing support for parsing a string as a timestamp, you could also implement a custom TimestampExtractor that works similarly to the ExistingField extractor [1]. You would need to adjust a few things and use the expression "Cast(Cast('tsString, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)" to convert the String to a Long. So far, this only works if the date is formatted like "2018-05-28 12:34:56.000".
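To make the conversion concrete, here is a plain-Java sketch (no Flink involved) of what that double Cast boils down to: parsing a string in exactly that layout into epoch milliseconds. The helper name `parseTsString` is hypothetical and only for illustration.

```java
import java.sql.Timestamp;

// Plain-Java illustration (no Flink API) of what the expression
// "Cast(Cast('tsString, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)" effectively does:
// parse a "yyyy-MM-dd HH:mm:ss.SSS" string and convert it to epoch millis.
public class TsStringDemo {
    // Hypothetical helper name, for illustration only.
    static long parseTsString(String s) {
        // Timestamp.valueOf expects exactly the "2018-05-28 12:34:56.000" layout
        return Timestamp.valueOf(s).getTime();
    }

    public static void main(String[] args) {
        long millis = parseTsString("2018-05-28 12:34:56.000");
        System.out.println(millis);
    }
}
```

Note that `Timestamp.valueOf` interprets the string in the JVM's local time zone, so the resulting Long differs across machines.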
Regarding the side outputs, these would not be handled as results but just redirect late records into separate data streams. We would offer a configuration to write them to a sink like HDFS or Kafka.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala

2018-07-04 11:54 GMT+02:00 Jungtaek Lim <kabh...@gmail.com>:
> Thanks Chesnay! Great news to hear. I'll try it out with the latest master branch.
>
> Thanks Fabian for providing the docs!
>
> I guess I already tried KafkaJsonTableSource and fell back to a custom TableSource, since the type of the rowtime field is unfortunately a string, and I needed to parse it and map it to a new SQL timestamp field in order to use it as the rowtime attribute.
>
> I guess the JSON -> table fields mapping is provided only for renaming, and "withRowtimeAttribute" doesn't help with defining a new field to use as rowtime.
>
> Are there better approaches to this scenario? Or would it be better to assume that the type of the rowtime field is always timestamp?
>
> Btw, providing a late-data side output in the Table API might just be a matter of how to define it correctly (not a technical or syntactic issue), though providing it in SQL might be tricky (as the semantics of a SQL query are not designed for multiple outputs).
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Wed, Jul 4, 2018 at 5:49 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Jungtaek,
>>
>> Flink 1.5.0 features a TableSource for Kafka and JSON [1], incl. timestamp & watermark generation [2].
>> It would be great if you could let us know if that addresses your use case, and if not, what's missing or not working.
>>
>> So far, the Table API / SQL does not have support for late-data side outputs. However, that's on the road map. The idea is to filter streams for late events during ingestion and pass them to a side output. Currently, operators just drop late events.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#kafkajsontablesource
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#configuring-a-rowtime-attribute
>>
>> 2018-07-04 10:39 GMT+02:00 Chesnay Schepler <ches...@apache.org>:
>>
>>> The watermark display in the UI is bugged in 1.5.0.
>>>
>>> It is fixed on master and the release-1.5 branch, and will be included in 1.5.1, which is slated to be released next week.
>>>
>>> On 04.07.2018 10:22, Jungtaek Lim wrote:
>>>
>>> Sorry, I forgot to mention the version: Flink 1.5.0. I ran the app in IntelliJ and have not tried it on a cluster.
>>>
>>> On Wed, Jul 4, 2018 at 5:15 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>
>>>> Hi Flink users,
>>>>
>>>> I'm new to Flink and trying to evaluate a couple of streaming frameworks by implementing the same apps in each.
>>>>
>>>> While implementing apps with both the Table API and SQL, I found that "no watermark" is presented in the Flink UI, while I had been struggling to apply a rowtime attribute.
>>>>
>>>> For example, below is one of the TableSource implementations, which wraps a DataStream reading from Kafka:
>>>>
>>>> https://github.com/HeartSaVioR/iot-trucking-app-flink/blob/master/src/main/scala/net/heartsavior/flink/datasource/TruckSpeedSource.scala
>>>>
>>>> (Actually, I ended up implementing a TableSource to handle adding a rowtime attribute as well as reading and parsing JSON. I'd be really happy if someone could show me a way to get rid of the need for a custom TableSource implementation.)
>>>>
>>>> and below is one of the apps I implemented:
>>>>
>>>> https://github.com/HeartSaVioR/iot-trucking-app-flink/blob/master/src/main/scala/net/heartsavior/flink/app/sql/IotTruckingAppMovingAggregationsOnSpeedSql.scala
>>>>
>>>> Btw, I'm about to experiment with side outputs for late events, but is it possible to leverage side outputs with the Table API / SQL? It looks like DataStream exposes late events only when it's converted to an AllWindowedStream.
>>>>
>>>> Thanks in advance!
>>>>
>>>> Best Regards,
>>>> Jungtaek Lim (HeartSaVioR)
>>>
>>>
>>
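As a purely conceptual sketch of the late-data filtering idea discussed above (redirect late records into a separate stream instead of dropping them), the following plain-Java snippet mimics the split. It uses no Flink API; all class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch (not Flink API): split incoming records into "on time"
// and "late" relative to the current watermark, mimicking what a late-data
// side output would do. All names are hypothetical.
public class LateSideOutputSketch {
    static final List<Long> mainOutput = new ArrayList<>();
    static final List<Long> lateSideOutput = new ArrayList<>();

    // A record counts as late if its event timestamp is at or before the watermark.
    static void ingest(long eventTimestamp, long watermark) {
        if (eventTimestamp <= watermark) {
            lateSideOutput.add(eventTimestamp); // redirect, don't drop
        } else {
            mainOutput.add(eventTimestamp);
        }
    }

    public static void main(String[] args) {
        long watermark = 1_000L;
        ingest(1_500L, watermark); // on time
        ingest(900L, watermark);   // late -> side output
        ingest(2_000L, watermark); // on time
        System.out.println(mainOutput.size() + " on time, " + lateSideOutput.size() + " late");
    }
}
```

In the DataStream API the equivalent mechanism is the late-data side output on windowed streams; the Table API / SQL counterpart is, per the thread above, still on the road map.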