This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3cc47a178a6 Revert "[SPARK-36837][BUILD] Upgrade Kafka to 3.1.0"
3cc47a178a6 is described below

commit 3cc47a178a68e957cde70fc1c3f10dbcca9bf84b
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu May 12 10:55:21 2022 -0700

    Revert "[SPARK-36837][BUILD] Upgrade Kafka to 3.1.0"
    
    ### What changes were proposed in this pull request?
    
    This PR aims to revert commit 973ea0f06e72ab64574cbf00e095922a3415f864 from
    `branch-3.3` to exclude it from the Apache Spark 3.3 scope.
    
    ### Why are the changes needed?
    
    SPARK-36837 tried to use Apache Kafka 3.1.0 in Apache Spark 3.3.0, with the
    intention of upgrading to Apache Kafka 3.3.1 before the official release.
    However, we can use the stable Apache Kafka 2.8.1 in Spark 3.3.0 and wait for
    more proven versions such as Apache Kafka 3.2.x or 3.3.x.
    
    The Apache Kafka 3.2.0 release vote has already passed, and the release will arrive soon.
    - https://lists.apache.org/thread/9k5sysvchg98lchv2rvvvq6xhpgk99cc
    
    The Apache Kafka 3.3.0 release discussion has started as well.
    - https://lists.apache.org/thread/cmol5bcf011s1xl91rt4ylb1dgz2vb1r
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
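    
    For a quick local check, the affected Kafka modules can also be built and tested
    directly; a minimal sketch, assuming the standard `build/mvn` wrapper and the
    module paths shown in the diff below:
    
    ```
    # Build the Kafka connector modules (plus the modules they depend on) and run their tests
    ./build/mvn -pl external/kafka-0-10-sql,external/kafka-0-10 -am test
    ```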
    
    Closes #36517 from dongjoon-hyun/SPARK-36837-REVERT.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala   |  7 ++++---
 .../spark/streaming/kafka010/KafkaRDDSuite.scala     | 20 ++++++--------------
 .../spark/streaming/kafka010/KafkaTestUtils.scala    |  3 +--
 pom.xml                                              |  2 +-
 4 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c5d2a99d156..058563dfa16 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -44,7 +44,6 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SystemTime
-import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.apache.zookeeper.server.auth.SASLAuthenticationProvider
 import org.scalatest.Assertions._
@@ -267,7 +266,7 @@ class KafkaTestUtils(
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
     zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout,
-      zkConnectionTimeout, 1, new SystemTime(), "test", new ZKClientConfig)
+      zkConnectionTimeout, 1, new SystemTime())
     zkReady = true
   }
 
@@ -489,7 +488,9 @@ class KafkaTestUtils(
   protected def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
-    props.put("listeners", s"PLAINTEXT://127.0.0.1:$brokerPort")
+    props.put("host.name", "127.0.0.1")
+    props.put("advertised.host.name", "127.0.0.1")
+    props.put("port", brokerPort.toString)
     props.put("log.dir", Utils.createTempDir().getAbsolutePath)
     props.put("zookeeper.connect", zkAddress)
     props.put("zookeeper.connection.timeout.ms", "60000")
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 9c57663b3d8..b9ef16fb58c 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -21,17 +21,15 @@ import java.{ util => ju }
 import java.io.File
 
 import scala.collection.JavaConverters._
-import scala.concurrent.duration._
 import scala.util.Random
 
-import kafka.log.{CleanerConfig, LogCleaner, LogConfig, UnifiedLog}
+import kafka.log.{CleanerConfig, Log, LogCleaner, LogConfig, ProducerStateManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark._
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
@@ -86,7 +84,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   private def compactLogs(topic: String, partition: Int,
       messages: Array[(String, String)]): Unit = {
     val mockTime = new MockTime()
-    val logs = new Pool[TopicPartition, UnifiedLog]()
+    val logs = new Pool[TopicPartition, Log]()
     val logDir = kafkaTestUtils.brokerLogDir
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
@@ -95,7 +93,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
     val logDirFailureChannel = new LogDirFailureChannel(1)
     val topicPartition = new TopicPartition(topic, partition)
-    val log = UnifiedLog(
+    val log = new Log(
       dir,
       LogConfig(logProps),
       0L,
@@ -105,10 +103,9 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
       mockTime,
       Int.MaxValue,
       Int.MaxValue,
-      logDirFailureChannel,
-      lastShutdownClean = false,
-      topicId = None,
-      keepPartitionMetadataFile = false
+      topicPartition,
+      new ProducerStateManager(topicPartition, dir),
+      logDirFailureChannel
     )
     messages.foreach { case (k, v) =>
       val record = new SimpleRecord(k.getBytes, v.getBytes)
@@ -204,11 +201,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
       sc, kafkaParams, offsetRanges, preferredHosts
     ).map(m => m.key -> m.value)
 
-    // To make it sure that the compaction happens
-    eventually(timeout(20.second), interval(1.seconds)) {
-      val dir = new File(kafkaTestUtils.brokerLogDir, topic + "-0")
-      assert(dir.listFiles().exists(_.getName.endsWith(".deleted")))
-    }
     val received = rdd.collect.toSet
     assert(received === compactedMessages.toSet)
 
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index dd8d66f1fc0..0783e591def 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -35,7 +35,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{Time => KTime}
-import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -107,7 +106,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
     zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout,
-      zkConnectionTimeout, 1, KTime.SYSTEM, "test", new ZKClientConfig)
+      zkConnectionTimeout, 1, KTime.SYSTEM)
     admClient = new AdminZkClient(zkClient)
     zkReady = true
   }
diff --git a/pom.xml b/pom.xml
index c91167d8de6..34c8354a3d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>3.1.0</kafka.version>
+    <kafka.version>2.8.1</kafka.version>
     <!-- After 10.15.1.3, the minimum required version is JDK9 -->
     <derby.version>10.14.2.0</derby.version>
     <parquet.version>1.12.2</parquet.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
