Hi Sachin,

Could you please check if you have used the keyBy operator, and ensure that the keyBy field is not null?
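If the field you key on can be missing in some documents, the keyBy will fail exactly like this once such a record arrives. One way to guard against it is to filter those records out (or substitute a default key) before the keyBy. A minimal sketch, assuming a hypothetical getUserId() key field on Data:

```
// Hypothetical key field -- substitute whatever your keyBy actually selects.
KeyedStream<Data, String> keyed = events
        .filter(d -> d.getUserId() != null) // keyBy must never see a null key
        .keyBy(Data::getUserId);
```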
Best,
Jiabao

On 2024/08/05 12:33:27 Sachin Mittal wrote:
> So I have an anonymous class implementing MongoDeserializationSchema:
>
> new MongoDeserializationSchema<Data>() {
>     @Override
>     public Data deserialize(BsonDocument document) {
>         String json = document.toJson();
>         Data data = null;
>         try {
>             data = gson.fromJson(json, Data.class);
>         } catch (JsonSyntaxException e) {
>             logger.error("Error decoding Data {}", json, e);
>         }
>         return data;
>     }
>
>     @Override
>     public TypeInformation<Data> getProducedType() {
>         return Types.POJO(Data.class);
>     }
> }
>
> I don't see any errors logged from these methods.
> Also, the pipeline works fine for a while and then stops working due to
> the error posted.
> The error states something about the key and not the record itself. Maybe
> it partitioned records based on some field of the collection, and that
> value is null for that record?
> I am using PartitionStrategy.SAMPLE, and the watermark strategy is
> WatermarkStrategy.noWatermarks().
>
> Thanks
> Sachin
>
>
> On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <kono....@outlook.com> wrote:
>
> > Hi Sachin,
> >
> > Seems KeyGroupStreamPartitioner is complaining about receiving a null
> > StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
> > non-nullability before putting a record into the stream:
> >
> > ```
> > default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
> >     T deserialize = deserialize(document);
> >     if (deserialize != null) {
> >         out.collect(deserialize); // No null value will be emitted
> >     }
> > }
> > ```
> >
> > Could you please clarify which methods the MongoDeserializationSchema<Data>
> > class overrides: just the `deserialize(BsonDocument)` method, or
> > `deserialize(BsonDocument, Collector)`, too?
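> >
> > For instance, if the Collector variant is overridden, that null guard is
> > bypassed entirely. A hypothetical override like the following could let a
> > null record reach the downstream partitioner:
> >
> > ```
> > @Override
> > public void deserialize(BsonDocument document, Collector<Data> out) throws IOException {
> >     // Overriding this variant skips the default null check shown above,
> >     // so a null deserialization result would be collected as-is.
> >     out.collect(deserialize(document));
> > }
> > ```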
> >
> > Regards,
> > Xiqian
> >
> > From: Sachin Mittal <sjmit...@gmail.com>
> > Date: Monday, 5 August 2024 at 19:59
> > To: user@flink.apache.org <user@flink.apache.org>
> > Subject: How can I debug Assigned key must not be null error when
> > reading from Mongodb source
> >
> > Hi,
> >
> > I am using the mongodb connector provided here:
> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
> >
> > I am instantiating it pretty much in the recommended way:
> >
> > MongoSource<Data> source =
> >     MongoSource.<Data>builder()
> >         .setUri("...")
> >         .setDatabase("...")
> >         .setCollection("...")
> >         .setFetchSize(2048)
> >         .setNoCursorTimeout(true)
> >         .setPartitionStrategy(PartitionStrategy.SAMPLE)
> >         .setPartitionSize(MemorySize.ofMebiBytes(64))
> >         .setSamplesPerPartition(10)
> >         .setDeserializationSchema(
> >             new MongoDeserializationSchema<Data>() {
> >                 ...
> >             })
> >         .build();
> >
> > final DataStream<Data> events =
> >     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
> >
> > It runs fine, but after a while the job crashed with the following
> > exception:
> >
> > 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task []
> > - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0)
> > switched from RUNNING to FAILED with failure cause:
> >
> > java.lang.NullPointerException: Assigned key must not be null!
> > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
> > at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > ...
> >
> > Any idea where I should look and how I can fix this?
> >
> > Thanks
> > Sachin