So I have an anonymous class implementing MongoDeserializationSchema:

```
new MongoDeserializationSchema<Data>() {
    @Override
    public Data deserialize(BsonDocument document) {
        String json = document.toJson();
        Data data = null;
        try {
            data = gson.fromJson(json, Data.class);
        } catch (JsonSyntaxException e) {
            logger.error("Error decoding Data {}", json, e);
        }
        return data;
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return Types.POJO(Data.class);
    }
}
```
I don't see any errors logged from these methods.
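To be explicit about the question below: I only override the single-argument deserialize(BsonDocument) and getProducedType, as shown above. A rough sketch of what it would look like if I also overrode deserialize(BsonDocument, Collector) to drop bad documents explicitly (not something I currently run; gson and logger are fields of the enclosing class):

```
new MongoDeserializationSchema<Data>() {
    @Override
    public Data deserialize(BsonDocument document) {
        // Same parsing as above: return null when the JSON cannot be mapped to Data.
        String json = document.toJson();
        try {
            return gson.fromJson(json, Data.class);
        } catch (JsonSyntaxException e) {
            logger.error("Error decoding Data {}", json, e);
            return null;
        }
    }

    @Override
    public void deserialize(BsonDocument document, Collector<Data> out) {
        // Explicitly skip documents that failed to parse instead of relying
        // on the default method's null check.
        Data data = deserialize(document);
        if (data != null) {
            out.collect(data);
        }
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return Types.POJO(Data.class);
    }
}
```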
Also, the pipeline works fine for a while and then it stops due to the error posted.
The error mentions the key and not the record itself. Maybe the records were
partitioned based on some field of the collection, and that value is null for a
particular record?
I am using PartitionStrategy.SAMPLE, and the watermark strategy is
WatermarkStrategy.noWatermarks().
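Since the failure comes from KeyGroupStreamPartitioner, which only runs for keyed exchanges, my guess is that the key selector downstream of the source returns null for some record. A minimal sketch of the guard I have in mind (the field name "category" is just an assumption, not my actual pipeline):

```
// Hypothetical key field: "category" stands in for whatever field the job keys on.
KeyedStream<Data, String> keyed =
    events
        // Drop records whose key field is missing; a null value returned by the
        // key selector is what makes KeyGroupStreamPartitioner throw
        // "Assigned key must not be null!".
        .filter(data -> data.getCategory() != null)
        .keyBy(data -> data.getCategory());
```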
Thanks
Sachin
On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <[email protected]> wrote:
> Hi Sachin,
>
>
>
> It seems KeyGroupStreamPartitioner is complaining about receiving a null
> StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
> non-nullability before putting records into the stream:
>
>
>
> ```
> default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
>     T deserialize = deserialize(document);
>     if (deserialize != null) {
>         out.collect(deserialize); // No null value will be emitted
>     }
> }
> ```
>
>
>
> Could you please clarify which methods your MongoDeserializationSchema<Data>
> class overrides: only the `deserialize(BsonDocument)` method, or
> `deserialize(BsonDocument, Collector)` as well?
>
>
>
> Regards,
>
> Xiqian
>
>
>
> From: Sachin Mittal <[email protected]>
> Date: Monday, 5 August 2024 at 19:59
> To: [email protected] <[email protected]>
> Subject: How can I debug Assigned key must not be null error when
> reading from Mongodb source
>
> Hi,
>
> I am using mongodb connector provided in here:
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
>
>
>
> I am instantiating it pretty much in the recommended way:
>
>
>
> ```
> MongoSource<Data> source =
>     MongoSource.<Data>builder()
>         .setUri("...")
>         .setDatabase("...")
>         .setCollection("...")
>         .setFetchSize(2048)
>         .setNoCursorTimeout(true)
>         .setPartitionStrategy(PartitionStrategy.SAMPLE)
>         .setPartitionSize(MemorySize.ofMebiBytes(64))
>         .setSamplesPerPartition(10)
>         .setDeserializationSchema(
>             new MongoDeserializationSchema<Data>() {
>                 ...
>             })
>         .build();
>
> final DataStream<Data> events =
>     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
> ```
>
>
>
> It was running fine, but after a while the job crashed with the following
> exception:
>
>
>
> 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
>
> java.lang.NullPointerException: Assigned key must not be null!
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>     at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>     ...
>
>
>
> Any idea where I should look and how I can fix this?
>
>
>
> Thanks
>
> Sachin
>
>
>