sivabalan narayanan created HUDI-6693:
-----------------------------------------
             Summary: Streaming writes fail in quick start w/ 0.14.0
                 Key: HUDI-6693
                 URL: https://issues.apache.org/jira/browse/HUDI-6693
             Project: Apache Hudi
          Issue Type: Improvement
          Components: spark, writer-core
            Reporter: sivabalan narayanan

Quick start fails w/ streaming ingestion:

{code:java}
scala> df.writeStream.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, streamingTableName).
     |   outputMode("append").
     |   option("path", baseStreamingPath).
     |   option("checkpointLocation", checkpointLocation).
     |   trigger(Trigger.Once()).
     |   start()
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
23/08/10 14:31:09 WARN HoodieStreamingSink: Ignore TableNotFoundException as it is first microbatch.
23/08/10 14:31:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
res12: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@75143003

scala> 23/08/10 14:31:10 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/08/10 14:31:10 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/08/10 14:31:10 WARN AutoRecordKeyGenerationUtils$: Precombine field ts will be ignored with auto record key generation enabled
23/08/10 14:31:10 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/08/10 14:31:10 ERROR HoodieStreamingSink: Micro batch id=0 threw following exception:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
LogicalRDD [_hoodie_commit_time#1063, _hoodie_commit_seqno#1064, _hoodie_record_key#1065, _hoodie_partition_path#1066, _hoodie_file_name#1067, begin_lat#1068, begin_lon#1069, driver#1070, end_lat#1071, end_lon#1072, fare#1073, partitionpath#1074, rider#1075, ts#1076L, uuid#1077], true
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:447)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:109)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:125)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:121)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:117)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:135)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:153)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:150)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:172)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:171)
    at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:107)
    at org.apache.hudi.HoodieCreateRecordUtils$.createHoodieRecordRdd(HoodieCreateRecordUtils.scala:114)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:386)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:132)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:129)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:233)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:128)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
23/08/10 14:31:12 WARN AutoRecordKeyGenerationUtils$: Precombine field ts will be ignored with auto record key generation enabled
23/08/10 14:31:12 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/08/10 14:31:12 ERROR HoodieStreamingSink: Micro batch id=0 threw following exception:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    [exception and stack trace identical to the first attempt above]
23/08/10 14:31:16 WARN AutoRecordKeyGenerationUtils$: Precombine field ts will be ignored with auto record key generation enabled
23/08/10 14:31:16 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/08/10 14:31:16 ERROR HoodieStreamingSink: Micro batch id=0 threw following exception:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    [exception and stack trace identical to the first attempt above]
23/08/10 14:31:16 ERROR HoodieStreamingSink: Micro batch id=0 threw following expections,aborting streaming app to avoid data loss:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    [exception and stack trace identical to the first attempt above]
{code}
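From the trace, the write appears to fail when HoodieSparkUtils.createRdd materializes the incoming micro-batch via QueryExecution.toRdd, and Spark's UnsupportedOperationChecker rejects the plan because it still carries a streaming source. For anyone reproducing this, the snippet above references variables bound earlier in the quick start; here is a minimal sketch of that setup (the table name, paths, and the readStream source are assumptions drawn from the quick start guide, not from this report):

{code:scala}
// spark-shell, after running the batch steps of the quick start
// (those steps define basePath and bring in getQuickstartWriteConfigs etc.).
// All names and paths below are illustrative, not taken from this report.
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.Trigger

val streamingTableName = "hudi_trips_cow_streaming"                          // hypothetical
val baseStreamingPath  = "file:///tmp/hudi_trips_cow_streaming"              // hypothetical
val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"  // hypothetical

// df is a *streaming* DataFrame: a streaming read of the Hudi table
// written by the earlier batch steps (basePath is defined there).
val df = spark.readStream.format("hudi").load(basePath)
{code}

With a df like this, the writeStream call shown in the description should exercise the same HoodieStreamingSink path that throws the AnalysisException above.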
--
This message was sent by Atlassian Jira
(v8.20.10#820010)