This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 3cc47a178a6 Revert "[SPARK-36837][BUILD] Upgrade Kafka to 3.1.0" 3cc47a178a6 is described below commit 3cc47a178a68e957cde70fc1c3f10dbcca9bf84b Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Thu May 12 10:55:21 2022 -0700 Revert "[SPARK-36837][BUILD] Upgrade Kafka to 3.1.0" ### What changes were proposed in this pull request? This PR aims to revert commit 973ea0f06e72ab64574cbf00e095922a3415f864 from `branch-3.3` to exclude it from the Apache Spark 3.3 scope. ### Why are the changes needed? SPARK-36837 tried to use Apache Kafka 3.1.0 in Apache Spark 3.3.0 and initially wanted to upgrade to Apache Kafka 3.3.1 before the official release. However, we can use the stable Apache Kafka 2.8.1 in Spark 3.3.0 and wait for more proven versions, Apache Kafka 3.2.x or 3.3.x. The Apache Kafka 3.2.0 vote has already passed and the release will arrive. - https://lists.apache.org/thread/9k5sysvchg98lchv2rvvvq6xhpgk99cc The Apache Kafka 3.3.0 release discussion has started too. - https://lists.apache.org/thread/cmol5bcf011s1xl91rt4ylb1dgz2vb1r ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #36517 from dongjoon-hyun/SPARK-36837-REVERT. 
Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 7 ++++--- .../spark/streaming/kafka010/KafkaRDDSuite.scala | 20 ++++++-------------- .../spark/streaming/kafka010/KafkaTestUtils.scala | 3 +-- pom.xml | 2 +- 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index c5d2a99d156..058563dfa16 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -44,7 +44,6 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT} import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.utils.SystemTime -import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.zookeeper.server.auth.SASLAuthenticationProvider import org.scalatest.Assertions._ @@ -267,7 +266,7 @@ class KafkaTestUtils( // Get the actual zookeeper binding port zkPort = zookeeper.actualPort zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout, - zkConnectionTimeout, 1, new SystemTime(), "test", new ZKClientConfig) + zkConnectionTimeout, 1, new SystemTime()) zkReady = true } @@ -489,7 +488,9 @@ class KafkaTestUtils( protected def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") - props.put("listeners", s"PLAINTEXT://127.0.0.1:$brokerPort") + props.put("host.name", "127.0.0.1") + props.put("advertised.host.name", "127.0.0.1") + props.put("port", brokerPort.toString) props.put("log.dir", 
Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) props.put("zookeeper.connection.timeout.ms", "60000") diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 9c57663b3d8..b9ef16fb58c 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -21,17 +21,15 @@ import java.{ util => ju } import java.io.File import scala.collection.JavaConverters._ -import scala.concurrent.duration._ import scala.util.Random -import kafka.log.{CleanerConfig, LogCleaner, LogConfig, UnifiedLog} +import kafka.log.{CleanerConfig, Log, LogCleaner, LogConfig, ProducerStateManager} import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils.Pool import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark._ import org.apache.spark.scheduler.ExecutorCacheTaskLocation @@ -86,7 +84,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]): Unit = { val mockTime = new MockTime() - val logs = new Pool[TopicPartition, UnifiedLog]() + val logs = new Pool[TopicPartition, Log]() val logDir = kafkaTestUtils.brokerLogDir val dir = new File(logDir, topic + "-" + partition) dir.mkdirs() @@ -95,7 +93,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f)) val logDirFailureChannel = new 
LogDirFailureChannel(1) val topicPartition = new TopicPartition(topic, partition) - val log = UnifiedLog( + val log = new Log( dir, LogConfig(logProps), 0L, @@ -105,10 +103,9 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { mockTime, Int.MaxValue, Int.MaxValue, - logDirFailureChannel, - lastShutdownClean = false, - topicId = None, - keepPartitionMetadataFile = false + topicPartition, + new ProducerStateManager(topicPartition, dir), + logDirFailureChannel ) messages.foreach { case (k, v) => val record = new SimpleRecord(k.getBytes, v.getBytes) @@ -204,11 +201,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { sc, kafkaParams, offsetRanges, preferredHosts ).map(m => m.key -> m.value) - // To make it sure that the compaction happens - eventually(timeout(20.second), interval(1.seconds)) { - val dir = new File(kafkaTestUtils.brokerLogDir, topic + "-0") - assert(dir.listFiles().exists(_.getName.endsWith(".deleted"))) - } val received = rdd.collect.toSet assert(received === compactedMessages.toSet) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index dd8d66f1fc0..0783e591def 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -35,7 +35,6 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.utils.{Time => KTime} -import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.{SparkConf, SparkException} @@ -107,7 +106,7 @@ private[kafka010] class KafkaTestUtils extends Logging { // Get the actual zookeeper 
binding port zkPort = zookeeper.actualPort zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout, - zkConnectionTimeout, 1, KTime.SYSTEM, "test", new ZKClientConfig) + zkConnectionTimeout, 1, KTime.SYSTEM) admClient = new AdminZkClient(zkClient) zkReady = true } diff --git a/pom.xml b/pom.xml index c91167d8de6..34c8354a3d4 100644 --- a/pom.xml +++ b/pom.xml @@ -128,7 +128,7 @@ <!-- Version used for internal directory structure --> <hive.version.short>2.3</hive.version.short> <!-- note that this should be compatible with Kafka brokers version 0.10 and up --> - <kafka.version>3.1.0</kafka.version> + <kafka.version>2.8.1</kafka.version> <!-- After 10.15.1.3, the minimum required version is JDK9 --> <derby.version>10.14.2.0</derby.version> <parquet.version>1.12.2</parquet.version> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org