Re: How to handle tuple keys with null values

2024-04-02 Thread Hang Ruan
Hi Sachin.

I think you could convert the Long field to a String in the key so that the
null value can be handled. Or, as Asimansu said, filter out the records with
null data before keying.
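
A minimal sketch of the first option, written as plain Java so that it runs without Flink on the classpath. The class name NullSafeKeyParts, the method asKeyPart, and the "<null>" placeholder are illustrative assumptions, not Flink API. The idea is that getKey would return Tuple2.of(data.id, NullSafeKeyParts.asKeyPart(data.id1)) and declare the second tuple field as String instead of Long.

```java
// Hypothetical helper, not part of Flink. It turns the nullable Long key
// field into a String that is never null, so the tuple key has no null field.
final class NullSafeKeyParts {

    // Assumed placeholder. A decimal number can never print as "<null>",
    // so it cannot collide with a real id1 value.
    static final String NULL_PLACEHOLDER = "<null>";

    private NullSafeKeyParts() {}

    // Returns the decimal form of id1, or the placeholder when id1 is null.
    static String asKeyPart(Long id1) {
        return id1 == null ? NULL_PLACEHOLDER : Long.toString(id1);
    }
}
```

With this mapping, all records that share the same id and have a null id1 land in the same key. Whether that grouping is what you want depends on your data.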

Best,
Hang

Re: How to handle tuple keys with null values

2024-04-02 Thread Asimansu Bera
Hello Sachin,

The same issue was reported in the past, and the JIRA ticket was closed
without a resolution.

https://issues.apache.org/jira/browse/FLINK-4823

I see this as a data quality issue. You need to decide what you would like
to do with the null value. Either way, it is better to filter out the null
data early so that you do not have to manage nulls downstream. You could
also try using a POJO as the key, since a POJO might support null fields.
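
A rough sketch of both options in plain Java, so it runs without Flink on the classpath. The names KeyChecks, hasCompleteKey, and DataKey are hypothetical, and the field types of Data are assumed (the question only shows that id1 is a Long). In a job, the predicate would be applied before keyBy, for example stream.filter(KeyChecks::hasCompleteKey). Whether the POJO serializer accepts the null field should be verified on your Flink version.

```java
import java.util.Objects;

// Stand-in for the record type in the question; field types are assumed.
class Data {
    public String id;
    public Long id1;
}

// Option 1: a predicate for dropping records whose key fields are null.
final class KeyChecks {
    private KeyChecks() {}

    static boolean hasCompleteKey(Data data) {
        return data != null && data.id != null && data.id1 != null;
    }
}

// Option 2: a POJO key instead of Tuple2. For Flink to treat it as a POJO,
// it would need to be a public class in its own file, with a public no-arg
// constructor and public fields (or getters and setters).
class DataKey {
    public String id;
    public Long id1; // may be null

    public DataKey() {}

    public DataKey(String id, Long id1) {
        this.id = id;
        this.id1 = id1;
    }

    // Null-safe equals and hashCode so that keys with a null id1 compare
    // and hash consistently.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DataKey)) {
            return false;
        }
        DataKey other = (DataKey) o;
        return Objects.equals(id, other.id) && Objects.equals(id1, other.id1);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, id1);
    }
}
```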

Sincerely,
-A


How to handle tuple keys with null values

2024-04-02 Thread Sachin Mittal
Hello folks,
I am keying my stream using a Tuple:

example:

public class MyKeySelector implements KeySelector> {

@Override
public Tuple2 getKey(Data data) {
  return Tuple2.of(data.id, data.id1);
}

}

Now id1 can be null. How should I handle this case?

Right now I am getting this error:

java.lang.RuntimeException: Exception occurred while setting the current key context.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 18 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:30) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:133) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31) ~[flink-dist-1.17.1.jar:1.17.1]


Thanks
Sachin