Hi Team,

I am facing this issue again. I am using Spark 3.0.1 with Python.
Could you please explain why I am getting the following error?

Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1498,"0":1410}}}
Current Available Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1499,"0":1410}}}

Kind Regards,
Sachit Murarka

On Fri, Mar 12, 2021 at 5:44 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> Please note that the driver side, for example, was only resolved in 3.1.0...
>
> G
>
>
> On Fri, Mar 12, 2021 at 1:03 PM Sachit Murarka <connectsac...@gmail.com> wrote:
>> Hi Gabor,
>>
>> Thanks a lot for the response. I am using Spark 3.0.1, and this is Spark
>> Structured Streaming.
>>
>> Kind Regards,
>> Sachit Murarka
>>
>>
>> On Fri, Mar 12, 2021 at 5:30 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>> Since you haven't mentioned a version, I guess you're using 2.x and
>>> hitting this issue:
>>> https://issues.apache.org/jira/browse/SPARK-28367
>>> The executor side should be resolved out of the box in the latest Spark
>>> version; on the driver side, however, one must set
>>> "spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false" to use the
>>> new way of fetching offsets.
>>>
>>> If that doesn't solve your problem, then the Kafka side must be checked
>>> to find out why it isn't returning...
>>>
>>> Hope this helps!
>>>
>>> G
>>>
>>>
>>> On Fri, Mar 12, 2021 at 12:29 PM Sachit Murarka <connectsac...@gmail.com> wrote:
>>>> Hi All,
>>>>
>>>> I am getting the following error in Spark Structured Streaming while
>>>> connecting to Kafka.
>>>>
>>>> Main issue from the logs:
>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition my-topic-1 could be determined
>>>>
>>>> Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1498,"0":1410}}}
>>>> Current Available Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1499,"0":1410}}}
>>>>
>>>>
>>>> Full logs:
>>>>
>>>> 21/03/12 11:04:35 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
>>>> 21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c is aborting.
>>>> 21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c aborted.
>>>> 21/03/12 11:04:35 ERROR MicroBatchExecution: Query [id = 2d788a3a-f0ee-4903-9679-0d13bc401e12, runId = 1b387c28-c8e3-4336-9c9f-57db16aa8132] terminated with error
>>>> org.apache.spark.SparkException: Writing job aborted.
>>>>     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
>>>>     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
>>>>     at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
>>>>     at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
>>>>     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
>>>>     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
>>>>     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
>>>>     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
>>>>     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
>>>>     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>>>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>>>>     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
>>>>     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:575)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>>>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>>>>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
>>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.244.2.68, executor 1): org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition my-topic-1 could be determined
>>>>
>>>> Driver stacktrace:
>>>>     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
>>>>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
>>>>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
>>>>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>>>>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
>>>>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
>>>>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
>>>>     at scala.Option.foreach(Option.scala:407)
>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
>>>>     at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
>>>>     ... 37 more
>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition my-topic-1 could be determined
>>>>
>>>> Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1498,"0":1410}}}
>>>> Current Available Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1499,"0":1410}}}
>>>>
>>>> Kind Regards,
>>>> Sachit Murarka
>>>>
>>>
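The two "Current ... Offsets" lines appear to be the streaming query's offset state that Spark prints alongside the failure, not a separate error: "committed" is where the last successful micro-batch ended and "available" is where the failed batch was meant to end. A minimal Python sketch to compare them per partition follows; it assumes a single Kafka source per log line, and the helper names `offsets_from_log` and `pending_records` are hypothetical (not part of Spark).

```python
import json


def offsets_from_log(line):
    """Extract the per-topic offset JSON from a 'Current ... Offsets' log line.

    Assumes one source, formatted like
    '... {KafkaV2[Subscribe[topic]]: {"topic":{"0":123}}}'.
    """
    marker = "]]: "
    start = line.index(marker) + len(marker)
    # Drop the extra closing brace that wraps the source description.
    return json.loads(line[start:].rstrip()[:-1])


def pending_records(committed, available):
    """Return {(topic, partition): record_count} for partitions with new data."""
    pending = {}
    for topic, partitions in available.items():
        for partition, end_offset in partitions.items():
            # Assumption: a partition missing from the committed map starts at 0.
            start_offset = committed.get(topic, {}).get(partition, 0)
            if end_offset > start_offset:
                pending[(topic, int(partition))] = end_offset - start_offset
    return pending


committed = offsets_from_log(
    'Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1498,"0":1410}}}'
)
available = offsets_from_log(
    'Current Available Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1499,"0":1410}}}'
)
print(pending_records(committed, available))  # {('my-topic', 1): 1}
```

Read this way, the failed batch only needed one record (offset 1498) from partition 1, which is the same partition (`my-topic-1`) whose position could not be determined within 60000 ms; partition 0 had nothing new. On Spark 3.1.0 or later, the driver-side setting from Gabor's reply can be passed as `--conf spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false` to `spark-submit`.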