Hi Sachin,
It seems KeyGroupStreamPartitioner is complaining about receiving a null
StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
non-nullability before emitting a record into the stream:
```
default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
    T deserialize = deserialize(document);
    if (deserialize != null) {
        out.collect(deserialize); // No null value will be emitted
    }
}
```
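If, on the other hand, an implementation overrides the two-argument
`deserialize(BsonDocument, Collector)` variant directly and omits that guard, a
null record can slip through to the partitioner. A hypothetical sketch of such
an override (the `Data` type and its `fromBson` mapping helper are illustrative
placeholders, not from your code):
```
// Requires: org.apache.flink.api.common.typeinfo.TypeInformation,
// org.apache.flink.util.Collector, org.bson.BsonDocument, java.io.IOException
new MongoDeserializationSchema<Data>() {
    @Override
    public Data deserialize(BsonDocument document) throws IOException {
        return Data.fromBson(document); // may return null for unmappable documents
    }

    @Override
    public void deserialize(BsonDocument document, Collector<Data> out) throws IOException {
        // Overriding this variant bypasses the default null guard shown above,
        // so a null record can now reach the downstream partitioner.
        out.collect(deserialize(document));
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return TypeInformation.of(Data.class);
    }
}
```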
Could you please clarify which methods your MongoDeserializationSchema<Data>
implementation overrides: only the `deserialize(BsonDocument)` method, or
`deserialize(BsonDocument, Collector)` as well?
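Also, since the failure surfaces in KeyGroupStreamPartitioner, it is worth
checking any keyBy applied to the stream: a KeySelector that returns null for
some records triggers exactly this "Assigned key must not be null!" check, even
when the record itself is non-null. A minimal, hypothetical illustration
(`getUserId()` stands in for whatever key field you actually use):
```
// Risky: the emit fails with "Assigned key must not be null!" as soon as
// getUserId() returns null for a single record.
events.keyBy(data -> data.getUserId());

// Safer: drop (or map to a default) records with a null key before keying.
events
    .filter(data -> data.getUserId() != null)
    .keyBy(data -> data.getUserId());
```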
Regards,
Xiqian
From: Sachin Mittal <[email protected]>
Date: Monday, 5 August 2024 at 19:59
To: [email protected] <[email protected]>
Subject: How can I debug Assigned key must not be null error when reading from
Mongodb source
Hi,
I am using the MongoDB connector documented here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
I am instantiating it pretty much in the recommended way:
```
MongoSource<Data> source =
    MongoSource.<Data>builder()
        .setUri("...")
        .setDatabase("...")
        .setCollection("...")
        .setFetchSize(2048)
        .setNoCursorTimeout(true)
        .setPartitionStrategy(PartitionStrategy.SAMPLE)
        .setPartitionSize(MemorySize.ofMebiBytes(64))
        .setSamplesPerPartition(10)
        .setDeserializationSchema(
            new MongoDeserializationSchema<Data>() {
                ...
            })
        .build();

final DataStream<Data> events =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
```
It ran fine, but after a while the job crashed with the following exception:
```
2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: Assigned key must not be null!
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    ...
```
Any idea where I should look and how I can fix this?
Thanks
Sachin