anishshri-db commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1251219532
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@ package org.apache.spark.sql.kafka010
 
 import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+  import scala.collection.JavaConverters._
 
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /* if testType contains source/sink, kafka is used as a source/sink option respectively;
+     if testType contains stream/batch, it is used in readStream/read or writeStream/write */
+  Seq("source and stream", "sink and stream",
+    "source and batch", "sink and batch").foreach { testType =>
+    test(s"test MSK IAM auth on kafka '$testType' side") {
+      val options: Map[String, String] = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> "msk-123",
+        "startingOffsets" -> "earliest",
+        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+        "kafka.sasl.jaas.config" ->
+          "software.amazon.msk.auth.iam.IAMLoginModule required;",
+        "kafka.security.protocol" -> "SASL_SSL",
+        "kafka.sasl.client.callback.handler.class" ->
+          "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
+      )
+
+      var e: Throwable = null
+      if (testType.contains("stream")) {
+        if (testType.contains("source")) {
+          e = intercept[StreamingQueryException] {
+            spark.readStream.format("kafka").options(options).load()
+              .writeStream.format("console").start().processAllAvailable()
+          }
+          TestUtils.assertExceptionMsg(e, "Timed out")

Review Comment:
   Can you add a comment here? Do we expect a timeout because the given servers/broker don't exist? How does it exercise the IAM path?
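   For illustration, here is one way the flagged assertion site could be annotated to answer these questions. This is a sketch only: the stated rationale (an embedded broker that does not speak SASL_SSL, with the timeout showing the IAM classes were loaded) is an assumption about the test's intent, not something confirmed in this thread.
   ```scala
   // Sketch of a possible inline comment for the intercepted block. Assumes the
   // surrounding test body in KafkaSourceProviderSuite: `spark` comes from
   // SharedSparkSession, `intercept` from ScalaTest via SparkFunSuite, and
   // `options` is the MSK IAM option map built above.
   import org.apache.spark.TestUtils
   import org.apache.spark.sql.streaming.StreamingQueryException

   val e = intercept[StreamingQueryException] {
     spark.readStream.format("kafka").options(options).load()
       .writeStream.format("console").start().processAllAvailable()
   }
   // Assumed rationale: the embedded KafkaTestUtils broker speaks PLAINTEXT, not
   // SASL_SSL / AWS_MSK_IAM, so the client can never complete the handshake and
   // the streaming query eventually times out. Failing with "Timed out" rather
   // than a ClassNotFoundException or an invalid-config error indicates that the
   // IAMLoginModule and IAMClientCallbackHandler classes were resolved and wired
   // into the Kafka consumer, i.e. the IAM code path was at least loaded.
   TestUtils.assertExceptionMsg(e, "Timed out")
   ```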