Hi Sachin.

I think maybe we could cast the Long as String to handle the null value. Or
as Asimansu said, try to filter out the null data.

Best,
Hang

Asimansu Bera <asimansu.b...@gmail.com> 于2024年4月3日周三 08:35写道:

> Hello Sachin,
>
> The same issue had been reported in the past and JIRA was closed without
> resolution.
>
> https://issues.apache.org/jira/browse/FLINK-4823
>
> I do see this is as a data quality issue. You need to understand what you
> would like to do with the null value. Either way, better to filter out the
> null data earlier so that you may not necessary manage the null or you may
> also try using POJO as POJO might support null.
>
> Sincerely,
> -A
>
>
> On Tue, Apr 2, 2024 at 12:21 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hello folks,
>> I am keying my stream using a Tuple:
>>
>> example:
>>
>> public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> {
>>
>> @Override
>> public Tuple2<Long, Long> getKey(Data data) {
>>   return Tuple2.of(data.id, data.id1);
>> }
>>
>> }
>>
>> Now id1 can have null values. In this case how should I handle this?
>>
>> Right now I am getting this error:
>>
>> java.lang.RuntimeException: Exception occurred while setting the current key 
>> context.
>>     at 
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
>> [flink-dist-1.17.1.jar:1.17.1]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
>> [flink-dist-1.17.1.jar:1.17.1]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
>> [flink-dist-1.17.1.jar:1.17.1]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
>> Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but 
>> expected to hold a value.
>>     at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     ... 18 more
>> Caused by: java.lang.NullPointerException
>>     at 
>> org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:30)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:133)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>     at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31)
>>  ~[flink-dist-1.17.1.jar:1.17.1]
>>
>>
>> Thanks
>> Sachin
>>
>>

Reply via email to