Hello,

My Spark client program is as follows:

import org.apache.spark.sql.SparkSession

object Sparkafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "dh-cn-shenzhen.aliyuncs.com:9092")
      .option("security.protocol", "SASL_SSL")
      .option("sasl.mechanism", "PLAIN")
      .option("group.id", "test_project")
      .option("subscribe", "xxxx")
      .load()

    import spark.implicits._

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val myCount = df.groupBy("key").count()

    val query = myCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

This is the JAAS file for authentication:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="***"
  password="***";
};

And I submitted the job as:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  --driver-java-options "-Djava.security.auth.login.config=/home/pyh/kafka/jaas.conf" \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/pyh/kafka/jaas.conf \
  --class "Sparkafka" \
  --master local[2] \
  target/scala-2.12/sparkafka_2.12-0.1.jar

Every run produces the same warning:

22/07/28 13:16:11 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-62c48917-7df1-4513-89c0-938390d89257--1440940498-driver-0-1, groupId=spark-kafka-source-62c48917-7df1-4513-89c0-938390d89257--1440940498-driver-0] Bootstrap broker dh-cn-shenzhen.aliyuncs.com:9092 (id: -1 rack: null) disconnected

I am running Spark 3.3.0 in local mode.
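
One thing I have not ruled out yet: as far as I understand the Structured Streaming Kafka integration guide, only options carrying the "kafka." prefix are handed to the underlying consumer. If so, the unprefixed security.protocol, sasl.mechanism and group.id options in my program may be ignored, and the consumer may be talking plaintext to a SASL_SSL port. Below is a sketch of the variant I would try next, assuming the same broker, topic and JAAS file as above. The object name SparkafkaPrefixed and the value name messages are placeholders of mine, and I have not confirmed that this resolves the disconnect.

```scala
import org.apache.spark.sql.SparkSession

object SparkafkaPrefixed {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "dh-cn-shenzhen.aliyuncs.com:9092")
      // Assumption: consumer settings need the "kafka." prefix to reach the client.
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.group.id", "test_project")
      .option("subscribe", "xxxx")
      .load()

    // Keep the casted columns so the aggregation groups on the string key
    // instead of the raw binary key.
    val messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    val myCount = messages.groupBy("key").count()

    val query = myCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

The spark-submit command and the JAAS file would stay unchanged for this test.
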
Can you help? Thanks.