cb149 opened a new issue #3161:
URL: https://github.com/apache/hudi/issues/3161


   **Describe the problem you faced**
   
   I have a Spark Structured Streaming application running on YARN which I 
trigger once at the start of every hour. Now I have discovered missing messages 
in my Hudi table, which is confusing since Structured Streaming proclaims _the 
system ensures end-to-end exactly-once fault-tolerance guarantees_
   
   Effectively the application runs:
   
   ```scala
   spark
         .readStream
         .format("kafka")
         .options(options) // the usual, bootstrapServers etc.
         .load()
         .selectExpr(s"CAST(key AS STRING) as $name", "CAST(value AS STRING)")
         // multiple .withColumn("xyz", from_json(col("value"), struct)) and 
explode()
         .writeStream
         .trigger(Trigger.Once())
         .format("hudi")
         .option("path", config.outputPath)
         .option("checkpointLocation", config.checkpointPath)
         .options(getHudiConfig())
         .outputMode(OutputMode.Append)
         .start()
   ```
   
   Commit operation is `upsert` and my Hudi config settings include (among the 
usual):
   ```scala
   HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP -> "false",
   DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "ts",
   DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> "false", 
   DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY -> "0"
   ```
   and my spark settings include
   ```
   "spark.rdd.compress": "true",
   "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
   "spark.unsafe.sorter.spill.read.ahead.enabled": "false",
   ```
   
   I have rerun the exact same application for the Kafka offsets containing the 
missing messages (about 2 days after the application ran for those offsets), 
writing to a new Hudi table and the missing message was not missing from that 
one, so it seems to me there is an issue with StructuredStreaming from Kafka to 
Hudi in particular with running shortly after a message was ingested. Looking 
into the log of the application that missed the message, I can see no errors, 
but the following:
   
   ```
   21/06/24 08:04:06 INFO hudi.HoodieStreamingSink: Micro batch id=322 
succeeded for commit=20210624080101
   21/06/24 08:04:06 INFO hudi.HoodieStreamingSink: Micro batch id=322 succeeded
   21/06/24 08:04:06 INFO streaming.CheckpointFileManager: Writing atomically 
to hdfs://{path}/checkpoint/commits/322 using temp file 
hdfs://{path}/checkpoint/commits/.322.e7795a54-3ea1-435a-a5de-253f2e66d8fe.tmp
   21/06/24 08:04:06 INFO streaming.CheckpointFileManager: Renamed temp file 
hdfs://{path}/checkpoint/commits/.322.e7795a54-3ea1-435a-a5de-253f2e66d8fe.tmp 
to hdfs://{path}/checkpoint/commits/322
   21/06/24 08:04:06 INFO streaming.MicroBatchExecution: Streaming query made 
progress: {
     "id" : "85adfe77-fa91-4094-ba2a-1ac65dd193cc",
     "runId" : "f1012bb4-0c38-4b99-9bb9-7f02ed3f3087",
     "name" : null,
     "timestamp" : "2021-06-24T06:00:56.265Z",
     "batchId" : 322,
     "numInputRows" : 2101,
     "processedRowsPerSecond" : 11.066864721932513,
     "durationMs" : {
       "addBatch" : 184786,
       "getBatch" : 8,
       "getEndOffset" : 0,
       "queryPlanning" : 256,
       "setOffsetRange" : 3766,
       "triggerExecution" : 189845,
       "walCommit" : 716
     },
     "stateOperators" : [ ],
     "sources" : [ {
       "description" : "KafkaV2[Subscribe[topic]]",
       "startOffset" : {
         "topic" : {
           "2" : 2354732,
           "1" : 852378,
           "0" : 4484872
         }
       },
       "endOffset" : {
         "topic" : {
           "2" : 2355432,
           "1" : 852507,
           "0" : 4486272
         }
       },
       "numInputRows" : 2101,
       "processedRowsPerSecond" : 11.066864721932513
     } ],
     "sink" : {
       "description" : "HoodieStreamingSink[table]"
     }
   }
   ```
   
   The weird thing to me is that if I substract `startOffset` from the 
`endOffset` I get **2229**. This should however match the **2101** of 
`numInputRows` right? In the other runs of the application, I can see that the 
difference between the offsets is always `numInputrows - 1`.
   This would also be the case here if we only combine the differences for 
partitions 2 and 0, resulting in 2100, which makes me assume the messages I am 
missing are in partition 1, which shows a difference of 129.
   
   Now since I effectively ran the code twice, and produced two different 
outputs, I am wondering where the issue comes from and how to fix it. Is the 
mismatch between the offsets and `numInputRows` from above the issue and 
indicates I lost 128 messages? Would this issue originate from the Kafka input 
or from the HoodieStreamingSink?
   
   Sadly, I can't replicate the issue.
   
   I've been thinking about getting the `numInputRows` and offsets from 
`query.lastProgress` in my code, to compare and throw an error on mismatch, but 
that would only fail after the commit was commited, so I would have to manually 
rerun for the missing data.
   
   **Environment Description**
   
   * Hudi version : 0.8.0
   
   * Spark version : 2.4.0
   
   * Hadoop version : 3.0.0
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   The message that is missing would have been ingested into Kafka shortly 
before the query execution at 6:00 am and is about 40KB big.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to