Hi,

We are trying to collect the side output, but failed to understand how to convert this windowed stream back to a DataStream.
final OutputTag<Tuple6<String, String, String, String, String, Timestamp>> lateOutputTag =
        new OutputTag<Tuple6<String, String, String, String, String, Timestamp>>("late-data"){};

withTime.keyBy(0, 2)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .allowedLateness(Time.minutes(1))
        .sideOutputLateData(lateOutputTag);

I would now have a windowed stream with the late records tagged as lateOutputTag. How do I convert the records that are not late back into a DataStream? Do we need to use the .apply function to collect this data? I am quite unsure about this.

Appreciate your help.

Best Regards,

On Thu, Jan 24, 2019 at 11:03 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ramya,
>
> This would be a great feature, but unfortunately it is not supported (yet)
> by Flink SQL. Currently, all late records are dropped.
>
> A workaround is to ingest the stream as a DataStream, have a custom
> operator that routes all late records to a side output, and register the
> DataStream without late records as a table on which the SQL query is
> evaluated. This requires quite a bit of boilerplate code, but it could be
> hidden in a util class.
>
> Best, Fabian
>
> Am Do., 24. Jan. 2019 um 06:42 Uhr schrieb Ramya Ramamurthy <
> hair...@gmail.com>:
>
> > Hi,
> >
> > I have a question regarding late-arriving records.
> > We are using Flink 1.7 with Kafka consumers of version 2.11.0.11.
> > In my sink operators, which convert this table to a stream that is
> > pushed to Elasticsearch, I am able to see the metric
> > "numLateRecordsDropped".
> >
> > My Kafka consumers don't seem to have any lag, and the events are
> > processed properly. Taking these events to a side output does not
> > seem to be possible with tables.
> > Below is the snippet:
> >
> > tableEnv.connect(new Kafka()
> >         /* setting of all kafka properties */
> >         .startFromLatest())
> >     .withSchema(new Schema()
> >         .field("sid", Types.STRING())
> >         .field("_zpsbd6", Types.STRING())
> >         .field("r1", Types.STRING())
> >         .field("r2", Types.STRING())
> >         .field("r5", Types.STRING())
> >         .field("r10", Types.STRING())
> >         .field("isBot", Types.BOOLEAN())
> >         .field("botcode", Types.STRING())
> >         .field("ts", Types.SQL_TIMESTAMP())
> >         .rowtime(new Rowtime()
> >             .timestampsFromField("recvdTime")
> >             .watermarksPeriodicBounded(10000)))
> >     .withFormat(new Json().deriveSchema())
> >     .inAppendMode()
> >     .registerTableSource("sourceTopic");
> >
> > String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
> >     + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
> >     + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
> >     + "WHERE r1='true' or r2='true' or r5='true' or r10='true' "
> >     + "and isBot='true' "
> >     + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
> >
> > Table source = tableEnv.sqlQuery(sql); ---> This is where the metric
> > shows numLateRecordsDropped, while executing the GROUP BY operation.
> >
> > Is there a way to get the side output of this, to be able to debug
> > better?
> >
> > Thanks,
> > ~Ramya.
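[For the question at the top of the thread, a rough sketch of what I believe is the intended pattern, assuming the Tuple6 record type and the `withTime` stream from the snippet above. The key point: sideOutputLateData() only *configures* the window; a window function such as apply() (or process()/aggregate()) is what turns the WindowedStream back into a DataStream, and the late records are then read from that same operator via getSideOutput(). The class and method names (LateDataExample, windowWithLateOutput) are illustrative, not from Flink.]

```java
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LateDataExample {

    // Shorthand comment: Rec = Tuple6<String, String, String, String, String, Timestamp>

    public static DataStream<Tuple6<String, String, String, String, String, Timestamp>>
            windowWithLateOutput(
                DataStream<Tuple6<String, String, String, String, String, Timestamp>> withTime,
                OutputTag<Tuple6<String, String, String, String, String, Timestamp>> lateOutputTag) {

        SingleOutputStreamOperator<Tuple6<String, String, String, String, String, Timestamp>> main =
            withTime
                .keyBy(0, 2)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateOutputTag)
                // apply() materializes each window and emits its records,
                // producing a DataStream of the non-late elements.
                .apply(new WindowFunction<Tuple6<String, String, String, String, String, Timestamp>,
                                          Tuple6<String, String, String, String, String, Timestamp>,
                                          Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple6<String, String, String, String, String, Timestamp>> input,
                                      Collector<Tuple6<String, String, String, String, String, Timestamp>> out) {
                        for (Tuple6<String, String, String, String, String, Timestamp> rec : input) {
                            out.collect(rec); // forward every on-time record as-is
                        }
                    }
                });

        // The late records come from the side output of the *window operator*:
        DataStream<Tuple6<String, String, String, String, String, Timestamp>> late =
            main.getSideOutput(lateOutputTag);
        late.print(); // or send them to a separate sink for debugging

        return main; // this is the DataStream of non-late records
    }
}
```

Note that getSideOutput() must be called on the SingleOutputStreamOperator returned by apply(), not on the WindowedStream; this is why the apply() step cannot be skipped.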
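[Fabian's workaround for the SQL case (route late records to a side output *before* registering the stream as a table) could be sketched roughly as below. RouteLateData is an illustrative name, not a Flink class; the sketch compares each record's assigned event timestamp against the operator's current watermark inside a ProcessFunction.]

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Routes records whose event timestamp is already behind the current
 * watermark to a side output; all other records are forwarded unchanged.
 * The cleaned main stream can then be registered as a table, so the SQL
 * GROUP BY never sees (and silently drops) late data.
 */
public class RouteLateData<T> extends ProcessFunction<T, T> {

    private final OutputTag<T> lateTag;

    public RouteLateData(OutputTag<T> lateTag) {
        this.lateTag = lateTag;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long ts = ctx.timestamp(); // event-time timestamp assigned upstream
        if (ts != null && ts < ctx.timerService().currentWatermark()) {
            ctx.output(lateTag, value); // late: divert for debugging
        } else {
            out.collect(value);         // on time: forward to the table
        }
    }
}
```

A hypothetical usage, with the stream and tag names from the thread: `SingleOutputStreamOperator<Rec> cleaned = withTime.process(new RouteLateData<>(lateOutputTag));` then register `cleaned` with the TableEnvironment and run the SQL on that table, while `cleaned.getSideOutput(lateOutputTag)` yields the late records for inspection. The exact registration call depends on the Flink 1.7 Table API setup already shown in the snippet above.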