Hi,

We are trying to collect the side output, but we do not understand how to
convert the resulting windowed stream back to a DataStream.

final OutputTag<Tuple6<String, String, String, String, String, Timestamp>> lateOutputTag =
    new OutputTag<Tuple6<String, String, String, String, String, Timestamp>>("late-data"){};

withTime.keyBy(0, 2)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateOutputTag);

We now have a windowed stream in which records that arrive late are tagged
with lateOutputTag. How do we convert the records that are not late back to
a DataStream? Do we need to use the .apply function to collect this data?
We are quite unsure about this and would appreciate your help.
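
For reference, here is a minimal sketch of what we think the wiring would
look like, assuming the Flink 1.7 DataStream API: a window function has to be
applied before the WindowedStream becomes a stream again, and the late records
are then read from that result with getSideOutput. The class name
LateDataExample and the helper names forwardOnTime and lateRecords are made up
for illustration, and the pass-through window function is only a placeholder
for whatever aggregation is really needed.

```java
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical helper class; names are illustrative, not part of Flink.
public class LateDataExample {

    // Anonymous subclass ({}) so that Flink can capture the generic type.
    static final OutputTag<Tuple6<String, String, String, String, String, Timestamp>> LATE =
        new OutputTag<Tuple6<String, String, String, String, String, Timestamp>>("late-data") {};

    // Applies the window and returns the on-time records as a regular stream.
    // Assumes withTime already has timestamps and watermarks assigned.
    static SingleOutputStreamOperator<Tuple6<String, String, String, String, String, Timestamp>>
            forwardOnTime(DataStream<Tuple6<String, String, String, String, String, Timestamp>> withTime) {
        return withTime
            .keyBy(0, 2)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1))
            .sideOutputLateData(LATE)
            // Placeholder window function: emit every record of the window unchanged.
            .apply(new WindowFunction<
                    Tuple6<String, String, String, String, String, Timestamp>,
                    Tuple6<String, String, String, String, String, Timestamp>,
                    Tuple, TimeWindow>() {
                @Override
                public void apply(
                        Tuple key,
                        TimeWindow window,
                        Iterable<Tuple6<String, String, String, String, String, Timestamp>> input,
                        Collector<Tuple6<String, String, String, String, String, Timestamp>> out) {
                    for (Tuple6<String, String, String, String, String, Timestamp> record : input) {
                        out.collect(record);
                    }
                }
            });
    }

    // Late records are read from the operator returned by the window function.
    static DataStream<Tuple6<String, String, String, String, String, Timestamp>> lateRecords(
            SingleOutputStreamOperator<Tuple6<String, String, String, String, String, Timestamp>> windowed) {
        return windowed.getSideOutput(LATE);
    }
}
```

If this is right, forwardOnTime(withTime) is the stream of records that were
not late and lateRecords(...) on that same result is the late stream. One
caveat, as far as we understand the default trigger: a record that arrives
within the allowed lateness fires the window again, so the pass-through
function above would emit the window contents a second time.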

Best Regards,



On Thu, Jan 24, 2019 at 11:03 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ramya,
>
> This would be a great feature, but unfortunately it is not supported (yet) by
> Flink SQL.
> Currently, all late records are dropped.
>
> A workaround is to ingest the stream as a DataStream, use a custom
> operator that routes all late records to a side output, and register the
> DataStream without late records as a table on which the SQL query is
> evaluated.
> This requires quite a bit of boilerplate code but could be hidden in a util
> class.
>
> Best, Fabian
>
> Am Do., 24. Jan. 2019 um 06:42 Uhr schrieb Ramya Ramamurthy <
> hair...@gmail.com>:
>
> > Hi,
> >
> > I have a query with regard to Late arriving records.
> > We are using Flink 1.7 with Kafka Consumers of version 2.11.0.11.
> > In my sink operator, which converts this table to a stream that is being
> > pushed to Elasticsearch, I am able to see the metric
> > "*numLateRecordsDropped*".
> >
> > My Kafka consumers don't seem to have any lag and the events are
> > processed properly. Routing these late events to a side output
> > doesn't seem to be possible with tables. Below is the snippet:
> >
> >         tableEnv.connect(new Kafka()
> >           /* setting of all kafka properties */
> >                .startFromLatest())
> >                .withSchema(new Schema()
> >                        .field("sid", Types.STRING())
> >                        .field("_zpsbd6", Types.STRING())
> >                        .field("r1", Types.STRING())
> >                        .field("r2", Types.STRING())
> >                        .field("r5", Types.STRING())
> >                        .field("r10", Types.STRING())
> >                        .field("isBot", Types.BOOLEAN())
> >                        .field("botcode", Types.STRING())
> >                        .field("ts", Types.SQL_TIMESTAMP())
> >                        .rowtime(new Rowtime()
> >                                .timestampsFromField("recvdTime")
> >                                .watermarksPeriodicBounded(10000)
> >                        )
> >                )
> >                .withFormat(new Json().deriveSchema())
> >                .inAppendMode()
> >                .registerTableSource("sourceTopic");
> >
> >        String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
> >                + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
> >                + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
> >                + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
> >                + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
> >
> > Table source = tableEnv.sqlQuery(sql) ---> This is where the metric is
> > showing the lateRecordsDropped, while executing the group by operation.
> >
> > Is there a way to get the side output of this, to be able to debug better?
> >
> > Thanks,
> > ~Ramya.
> >
>
