Hi,

Could you share more information, e.g. the exception stack trace? Do you set the
timestamp assigner for all the sources? I think you can modify the
KeyedProcessFunction to print the messages whose timestamp is null.
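A minimal sketch of that check, with the Flink types stripped out so it runs standalone. The boxed `Long` parameter stands in for the value of `ctx.timestamp()` inside `processElement`; the class, method, and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch of the null check suggested above; no Flink types.
public class NullTimestampCheck {

    // Elements seen without a timestamp; a real job would log them instead.
    static final List<String> missing = new ArrayList<>();

    // Returns the timestamp unchanged; if it is null, reports the element first
    // so the caller can skip it rather than hit an NPE later.
    static Long checkTimestamp(String element, Long timestamp) {
        if (timestamp == null) {
            System.err.println("Element without timestamp: " + element);
            missing.add(element);
        }
        return timestamp;
    }

    public static void main(String[] args) {
        checkTimestamp("event-1", 1655190060000L);
        checkTimestamp("event-2", null);
        System.out.println(missing); // prints [event-2]
    }
}
```

In the real function the same guard would sit at the top of `processElement`: read `Long ts = ctx.timestamp();` and log (or skip) the element when `ts` is null, before anything unboxes it.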

Best,
Shengkai

bat man <tintin0...@gmail.com> wrote on Wed, Jun 15, 2022 at 14:57:

> Has anyone experienced this, or does anyone have any clue?
>
> On Tue, Jun 14, 2022 at 6:21 PM bat man <tintin0...@gmail.com> wrote:
>
>> Hi,
>>
>> We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
>> an enrichment stream from another topic.
>> We extend AssignerWithPeriodicWatermarks to assign watermarks, extract the
>> timestamp from each event, and handle idle source partitions.
>> AutoWatermarkInterval is set to 5000L.
>> The timestamp extractor looks like this:
>>
>>         @Override
>>         public long extractTimestamp(Raw event, long previousElementTimestamp) {
>>             lastRecordProcessingTime = System.currentTimeMillis();
>>             // The event carries its timestamp in epoch seconds; truncate it
>>             // to whole seconds, as the original longValue() call did.
>>             long eventTimeSeconds =
>>                     (long) Double.parseDouble(event.getTimestamp().toString());
>>             // Convert epoch seconds to the epoch milliseconds Flink expects.
>>             long timestamp = Instant.ofEpochSecond(eventTimeSeconds).toEpochMilli();
>>             if (timestamp > currentMaxTimestamp) {
>>                 currentMaxTimestamp = timestamp;
>>             }
>>             return timestamp;
>>         }
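A standalone model of the extractor above plus a periodic watermark, with the Flink interfaces removed so it compiles on its own. It assumes the event timestamp is a string of epoch seconds (as the `* 1_000` in the original implies) and a hypothetical fixed out-of-orderness bound; the real watermark logic and the idle-partition handling are not shown in the thread, so the watermark here is only the common bounded-out-of-orderness form. Note that the extracted timestamp is a primitive `long`, so this method can never hand Flink a null.

```java
import java.time.Instant;

// Hypothetical standalone model of the periodic assigner; no Flink types.
public class PeriodicAssignerModel {

    // Assumed bound on out-of-orderness; the thread does not state the real value.
    static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;

    // Start low enough that the subtraction in currentWatermark() cannot overflow.
    long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    // Mirrors extractTimestamp(): parse epoch seconds, truncate, convert to millis.
    long extractTimestamp(String rawSeconds) {
        long eventTimeSeconds = (long) Double.parseDouble(rawSeconds);
        long timestamp = Instant.ofEpochSecond(eventTimeSeconds).toEpochMilli();
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    // Mirrors getCurrentWatermark(), which Flink calls every autoWatermarkInterval ms.
    long currentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        PeriodicAssignerModel m = new PeriodicAssignerModel();
        System.out.println(m.extractTimestamp("1655190060.75")); // 1655190060000
        System.out.println(m.extractTimestamp("1655190050"));    // 1655190050000 (out of order)
        System.out.println(m.currentWatermark());                // 1655190055000
    }
}
```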
>>
>> In the second step, the rules are joined to the events; this is done in a
>> KeyedProcessFunction.
>> What we have observed is that at times, when the job starts consuming from
>> the beginning of the event source stream, the timestamp accessed in the
>> KeyedProcessFunction via context.timestamp() comes back as null and the
>> code throws an NPE.
>> This happens only for some records, intermittently, and when we process
>> the same event in another environment it is handled fine, which means the
>> event itself is being parsed correctly.
>>
>> What could the issue be? Does anyone have any idea? As far as I can tell,
>> the timestamp could only be null if the timestamp extractor returned null.
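One detail worth checking against that reasoning: `extractTimestamp` returns a primitive `long`, so it cannot return null, while `Context#timestamp()` in a `KeyedProcessFunction` returns a boxed `Long` that is null when the element carries no timestamp (possibly, for instance, an element from an input that never went through a timestamp assigner). Assigning that value to a `long` auto-unboxes it and throws the NPE. The standalone sketch below shows only the unboxing behavior; `timestampOf` is a hypothetical stand-in for `ctx.timestamp()`.

```java
// Hypothetical standalone sketch: a null Long from ctx.timestamp()
// surfaces as a NullPointerException only when it is unboxed.
public class UnboxingNpe {

    // Stand-in for ctx.timestamp(): boxed Long, null when the element has no timestamp.
    static Long timestampOf(boolean hasTimestamp) {
        return hasTimestamp ? 1655190060000L : null;
    }

    // Unsafe: auto-unboxing a null Long into a primitive long throws an NPE.
    static long unsafeTimestamp(boolean hasTimestamp) {
        long ts = timestampOf(hasTimestamp);
        return ts;
    }

    // Null-safe variant: falls back to a caller-chosen default instead of throwing.
    static long safeTimestamp(boolean hasTimestamp, long fallback) {
        Long ts = timestampOf(hasTimestamp);
        return ts != null ? ts : fallback;
    }

    public static void main(String[] args) {
        System.out.println(safeTimestamp(true, -1L));  // 1655190060000
        System.out.println(safeTimestamp(false, -1L)); // -1
        try {
            unsafeTimestamp(false);
        } catch (NullPointerException e) {
            System.out.println("NPE on unboxing");
        }
    }
}
```

Whether to fall back, skip, or route such elements elsewhere is a design choice; logging them first shows which input they come from.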
>>
>> Thanks.
>>
>
