Hi Sachin,

Could you please check if you have used the keyBy operator and ensure that the 
keyBy field is not null?
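
For example (a minimal sketch, not your actual code; getUserId() here is just a
placeholder for whatever field your key selector uses), you could drop records
whose key field is null before the keyBy, so the partitioner never sees a null
key:

    // Hypothetical guard before keying the stream.
    // Replace getUserId() with the field your keyBy actually uses.
    KeyedStream<Data, String> keyed = events
        .filter(d -> d != null && d.getUserId() != null) // skip records with a null key field
        .keyBy(Data::getUserId);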

Best,
Jiabao

On 2024/08/05 12:33:27 Sachin Mittal wrote:
> So I have an anonymous class implementing MongoDeserializationSchema
> 
> new MongoDeserializationSchema<Data>() {
>   @Override
>   public Data deserialize(BsonDocument document) {
>     String json = document.toJson();
>     Data data = null;
>     try {
>       data = gson.fromJson(json, Data.class);
>     } catch (JsonSyntaxException e) {
>       logger.error("Error decoding Data {}", json, e);
>     }
>     return data;
>   }
> 
>   @Override
>   public TypeInformation<Data> getProducedType() {
>     return Types.POJO(Data.class);
>   }
> }
> 
> I don't see any errors logged from these methods.
> Also, the pipeline works fine for a while and then stops due to the error
> posted below.
> The error mentions the key, not the record itself. Maybe it partitions
> records based on some field of the collection, and that field is null for
> some record?
> I am using PartitionStrategy.SAMPLE, and the watermark strategy is
> WatermarkStrategy.noWatermarks().
> 
> Thanks
> Sachin
> 
> 
> On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <kono....@outlook.com> wrote:
> 
> > Hi Sachin,
> >
> >
> >
> > It seems KeyGroupStreamPartitioner is complaining about receiving a null
> > StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
> > non-nullability before putting records into the stream:
> >
> >
> >
> > ```
> >
> > default void deserialize(BsonDocument document, Collector<T> out)
> >         throws IOException {
> >     T deserialize = deserialize(document);
> >     if (deserialize != null) {
> >         out.collect(deserialize); // No null value will be emitted
> >     }
> > }
> >
> > ```
> >
> >
> >
> > Could you please clarify which methods your MongoDeserializationSchema<Data>
> > class overrides: only the `deserialize(BsonDocument)` method, or
> > `deserialize(BsonDocument, Collector)` as well?
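> >
> > For context, a purely hypothetical override of the two-argument variant that
> > collects unconditionally would bypass the null check shown above and could
> > push null records downstream, e.g.:
> >
> > ```
> >
> > @Override
> > public void deserialize(BsonDocument document, Collector<Data> out)
> >         throws IOException {
> >     // Hypothetical: no null check, so a failed deserialization emits null
> >     out.collect(deserialize(document));
> > }
> >
> > ```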
> >
> >
> >
> > Regards,
> >
> > Xiqian
> >
> >
> >
> > From: Sachin Mittal <sjmit...@gmail.com>
> > Date: Monday, 5 August 2024 at 19:59
> > To: user@flink.apache.org <user@flink.apache.org>
> > Subject: How can I debug Assigned key must not be null error when
> > reading from Mongodb source
> >
> > Hi,
> >
> > I am using the MongoDB connector provided here:
> >
> >
> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
> >
> >
> >
> > I am instantiating it pretty much in the recommended way:
> >
> >
> >
> > MongoSource<Data> source =
> >     MongoSource.<Data>builder()
> >         .setUri("...")
> >         .setDatabase("...")
> >         .setCollection("...")
> >         .setFetchSize(2048)
> >         .setNoCursorTimeout(true)
> >         .setPartitionStrategy(PartitionStrategy.SAMPLE)
> >         .setPartitionSize(MemorySize.ofMebiBytes(64))
> >         .setSamplesPerPartition(10)
> >         .setDeserializationSchema(
> >             new MongoDeserializationSchema<Data>() {
> >                 ...
> >             })
> >         .build();
> >
> > final DataStream<Data> events =
> >     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
> >
> >
> >
> > It runs fine, but after a while the job crashes with the following
> > exception:
> >
> >
> >
> > 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
> > java.lang.NullPointerException: Assigned key must not be null!
> >     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
> >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> >     at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> >     ...
> >
> >
> >
> > Any idea where I should look and how I can fix this?
> >
> >
> >
> > Thanks
> >
> > Sachin
> >
> >
> >
> 
