Hi,
I have a question about late-arriving records.
We are using Flink 1.7 with Kafka consumers of version 2.11.0.11.
In my sink operator, which converts this table to a stream that is pushed to
Elasticsearch, I can see the metric *numLateRecordsDropped*.
My Kafka consumers don't seem to have any lag, and the events are
processed properly. Routing these late events to a side output
doesn't seem to be possible with tables. Below is the snippet:
tableEnv.connect(new Kafka()
/* setting of all kafka properties */
.startFromLatest())
.withSchema(new Schema()
.field("sid", Types.STRING())
.field("_zpsbd6", Types.STRING())
.field("r1", Types.STRING())
.field("r2", Types.STRING())
.field("r5", Types.STRING())
.field("r10", Types.STRING())
.field("isBot", Types.BOOLEAN())
.field("botcode", Types.STRING())
.field("ts", Types.SQL_TIMESTAMP())
.rowtime(new Rowtime()
.timestampsFromField("recvdTime")  // event time is read from the "recvdTime" field
.watermarksPeriodicBounded(10000)  // tolerate events up to 10 s out of order
)
)
.withFormat(new Json().deriveSchema())
.inAppendMode()
.registerTableSource("sourceTopic");
String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
+ "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
+ "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
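// AND binds tighter than OR, so the OR group needs parentheses;
// isBot is declared as BOOLEAN, so it is compared with a boolean literal
+ "WHERE (r1='true' or r2='true' or r5='true' or r10='true') and isBot=true "
+ "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";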
Table source = tableEnv.sqlQuery(sql);

This query is where the metric reports dropped late records: they are counted
while the GROUP BY window aggregation is executed.
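For reference, here is a simplified model of how GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE) places a row into a window, and what TUMBLE_START and TUMBLE_END return for it. It is a plain-Java sketch, not Flink's implementation; it assumes epoch-aligned windows with no offset and non-negative millisecond timestamps, and the class and method names are hypothetical.

```java
import java.time.Instant;

// Hypothetical helper modelling GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE).
// Assumptions: windows are aligned to the epoch, have no offset,
// and ts is given in epoch milliseconds (>= 0).
public class TumbleWindowSketch {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // INTERVAL '5' MINUTE

    // Counterpart of TUMBLE_START: inclusive start of the window containing ts.
    static long tumbleStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, WINDOW_SIZE_MS);
    }

    // Counterpart of TUMBLE_END: exclusive end, i.e. start + 5 minutes.
    static long tumbleEnd(long tsMillis) {
        return tumbleStart(tsMillis) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2019-02-01T10:07:42Z").toEpochMilli();
        System.out.println("tumbleStart = " + Instant.ofEpochMilli(tumbleStart(ts)));
        System.out.println("tumbleEnd   = " + Instant.ofEpochMilli(tumbleEnd(ts)));
    }
}
```

A row with ts 10:07:42 falls into the window [10:05:00, 10:10:00), so it is counted in the result row whose tumbleStart is 10:05:00. Whether a row is late depends on that window end, not on the row's own timestamp.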
Is there a way to get the late records from this query as a side output, so that I can debug this better?
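A record can be dropped even when the consumers have no lag. What matters is how far its rowtime is behind the largest rowtime already seen. The plain-Java sketch below is a simplified, hypothetical model (class and field names are made up). It assumes that the watermark equals the maximum rowtime seen minus 10000 ms and is updated after every record, and that allowed lateness is zero. Real Flink emits watermarks periodically and takes the minimum across parallel source instances. Records that the window would drop are collected in a separate list, which is roughly what a side output would give you. In the DataStream API, windowed streams offer sideOutputLateData(OutputTag), so one option might be to run this particular aggregation on a DataStream instead of in SQL.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the late-record check done by the window
// operator behind the GROUP BY. Assumptions (not Flink's actual code):
//  - watermark = max rowtime seen so far - 10000 ms, updated after every record
//  - allowed lateness is 0, so a record is late once (windowEnd - 1) <= watermark
public class LateRecordSketch {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;  // INTERVAL '5' MINUTE
    static final long OUT_OF_ORDERNESS_MS = 10_000L;    // watermarksPeriodicBounded(10000)

    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();          // stand-in for a side output
    private long maxSeen = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS; // avoids underflow

    void process(long rowtime) {
        long watermark = maxSeen - OUT_OF_ORDERNESS_MS;
        long windowEnd = rowtime - Math.floorMod(rowtime, WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
        if (windowEnd - 1 <= watermark) {
            late.add(rowtime);   // the real operator drops it and bumps numLateRecordsDropped
        } else {
            onTime.add(rowtime);
        }
        maxSeen = Math.max(maxSeen, rowtime);
    }

    public static void main(String[] args) {
        long base = Instant.parse("2019-02-01T10:00:00Z").toEpochMilli();
        LateRecordSketch sketch = new LateRecordSketch();
        long[] rowtimes = {
            base + 290_000,  // 10:04:50
            base + 330_000,  // 10:05:30, watermark moves to 10:05:20
            base + 298_000,  // 10:04:58, its window [10:00, 10:05) is already closed
            base + 325_000   // 10:05:25
        };
        for (long t : rowtimes) {
            sketch.process(t);
        }
        for (long t : sketch.late) {
            System.out.println("late: " + Instant.ofEpochMilli(t));
        }
        System.out.println("on time: " + sketch.onTime.size());
    }
}
```

In this example the record at 10:04:58 is only 32 s behind the newest record, yet it is reported as late. The watermark has already reached 10:05:20, so its window [10:00, 10:05) has closed.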
Thanks,
~Ramya.