May I ask why you don’t use spark.read and spark.write instead of readStream and writeStream? Thanks.

On 2021-12-17 15:09, Abhinav Gundapaneni wrote:
Hello Spark community,

I’m using Apache spark(version 3.2) to read a CSV file to a
dataframe using ReadStream, process the dataframe and write the
dataframe to Delta file using WriteStream. I’m getting a failure
during the WriteStream process. I’m trying to run the script locally
in my windows 11 machine. Below is the stack trace of the error I’m
facing. Please let me know if there’s anything that I’m missing.

java.lang.NoSuchMethodError:
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V


        at
org.apache.spark.sql.delta.schema.SchemaUtils$.checkFieldNames(SchemaUtils.scala:958)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata(OptimisticTransaction.scala:216)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata$(OptimisticTransaction.scala:214)


        at
org.apache.spark.sql.delta.OptimisticTransaction.verifyNewMetadata(OptimisticTransaction.scala:80)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:208)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:195)


        at
org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:80)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:101)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:62)


        at
org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:59)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:50)


        at
org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)


        at
org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:80)


        at
org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)


        at
org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)


        at
org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)


        at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)


        at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)


        at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)


        at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)

        at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)


        at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)


        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)


        at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)


        at
org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)


        at
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)


        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)


        at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)

        at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)


        at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)


Exception in thread "stream execution thread for [id =
aafe131a-0785-4285-8b5e-7735b30959a7, runId =
effac477-1036-498e-961b-41e9b76c68df]" java.lang.NoSuchMethodError:
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V


        at
org.apache.spark.sql.delta.schema.SchemaUtils$.checkFieldNames(SchemaUtils.scala:958)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata(OptimisticTransaction.scala:216)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata$(OptimisticTransaction.scala:214)


        at
org.apache.spark.sql.delta.OptimisticTransaction.verifyNewMetadata(OptimisticTransaction.scala:80)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:208)


        at
org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:195)


        at
org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:80)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:101)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:62)


        at
org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:59)


        at
org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:50)


        at
org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)


        at
org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:80)


        at
org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)


        at
org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)


        at
org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)


        at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)


        at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)


        at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)


        at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)

        at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)


        at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)


        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)


        at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)


        at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)Traceback
(most recent call last):

  File
"C:\Users\agundapaneni\Development\ModernDataEstate\tests\test_mdefbasic.py",
line 60, in <module>

        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)


        at
org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)


        at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)


        at
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)


        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)


        at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)

        at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)


        at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)


    obj.test_ingest_incremental_data_batch1()

  File
"C:\Users\agundapaneni\Development\ModernDataEstate\tests\test_mdefbasic.py",
line 56, in test_ingest_incremental_data_batch1

    mdef.ingest_incremental_data('example', entity,
self.schemas['studentattendance'], 'school_year')

  File
"C:\Users\agundapaneni\Development\ModernDataEstate/src\MDEFBasic.py",
line 109, in ingest_incremental_data

    query.awaitTermination()   # block until query is terminated, with
stop() or with error; A StreamingQueryException will be thrown if an
exception occurs.

  File
"C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\pyspark\sql\streaming.py",
line 101, in awaitTermination

    return self._jsq.awaitTermination()

  File
"C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\py4j\java_gateway.py",
line 1309, in __call__

    return_value = get_return_value(

  File
"C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\pyspark\sql\utils.py",
line 117, in deco

    raise converted from None

pyspark.sql.utils.StreamingQueryException:
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V


=== Streaming Query ===

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to