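The error is clear: Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config. The innermost frame of the stack trace is KerberosLogin.getServiceName, which means the consumer is still using the default SASL mechanism (GSSAPI, i.e. Kerberos) instead of PLAIN. The option you set, "kafka.sasl.mechanisms" (plural), is not the property the Kafka Java client reads; the property is "sasl.mechanism" (singular), exactly as in your working console client properties. Use option("kafka.sasl.mechanism", "PLAIN") and the serviceName requirement goes away.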
On Fri, 3 Jul 2020, 15:40 dwgw, <dwijadas...@gmail.com> wrote: > Hi > > I am trying to stream confluent kafka topic in the spark shell. For that i > have invoked spark shell using following command. > > # spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 > --conf > > "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" > --driver-java-options > "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files > /home/spark/kafka_jaas.conf > > kafka_jaas.conf > ----------------- > > KafkaClient { > > org.apache.kafka.common.security.plain.PlainLoginModule required > username="XXX" > password="XXX"; > }; > > Readstream > ------------- > > scala> val df = spark. > | readStream. > | format("kafka"). > | option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092"). > | option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS"). > | option("kafka.sasl.mechanisms", "PLAIN"). > | option("kafka.security.protocol", "SASL_SSL"). > | option("startingOffsets", "earliest"). > | load. > | select($"value".cast("string").alias("value")) > df: org.apache.spark.sql.DataFrame = [value: string] > > Writestream > -------------- > > scala> df.writeStream. > | format("console"). > | outputMode("append"). > | trigger(Trigger.ProcessingTime("20 seconds")). > | start > 2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary > checkpoint location created which is deleted normally when the query didn't > fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required > to delete it under any circumstances, please set > spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to > know deleting temp checkpoint folder is best effort. 
> res0: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741 > > scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in > attempt 1 getting Kafka offsets: > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer > at > > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820) > at > > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631) > at > > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612) > at > > org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at > > org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536) > at > > org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298) > at > > org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151) > at scala.Option.getOrElse(Option.scala:189) > at > > 
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148) > at > > org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378) > at scala.Option.getOrElse(Option.scala:189) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378) > at > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352) > at > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350) > at > > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:128) > at scala.collection.TraversableLike.map(TraversableLike.scala:238) > at scala.collection.TraversableLike.map$(TraversableLike.scala:231) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368) > at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:598) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:364) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:208) > at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352) > at > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350) > at > > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191) > at > > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) > at > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org > $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) > at > > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245) > Caused by: org.apache.kafka.common.KafkaException: > java.lang.IllegalArgumentException: No serviceName defined in either JAAS > or > Kafka config > at > > org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158) > at > > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146) > at > > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) > at > > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) > at > > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741) > ... 
43 more > Caused by: java.lang.IllegalArgumentException: No serviceName defined in > either JAAS or Kafka config > at > > org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301) > > The error No serviceName defined in either JAAS or Kafka config is > misleading because, i am not using kerberos rather .PlainLoginModule And in > this case it is not required to have serviceName parameter in either JAAS > or > Kafka config. > > To make my point, i can run kafka avro console producer/consumer with the > following client properties (without servicename): > > ssl.endpoint.identification.algorithm=https > sasl.mechanism=PLAIN > request.timeout.ms=200000 > retry.backoff.ms=5000 > sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule > required username="XXX" > password="XXX"; > security.protocol=SASL_SSL > > I am using: > Spark 3.0.0 > Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212) > spark-sql-kafka-0-10_2.12:3.0.0 > > What am i doing wrong ? > > Regards > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > --------------------------------------------------------------------- > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >