eric9204 commented on issue #6966: URL: https://github.com/apache/hudi/issues/6966#issuecomment-1294832847
@fengjian428 This case can reproduce above issue ```
import java.io.EOFException
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}

/**
 * Reproduction driver: streams rows from Spark's `rate` source into a Hudi
 * MERGE_ON_READ table (bucket index, inline compaction after 3 delta commits)
 * on a 10-second processing-time trigger, and stops every active query once a
 * progress event reports batchId >= 4.
 */
object RatePerMicroBatchSourceTest {

  /** Entry point; `args` is not used. */
  def main(args: Array[String]): Unit = {
    // Kryo serialization, local mode with 6 worker threads.
    val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession.builder().config(sparkConf).master("local[6]").getOrCreate()

    // NOTE(review): rowsPerBatch / startTimestamp / advanceMillisPerBatch are options of
    // Spark's `rate-micro-batch` source (Spark 3.3+), which the object name suggests was
    // intended. The plain `rate` source selected here documents rowsPerSecond / rampUpTime /
    // numPartitions instead, so those three options are presumably ignored — confirm.
    val dataFrame = spark.readStream
      .format("rate")
      .option("rowsPerBatch", 1)
      .option("numPartitions", 1)
      .option("startTimestamp", 0)
      .option("advanceMillisPerBatch", 1000)
      .load()
    dataFrame.createTempView("t")

    // Adds a `part` column holding the current time formatted as yyyyMMddHH; it is the
    // Hudi partition path field configured in getHudiConfig.
    // NOTE(review): in this copy the triple-quoted SQL sits on a single line, so
    // stripMargin strips only the leading " |" and a trailing " |" remains after
    // `from t`. The original presumably had line breaks here — restore them before running.
    val resultDf = spark.sql(
      """ |select timestamp,value,date_format(now(),'yyyyMMddHH') as part from t |""".stripMargin)

    try {
      runListener(spark)
      // awaitTermination blocks until the query is stopped (the listener stops it once
      // batchId >= 4) or fails; a failure is rethrown from here as an exception.
      resultDf.writeStream
        .queryName("RateStreamSource")
        .format("hudi")
        .options(getHudiConfig)
        .outputMode(OutputMode.Append())
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start()
        .awaitTermination()
    } catch {
      case e: EOFException => println("eof" + e)
      // Prints "停止查询" ("stop query") for any other exception.
      // NOTE(review): this drops the exception type and message, so a query failure
      // reported by awaitTermination looks the same as a normal stop — consider logging it.
      case _: Exception => println("停止查询")
    }
  }

  /**
   * Registers a listener that prints query start, termination and progress, and
   * stops every active query once a progress event reports batchId >= 4.
   *
   * @param spark session whose StreamingQueryManager the listener is added to
   */
  def runListener(spark: SparkSession) = {
    spark.streams.addListener(new StreamingQueryListener() {
      // Prints "查询开始: " ("query started: ") followed by the query id.
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("查询开始: " + queryStarted.id)
      }

      // Prints the id of the terminated query.
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println(queryTerminated.id)
      }

      // Stops all active queries once batchId reaches 4, then prints
      // "查询进度 batchId : " ("query progress batchId : ") with the batch id.
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        if (queryProgress.progress.batchId >= 4) spark.streams.active.foreach(_.stop())
        println("查询进度 batchId : " + queryProgress.progress.batchId)
      }
    })
    // NOTE(review): main calls runListener before writeStream...start(), so no query is
    // active at this point and this line presumably has no effect.
    spark.streams.active.foreach(_.stop())
  }

  /**
   * Hudi writer options for the repro:
   * - MERGE_ON_READ table `hudiTest` at /tmp/hudi/hudiTest;
   * - insert operation with duplicates allowed;
   * - record key `value`, precombine `timestamp`, partition path `part`;
   * - a single-bucket BUCKET index hashed on `value`;
   * - inline (not async) compaction after 3 delta commits;
   * - metadata table and embedded timeline server disabled.
   * The map also carries the streaming checkpoint location.
   */
  def getHudiConfig = {
    Map(
      "hoodie.datasource.write.operation" -> "insert",
      "hoodie.merge.allow.duplicate.on.inserts" -> "true",
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
      "hoodie.datasource.write.precombine.field" -> "timestamp",
      "hoodie.datasource.write.recordkey.field" -> "value",
      "hoodie.datasource.write.partitionpath.field" -> "part",
      "hoodie.table.name" -> "hudiTest",
      "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.SimpleKeyGenerator",
      "hoodie.datasource.write.drop.partition.columns" -> "true",
      "hoodie.upsert.shuffle.parallelism" -> "1",
      "hoodie.insert.shuffle.parallelism" -> "1",
      // Compaction runs inline, after every 3 delta commits, instead of asynchronously.
      "hoodie.datasource.compaction.async.enable" -> "false",
      "hoodie.compact.inline"->"true",
      "hoodie.compact.inline.max.delta.commits" -> "3",
      // Bucket index with one bucket, hashed on the record key column.
      "hoodie.index.type" -> "BUCKET",
      "hoodie.bucket.index.num.buckets" -> "1",
      "hoodie.bucket.index.hash.field" -> "value",
      "hoodie.storage.layout.partitioner.class" -> "org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner",
      "hoodie.storage.layout.type" -> "BUCKET",
      "hoodie.metadata.enable" -> "false",
      "hoodie.embed.timeline.server" -> "false",
      "path" -> "/tmp/hudi/hudiTest",
      "checkpointLocation" -> "/tmp/hudi/ckp"
    )
  }
}
``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org