Hi,

Could you share more information, e.g. the exception stack trace? Do you set the timestamp assigner on all the sources? I think you can modify the KeyedProcessFunction to print the messages whose timestamp is null.
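
For example, something like the following could help. It is a minimal sketch, not tested against your job. The class name NullTimestampDebugFunction, the generic key/event types, and the choice to log and skip the element are my own assumptions. In the DataStream API, KeyedProcessFunction.Context#timestamp() returns a boxed Long that may be null when the current element carries no timestamp, so checking it before unboxing avoids the NPE and tells you which records are affected.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Debugging sketch (hypothetical class): logs every element that reaches this
 * KeyedProcessFunction without a timestamp instead of failing with an NPE.
 * K and T stand in for your own key and event types.
 */
public class NullTimestampDebugFunction<K, T> extends KeyedProcessFunction<K, T, T> {

    private static final Logger LOG =
            LoggerFactory.getLogger(NullTimestampDebugFunction.class);

    /** True when Flink handed us no timestamp for the current element. */
    static boolean isMissing(Long timestamp) {
        return timestamp == null;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        // ctx.timestamp() is a boxed Long; unboxing a null here is what throws the NPE.
        Long timestamp = ctx.timestamp();
        if (isMissing(timestamp)) {
            // Print the offending record and its key so you can see which
            // source/stream it came from.
            LOG.warn("Element without timestamp: key={}, value={}", ctx.getCurrentKey(), value);
            return; // assumption: skip it while debugging so the job keeps running
        }
        out.collect(value);
    }
}
```

You could put it right before your existing function (e.g. `.keyBy(...).process(new NullTimestampDebugFunction<>())`), or just copy the null check into your current processElement. If the logged records all come from one of the inputs, that input is probably missing its assigner.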

Best,
Shengkai

On Wed, Jun 15, 2022 at 14:57, bat man <tintin0...@gmail.com> wrote:

> Has anyone experienced this or does anyone have a clue?
>
> On Tue, Jun 14, 2022 at 6:21 PM bat man <tintin0...@gmail.com> wrote:
>
>> Hi,
>>
>> We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
>> an enrichment stream from another topic.
>> We extend AssignerWithPeriodicWatermarks to assign watermarks, extract the
>> timestamp from the event, and handle idle source partitions.
>> AutoWatermarkInterval is set to 5000L.
>> The timestamp extractor looks like this:
>>
>> @Override
>> public long extractTimestamp(Raw event, long previousElementTimestamp) {
>>     lastRecordProcessingTime = System.currentTimeMillis();
>>     long eventTime =
>>             Double.valueOf(event.getTimestamp().toString()).longValue();
>>     long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
>>     if (timestamp > currentMaxTimestamp) {
>>         currentMaxTimestamp = timestamp;
>>     }
>>     return timestamp;
>> }
>>
>> In the second step, the rules are joined to the events; this is done in a
>> KeyedProcessFunction.
>> What we have observed is that, at times when the job starts consuming from
>> the beginning of the event source stream, the timestamp accessed in the
>> KeyedProcessFunction via context.timestamp() comes back as null and the
>> code throws an NPE.
>> This happens only for some records, intermittently, and when we process the
>> same event in another environment it is processed fine, which means the
>> event is being parsed correctly.
>>
>> What could the issue be? Does anyone have any idea? As far as the timestamp
>> goes, it could only be null if the timestamp extractor returned null.
>>
>> Thanks.