Hello,

My Spark client program is as follows:

import org.apache.spark.sql.SparkSession

object Sparkafka {
    def main(args:Array[String]):Unit = {

      val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

      val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "dh-cn-shenzhen.aliyuncs.com:9092")
      .option("security.protocol", "SASL_SSL")
      .option("sasl.mechanism", "PLAIN")
      .option("group.id", "test_project")
      .option("subscribe", "xxxx")
      .load()

      import spark.implicits._

      df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

      val myCount = df.groupBy("key").count()

      val query = myCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

      query.awaitTermination()

  }
}

This is the JAAS file for authentication:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="***"
  password="***";
};

And I submitted the job as:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
        --driver-java-options "-Djava.security.auth.login.config=/home/pyh/kafka/jaas.conf" \
        --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/pyh/kafka/jaas.conf \
        --class "Sparkafka" \
        --master local[2] \
        target/scala-2.12/sparkafka_2.12-0.1.jar



The job always fails with this warning:

22/07/28 13:16:11 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-62c48917-7df1-4513-89c0-938390d89257--1440940498-driver-0-1, groupId=spark-kafka-source-62c48917-7df1-4513-89c0-938390d89257--1440940498-driver-0] Bootstrap broker dh-cn-shenzhen.aliyuncs.com:9092 (id: -1 rack: null) disconnected
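
One thing I noticed: the groupId in the log is one generated by Spark, not the "test_project" I set, so the unprefixed options may not be reaching the Kafka consumer at all. As far as I understand the Structured Streaming Kafka integration guide, Kafka client settings have to be passed with a "kafka." prefix. Below is a minimal sketch of what I think the options would look like under that assumption. KafkaSourceOptions and build are hypothetical names of my own, not part of Spark, and I have not confirmed that this resolves the disconnect.

```scala
// Sketch only: assumes Kafka client settings need the "kafka." prefix
// when passed to the Spark Kafka source. Not verified against the broker.
object KafkaSourceOptions {
  // Hypothetical helper (not a Spark API): builds the source options as a Map.
  def build(bootstrapServers: String, topic: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> bootstrapServers,
    "kafka.security.protocol" -> "SASL_SSL", // prefixed, unlike my program above
    "kafka.sasl.mechanism"    -> "PLAIN",    // prefixed, unlike my program above
    "subscribe"               -> topic       // source option, no prefix
    // "group.id" is left out: Spark appears to generate its own group id
  )

  def main(args: Array[String]): Unit =
    build("dh-cn-shenzhen.aliyuncs.com:9092", "xxxx")
      .foreach { case (k, v) => println(s"$k=$v") }
}
```

In the program this would be used as spark.readStream.format("kafka").options(KafkaSourceOptions.build("dh-cn-shenzhen.aliyuncs.com:9092", "xxxx")).load(), with the JAAS file still supplied through spark-submit as shown earlier.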


I am running Spark 3.3.0 in local mode.

Can you help? Thanks.


