eric9204 commented on issue #6966:
URL: https://github.com/apache/hudi/issues/6966#issuecomment-1294832847

   @fengjian428 The following case reproduces the issue above:
   ```scala
   import java.io.EOFException
   
   import org.apache.spark.SparkConf
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
   import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}
   
   object RatePerMicroBatchSourceTest {
     def main(args: Array[String]): Unit = {
       val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       val spark = SparkSession.builder().config(sparkConf).master("local[6]").getOrCreate()
       val dataFrame = spark.readStream
         .format("rate")
         .option("rowsPerBatch", 1)
         .option("numPartitions", 1)
         .option("startTimestamp", 0)
         .option("advanceMillisPerBatch", 1000)
         .load()
       dataFrame.createTempView("t")
       val resultDf = spark.sql(
         """
           |select timestamp,value,date_format(now(),'yyyyMMddHH') as part from t
           |""".stripMargin)
       try {
         runListener(spark)
         resultDf.writeStream
           .queryName("RateStreamSource")
           .format("hudi")
           .options(getHudiConfig)
           .outputMode(OutputMode.Append())
           .trigger(Trigger.ProcessingTime("10 seconds"))
           .start()
           .awaitTermination()
       } catch {
         case e: EOFException => println("EOF: " + e)
         case _: Exception => println("query stopped")
       }
     }
   
     def runListener(spark: SparkSession): Unit = {
       spark.streams.addListener(new StreamingQueryListener() {
         override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
           println("查询开始: " + queryStarted.id)
         }
   
         override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
           println(queryTerminated.id)
         }
   
         override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
           // stop all active queries after batch 4 so the run terminates on its own
           if (queryProgress.progress.batchId >= 4) spark.streams.active.foreach(_.stop())
           println("query progress, batchId: " + queryProgress.progress.batchId)
         }
       })
       spark.streams.active.foreach(_.stop())
     }
   
     def getHudiConfig: Map[String, String] = {
       Map(
         "hoodie.datasource.write.operation" -> "insert",
         "hoodie.merge.allow.duplicate.on.inserts" -> "true",
         "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
         "hoodie.datasource.write.precombine.field" -> "timestamp",
         "hoodie.datasource.write.recordkey.field" -> "value",
         "hoodie.datasource.write.partitionpath.field" -> "part",
         "hoodie.table.name" -> "hudiTest",
         "hoodie.datasource.write.keygenerator.class" -> 
"org.apache.hudi.keygen.SimpleKeyGenerator",
         "hoodie.datasource.write.drop.partition.columns" -> "true",
         "hoodie.upsert.shuffle.parallelism" -> "1",
         "hoodie.insert.shuffle.parallelism" -> "1",
         "hoodie.datasource.compaction.async.enable" -> "false",
         "hoodie.compact.inline"->"true",
         "hoodie.compact.inline.max.delta.commits" -> "3",
         "hoodie.index.type" -> "BUCKET",
         "hoodie.bucket.index.num.buckets" -> "1",
         "hoodie.bucket.index.hash.field" -> "value",
         "hoodie.storage.layout.partitioner.class" -> 
"org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner",
         "hoodie.storage.layout.type" -> "BUCKET",
         "hoodie.metadata.enable" -> "false",
         "hoodie.embed.timeline.server" -> "false",
         "path" -> "/tmp/hudi/hudiTest",
         "checkpointLocation" -> "/tmp/hudi/ckp"
       )
     }
   }
   ```
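
   Not part of the reproducer, just a minimal verification sketch: after the streaming query stops, the table the job writes (the `path` option in `getHudiConfig` above) can be read back as a batch DataFrame to inspect what was actually committed between runs.
   ```scala
   // Minimal sketch (assumes the same local setup and table path as the reproducer above):
   // batch-read the Hudi table to check which records landed after the query stopped.
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().master("local[6]").getOrCreate()

   spark.read
     .format("hudi")
     .load("/tmp/hudi/hudiTest")
     .show(false)
   ```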

