Hello folks, I am keying my stream using a Tuple, for example:

    public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> getKey(Data data) {
            return Tuple2.of(data.id, data.id1);
        }
    }

Now id1 can have null values. In this case how should I handle this? Right now I am getting this error:

java.lang.RuntimeException: Exception occurred while setting the current key context.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 18 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:30) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:133) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31) ~[flink-dist-1.17.1.jar:1.17.1]

Thanks
Sachin
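
For reference, the trace shows TupleSerializer rejecting the null in field 1 of the key, so one workaround that might apply here is to replace a null id1 with a sentinel value before building the tuple. The sketch below is only an illustration under assumptions: the NullSafeKey class, the orSentinel helper, and the MISSING_ID constant are hypothetical names that are not part of Flink, and the approach is only safe if real ids can never equal the sentinel.

```java
// Hypothetical helper, not part of Flink: maps a nullable id to a non-null value
// so that a Tuple2<Long, Long> key never carries a null field.
final class NullSafeKey {

    // Assumed sentinel for "id1 is missing"; real ids must never take this value.
    static final long MISSING_ID = Long.MIN_VALUE;

    private NullSafeKey() {}

    // Returns the id itself, or the sentinel when the id is null.
    static long orSentinel(Long id) {
        return id == null ? MISSING_ID : id;
    }

    // Intended use inside the getKey method from the post (assumption):
    //   return Tuple2.of(data.id, NullSafeKey.orSentinel(data.id1));

    public static void main(String[] args) {
        System.out.println(orSentinel(null) == MISSING_ID); // null maps to the sentinel
        System.out.println(orSentinel(42L));                // non-null ids pass through
    }
}
```

With this, every record whose id1 is null and whose id matches ends up under the same key, which may or may not be the grouping that is wanted; filtering such records out before the keyBy would be another option.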