Unable to create direct stream with SSL-enabled Kafka cluster
Hi,

I am trying to stream a Kafka topic using createDirectStream(). The Kafka cluster is SSL-enabled. The code is:

***
import findspark
findspark.init('/u01/idp/spark')

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

kafkaParams = {"metadata.broker.list": "kfk-bro2.mydomain.com:9093",
               "security.protocol": "ssl",
               "ssl.key.password": "Password123",
               "ssl.keystore.location": "/tmp/keystore.jks",
               "ssl.keystore.password": "Password123",
               "ssl.truststore.location": "/tmp/truststore.jks",
               "ssl.truststore.password": "Password123",
               "ssl.endpoint.identification.algorithm": ""}

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingReciever")
    ssc = StreamingContext(sc, 30)
    message = KafkaUtils.createDirectStream(ssc, ["test1_topic"], kafkaParams)
    lines = message.map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()
***

I am submitting the Python script to the cluster with spark-submit:

# spark-submit --master yarn --deploy-mode client \
    --files /u01/idp/spark/conf/log4j.properties \
    --conf "spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties'" \
    --driver-java-options "-Dlog4j.configuration=file:/u01/idp/spark/conf/log4j.properties" \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
    streamingKafka.py

During execution of the script I get the following error:

  File "/home/spark/streamingKafka.py", line 23, in <module>
    message = KafkaUtils.createDirectStream(ssc, ["test1_topic"], kafkaParams)
  ...
  File "/u01/idp/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/u01/idp/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o43.createDirectStreamWithoutMessageHandler.

What could be the possible causes of this error? I can consume the topic with the console consumer and can reach each of the brokers.

Kafka version: 2.12
Spark version: 2.4.6

Thanks
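[Note: one likely culprit is the integration rather than the cluster. The 0-8 connector behind the Python createDirectStream() speaks the old consumer protocol, which has no SSL/TLS support, so the ssl.* settings above never reach an SSL-capable consumer. SSL needs the 0-10 integration, which exists only for Scala/Java (from Python, the SSL-capable route is Structured Streaming). A minimal Scala sketch of the same stream over the 0-10 integration, reusing the broker and keystore paths above; the group id is made up:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    val sparkConf = new SparkConf().setAppName("ScalaStreamingReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(30))

    // The new consumer API (Kafka 0.9+) understands SSL; the protocol name
    // is conventionally upper-case "SSL".
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kfk-bro2.mydomain.com:9093",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ssl-test",  // hypothetical group id
      "security.protocol" -> "SSL",
      "ssl.key.password" -> "Password123",
      "ssl.keystore.location" -> "/tmp/keystore.jks",
      "ssl.keystore.password" -> "Password123",
      "ssl.truststore.location" -> "/tmp/truststore.jks",
      "ssl.truststore.password" -> "Password123",
      "ssl.endpoint.identification.algorithm" -> "")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("test1_topic"), kafkaParams))

    stream.map(_.value).print()
    ssc.start()
    ssc.awaitTermination()

This would be submitted with --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3 in place of the 0-8 artifact.]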
value col is not a member of org.apache.spark.rdd.RDD
Hi,

I am trying to generate a hierarchy table using Spark GraphX, but at runtime I get the following error:

error: value col is not a member of org.apache.spark.rdd.RDD[(Any, (Int, Any, String, Int, Int))]
val empHirearchyDF = empHirearchyExtDF.join(empDF, empDF.col("emp_id") === empHirearchyExtDF.col("emp_id_pk")).selectExpr("emp_id","first_name","last_name","title","mgr_id","level","root","path","iscyclic","isleaf")

Scala code:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.util.hashing.MurmurHash3
...
...
// call the function
val empHirearchyExtDF = calcTopLevelHierarcy(empVertexDF, empEdgeDF)
  .map { case (pk, (level, root, path, iscyclic, isleaf)) =>
    (pk.asInstanceOf[String], level, root.asInstanceOf[String], path, iscyclic, isleaf) }
  .toDF("emp_id_pk", "level", "root", "path", "iscyclic", "isleaf").cache()

// extend the original table with the new columns. The error occurs on the following line:
val empHirearchyDF = empHirearchyExtDF.join(empDF,
    empDF.col("emp_id") === empHirearchyExtDF.col("emp_id_pk"))
  .selectExpr("emp_id", "first_name", "last_name", "title", "mgr_id", "level", "root", "path", "iscyclic", "isleaf")
...
...

The error occurs during the execution of the result stage. In the spark shell I have found that col is available on the DataFrame empDF:

scala> empDF.
!=  ##  +  ->  ==  agg  alias  apply  as  asInstanceOf  cache  checkpoint  coalesce  *col*  colRegex  collect  collectAsList  columns

Any insight on the issue will be appreciated.

Regards
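[Note: the error message itself says the join's left side is still an RDD at that point; col() is defined on Dataset/DataFrame, not on RDD, so the .toDF(...) conversion apparently did not apply to the value being joined. A self-contained sketch of the conversion, with a made-up one-row RDD standing in for calcTopLevelHierarcy's output (all names illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("ToDfSketch").master("local[*]").getOrCreate()
    import spark.implicits._  // required for .toDF on an RDD of tuples

    // Stand-in for calcTopLevelHierarcy's RDD[(Any, (Int, Any, String, Int, Int))]
    val hierarchyRdd = spark.sparkContext.parallelize(Seq(
      ("100".asInstanceOf[Any], (1, "100".asInstanceOf[Any], "/100", 0, 1))))

    // Convert to a DataFrame first; only then is .col(...) available.
    val empHirearchyExtDF = hierarchyRdd
      .map { case (pk, (level, root, path, iscyclic, isleaf)) =>
        (pk.asInstanceOf[String], level, root.asInstanceOf[String], path, iscyclic, isleaf) }
      .toDF("emp_id_pk", "level", "root", "path", "iscyclic", "isleaf")

    val joinKey = empHirearchyExtDF.col("emp_id_pk")  // now resolves
    empHirearchyExtDF.printSchema()

If the real empHirearchyExtDF is typed as an RDD despite such a chain, it is worth checking that the .toDF(...) call is actually part of the assignment and that spark.implicits._ is imported in the compiled code.]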
Streaming AVRO data in console: java.lang.ArrayIndexOutOfBoundsException
Hi,

I am trying to stream a Kafka topic (in AVRO format) to the console. I have loaded the Avro data from the Kafka topic into a data frame, but when I try to stream it to the console I get the following error.

scala> val records = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "broker1:9093").
  option("subscribe", "PERSON").
  option("startingOffsets", "earliest").
  load()
records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("/home/spark/person.avsc")))
jsonFormatSchema: String =
"{
  "type": "record",
  "name": "Person",
  "namespace": "io.confluent.connect.avro",
  "fields": [
...
...

scala> val output = records.select(from_avro(col("value"), jsonFormatSchema).as("person"))
output: org.apache.spark.sql.DataFrame = [person: struct]

scala> .select("icxsession.*")
res15: org.apache.spark.sql.DataFrame = [SESSION_ID: bigint, VERSION_STARTSCN: bigint ... 46 more fields]

The error occurs here:

scala> output.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

20/08/10 01:14:24 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 20, workstation.com, executor 2): *java.lang.ArrayIndexOutOfBoundsException: 1405994075*
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

[Stage 5:> (0 + 1) / 1]
20/08/10 01:14:25 ERROR scheduler.TaskSetManager: Task 0 in stage 5.0 failed 4 times; aborting job
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521 is aborting.
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521 aborted.
20/08/10 01:14:25 ERROR streaming.MicroBatchExecution: Query [id = 5e8ffd55-fb54-45d1-8255-56ba810c1f51, runId = 1b7245de-de96-43e7-98ef-8bc62a6f697e] terminated with error
*org.apache.spark.SparkException: Writing job aborted.*
    at org.apache.s...
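[Note: one thing worth ruling out, given the io.confluent.connect.avro namespace in the schema: if the topic is produced with the Confluent Avro serializer, every value is prefixed with one magic byte plus a 4-byte schema id before the Avro payload. Plain from_avro knows nothing about that header, and decoding the shifted bytes typically surfaces exactly like this, as an ArrayIndexOutOfBoundsException inside the Avro ResolvingDecoder. A hedged sketch that strips the 5-byte header before decoding, continuing the session above (substring on a binary column is 1-based):

    import org.apache.spark.sql.avro.from_avro   // Spark 2.4.x location of from_avro
    import org.apache.spark.sql.functions.{col, expr}

    // Confluent wire format: [1 magic byte][4-byte schema id][Avro payload];
    // skip the first 5 bytes before handing the value to from_avro.
    val stripped = records.select(
      expr("substring(value, 6, length(value) - 5)").as("value"))
    val output = stripped.select(from_avro(col("value"), jsonFormatSchema).as("person"))

If the data is not Confluent-encoded, the same symptom can also come from a mismatch between the writer schema and the person.avsc reader schema.]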
error: object functions is not a member of package org.apache.spark.sql.avro
Hi,

I am getting the following error while trying to import the package org.apache.spark.sql.avro.functions._ in the Scala shell:

scala> import org.apache.spark.sql.avro.functions._
<console>:23: error: object functions is not a member of package org.apache.spark.sql.avro
       import org.apache.spark.sql.avro.functions._

I invoked spark-shell with the following command:

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0,com.databricks:spark-avro_2.11:4.0.0,org.apache.spark:spark-avro_2.11:2.4.0

Which package do I have to pass as a parameter to spark-shell? I am trying to implement a few examples from https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html

Regards
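[Note: org.apache.spark.sql.avro.functions._ only exists from Spark 3.0 onward. On a 2.x shell, from_avro/to_avro are imported straight from the org.apache.spark.sql.avro package object, and only the org.apache.spark:spark-avro artifact matching the running Spark version should be on the classpath; mixing it with the older com.databricks:spark-avro artifact invites classpath conflicts. A sketch, assuming Spark 2.4.0 on Scala 2.11:

    # spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

    scala> import org.apache.spark.sql.avro.{from_avro, to_avro}
    import org.apache.spark.sql.avro.{from_avro, to_avro}

On Spark 3.x, the original import of org.apache.spark.sql.avro.functions._ works as written.]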
Spark streaming with Confluent Kafka
Hi,

I am trying to stream a Confluent Kafka topic in the spark shell. I invoked spark-shell with the following command:

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
    --driver-java-options "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
    --files /home/spark/kafka_jaas.conf

kafka_jaas.conf:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};

Readstream:

scala> val df = spark.
     |   readStream.
     |   format("kafka").
     |   option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
     |   option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
     |   option("kafka.sasl.mechanisms", "PLAIN").
     |   option("kafka.security.protocol", "SASL_SSL").
     |   option("startingOffsets", "earliest").
     |   load.
     |   select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream:

scala> df.writeStream.
     |   format("console").
     |   outputMode("append").
     |   trigger(Trigger.ProcessingTime("20 seconds")).
     |   start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.a...
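[Note: two things stand out, offered tentatively. First, the consumer property is sasl.mechanism (singular); kafka.sasl.mechanisms is not a Kafka setting, so the mechanism may never be configured. Second, "Failed to construct kafka consumer" under SASL_SSL is the classic symptom of the consumer not finding a usable JAAS entry; the JAAS line can be passed per source via kafka.sasl.jaas.config, which sidesteps the external file and the -Djava.security.auth.login.config flags entirely. A sketch, assuming Confluent Cloud SASL/PLAIN credentials:

    scala> val df = spark.
         |   readStream.
         |   format("kafka").
         |   option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
         |   option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
         |   option("kafka.security.protocol", "SASL_SSL").
         |   option("kafka.sasl.mechanism", "PLAIN").
         |   option("kafka.sasl.jaas.config",
         |     "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXX\" password=\"XXX\";").
         |   option("startingOffsets", "earliest").
         |   load.
         |   select($"value".cast("string").alias("value"))

Any option prefixed with kafka. is handed through to the underlying consumer, so this keeps the credentials scoped to the one source.]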
Re: Spark streaming with Kafka
Hi,

I was able to correct the issue. It was caused by a wrong version of a JAR file: I removed the offending JAR files, copied in the correct versions, and the error went away.

Regards
Spark streaming with Kafka
Hi,

I am trying to stream a Kafka topic from the spark shell, but I am getting the following error. I am using *Spark 3.0.0 / Scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM, *Java 1.8.0_212*).

[spark@hdp-dev ~]$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
:: loading settings :: url = jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
        found org.apache.kafka#kafka-clients;2.4.1 in central
        found com.github.luben#zstd-jni;1.4.4-3 in central
        found org.lz4#lz4-java;1.7.1 in central
        found org.xerial.snappy#snappy-java;1.1.7.5 in central
        found org.slf4j#slf4j-api;1.7.30 in central
        found org.spark-project.spark#unused;1.0.0 in central
        found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
        :: modules in use:
        com.github.luben#zstd-jni;1.4.4-3 from central in [default]
        org.apache.commons#commons-pool2;2.6.2 from central in [default]
        org.apache.kafka#kafka-clients;2.4.1 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from central in [default]
        org.lz4#lz4-java;1.7.1 from central in [default]
        org.slf4j#slf4j-api;1.7.30 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
        confs: [default]
        0 artifacts copied, 9 already retrieved (0kB/13ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hdp-dev.infodetics.com:4040
Spark context available as 'sc' (master = yarn, app id = application_1593620640299_0015).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.
     |   readStream.
     |   format("kafka").
     |   option("kafka.bootstrap.servers", "XXX").
     |   option("subscribe", "XXX").
     |   option("kafka.sasl.mechanisms", "XXX").
     |   option("kafka.security.protocol", "XXX").
     |   option("kafka.sasl.username", "XXX").
     |   option("kafka.sasl.password", "XXX").
     |   option("startingOffsets", "earliest").
     |   load
java.lang.AbstractMethodError: Method org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType; is abstract
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
  ... 57 elided

Looking forward to a response.
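[Note: an AbstractMethodError on KafkaSourceProvider.inferSchema usually means a spark-sql-kafka jar built against the pre-3.0 DataSourceV2 API is shadowing the 3.0.0 one somewhere on the classpath (for example a stale copy under $SPARK_HOME/jars), which matches the JAR-version resolution reported in the reply above. A quick check from the same shell shows which jar the class was actually loaded from:

    scala> classOf[org.apache.spark.sql.kafka010.KafkaSourceProvider].
         |   getProtectionDomain.getCodeSource.getLocation
    // prints the jar URL; it should point at the 3.0.0 artifact, e.g. under ~/.ivy2/jars

If it points anywhere else, removing or replacing that jar should clear the error.]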