cb149 opened a new issue #3161: URL: https://github.com/apache/hudi/issues/3161
**Describe the problem you faced**

I have a Spark Structured Streaming application running on YARN which I trigger once at the start of every hour. I have now discovered missing messages in my Hudi table, which is confusing since Structured Streaming proclaims that _the system ensures end-to-end exactly-once fault-tolerance guarantees_.

Effectively the application runs:

```scala
spark
  .readStream
  .format("kafka")
  .options(options) // the usual, bootstrapServers etc.
  .load()
  .selectExpr(s"CAST(key AS STRING) as $name", "CAST(value AS STRING)")
  // multiple .withColumn("xyz", from_json(col("value"), struct)) and explode()
  .writeStream
  .trigger(Trigger.Once())
  .format("hudi")
  .option("path", config.outputPath)
  .option("checkpointLocation", config.checkpointPath)
  .options(getHudiConfig())
  .outputMode(OutputMode.Append)
  .start()
```

The commit operation is `upsert`, and my Hudi config settings include (among the usual):

```scala
HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP -> "false",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts",
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> "false",
DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY -> "0"
```

and my Spark settings include:

```
"spark.rdd.compress": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.unsafe.sorter.spill.read.ahead.enabled": "false",
```

I have rerun the exact same application for the Kafka offsets containing the missing messages (about two days after the application originally ran for those offsets), writing to a new Hudi table, and the message was not missing from that one. So it seems to me there is an issue with Structured Streaming from Kafka to Hudi, in particular when running shortly after a message was ingested.
Looking into the log of the application that missed the message, I can see no errors, but the following:

```
21/06/24 08:04:06 INFO hudi.HoodieStreamingSink: Micro batch id=322 succeeded for commit=20210624080101
21/06/24 08:04:06 INFO hudi.HoodieStreamingSink: Micro batch id=322 succeeded
21/06/24 08:04:06 INFO streaming.CheckpointFileManager: Writing atomically to hdfs://{path}/checkpoint/commits/322 using temp file hdfs://{path}/checkpoint/commits/.322.e7795a54-3ea1-435a-a5de-253f2e66d8fe.tmp
21/06/24 08:04:06 INFO streaming.CheckpointFileManager: Renamed temp file hdfs://{path}/checkpoint/commits/.322.e7795a54-3ea1-435a-a5de-253f2e66d8fe.tmp to hdfs://{path}/checkpoint/commits/322
21/06/24 08:04:06 INFO streaming.MicroBatchExecution: Streaming query made progress: {
  "id" : "85adfe77-fa91-4094-ba2a-1ac65dd193cc",
  "runId" : "f1012bb4-0c38-4b99-9bb9-7f02ed3f3087",
  "name" : null,
  "timestamp" : "2021-06-24T06:00:56.265Z",
  "batchId" : 322,
  "numInputRows" : 2101,
  "processedRowsPerSecond" : 11.066864721932513,
  "durationMs" : {
    "addBatch" : 184786,
    "getBatch" : 8,
    "getEndOffset" : 0,
    "queryPlanning" : 256,
    "setOffsetRange" : 3766,
    "triggerExecution" : 189845,
    "walCommit" : 716
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "startOffset" : {
      "topic" : { "2" : 2354732, "1" : 852378, "0" : 4484872 }
    },
    "endOffset" : {
      "topic" : { "2" : 2355432, "1" : 852507, "0" : 4486272 }
    },
    "numInputRows" : 2101,
    "processedRowsPerSecond" : 11.066864721932513
  } ],
  "sink" : {
    "description" : "HoodieStreamingSink[table]"
  }
}
```

The weird thing to me is that if I subtract `startOffset` from `endOffset`, I get **2229**. This should match the **2101** of `numInputRows`, right? In the other runs of the application, I can see that the difference between the offsets is always `numInputRows - 1`.
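To make the arithmetic explicit, here is a small sketch (plain Scala, using only the numbers copied from the progress report above) that computes the per-partition offset deltas and their total:

```scala
// Offsets copied from the startOffset/endOffset of the progress report above.
val startOffsets = Map("0" -> 4484872L, "1" -> 852378L, "2" -> 2354732L)
val endOffsets   = Map("0" -> 4486272L, "1" -> 852507L, "2" -> 2355432L)

// Delta per partition: how many records the batch should have read from it.
val deltas = endOffsets.map { case (p, e) => p -> (e - startOffsets(p)) }
// deltas: Map("0" -> 1400, "1" -> 129, "2" -> 700)

val total = deltas.values.sum
// total = 2229, while numInputRows reports only 2101
```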
This would also be the case here if we only combine the differences for partitions 2 and 0, resulting in 2100, which makes me assume the messages I am missing are in partition 1, which shows a difference of 129.

Now, since I effectively ran the code twice and produced two different outputs, I am wondering where the issue comes from and how to fix it. Does the mismatch between the offsets and `numInputRows` above indicate that I lost 128 messages? Would this issue originate from the Kafka input or from the HoodieStreamingSink? Sadly, I can't replicate the issue. I've been thinking about getting the `numInputRows` and offsets from `query.lastProgress` in my code, to compare and throw an error on mismatch, but that would only fail after the commit was committed, so I would have to manually rerun for the missing data.

**Environment Description**

* Hudi version : 0.8.0
* Spark version : 2.4.0
* Hadoop version : 3.0.0
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no

**Additional context**

The message that is missing would have been ingested into Kafka shortly before the query execution at 6:00 am and is about 40 KB in size.
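For reference, the `query.lastProgress` consistency check described above could be sketched like this. This is a minimal, hypothetical helper, not an existing API: `OffsetCheck` is an illustrative name, and the regex-based parsing of the `startOffset`/`endOffset` JSON strings stands in for a real JSON parser:

```scala
// Hypothetical helper: given the startOffset/endOffset JSON strings that
// StreamingQueryProgress exposes per source, sum the per-partition offset
// deltas so they can be compared with numInputRows after each micro-batch.
object OffsetCheck {
  // Matches digit-keyed entries like "2" : 2354732 (partition -> offset);
  // the topic-name key is not numeric, so it is skipped.
  private val entry = """"(\d+)"\s*:\s*(\d+)""".r

  private def partitionOffsets(json: String): Map[String, Long] =
    entry.findAllMatchIn(json).map(m => m.group(1) -> m.group(2).toLong).toMap

  /** Number of records the batch should have read, per the offset ranges. */
  def expectedRows(startOffset: String, endOffset: String): Long = {
    val start = partitionOffsets(startOffset)
    val end   = partitionOffsets(endOffset)
    end.map { case (p, e) => e - start.getOrElse(p, e) }.sum
  }
}
```

After each triggered run, one could compare `expectedRows(...)` against `lastProgress.numInputRows` and fail loudly on a mismatch; as noted above, though, this only detects the gap after the commit has already been written.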