divijvaidya commented on code in PR #15277:
URL: https://github.com/apache/kafka/pull/15277#discussion_r1472892693


##########
gradle/dependencies.gradle:
##########
@@ -234,6 +227,7 @@ libs += [
   kafkaStreams_34: "org.apache.kafka:kafka-streams:$versions.kafka_34",
   kafkaStreams_35: "org.apache.kafka:kafka-streams:$versions.kafka_35",
   kafkaStreams_36: "org.apache.kafka:kafka-streams:$versions.kafka_36",
+  kerbSimpleKDC: "org.apache.kerby:kerb-simplekdc:2.0.3",

Review Comment:
   When we introduce a new dependency, we need to add it to 
   https://github.com/apache/kafka/blob/cfc8257479c37e02344689405954b01fcc8f24f7/LICENSE-binary#L205
   
   Similarly, we need to update the versions of apacheda and apacheds specified there.
   
   Also, please move the version to the `versions` dictionary above.
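   
   A minimal sketch of what the move could look like (the `kerby` key name is just an assumption, pick whatever matches the existing naming convention):
   ```groovy
   // gradle/dependencies.gradle -- sketch only, not a definitive diff
   versions += [
     // ... existing entries ...
     kerby: "2.0.3"  // hypothetical key for the Apache Kerby version
   ]
   
   libs += [
     // ... existing entries ...
     kerbSimpleKDC: "org.apache.kerby:kerb-simplekdc:$versions.kerby"
   ]
   ```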



##########
core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala:
##########
@@ -18,8 +18,7 @@
 package kafka.security.minikdc
 
 import java.util.Properties
-
-import kafka.utils.TestUtils
+import kafka.utils.{TestUtils}

Review Comment:
   nit: Is this change required?
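   
   If the intent is only to keep the import, note that the braces around a single member are redundant in Scala; the plain form would be:
   ```scala
   import kafka.utils.TestUtils
   ```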



##########
core/src/test/scala/kafka/security/minikdc/MiniKdc.scala:
##########
@@ -118,122 +101,39 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
       throw new RuntimeException("KDC already started")
     if (closed)
       throw new RuntimeException("KDC is closed")
-    initDirectoryService()
     initKdcServer()
     initJvmKerberosConfig()
   }
 
-  private def initDirectoryService(): Unit = {
-    ds = new DefaultDirectoryService
-    ds.setInstanceLayout(new InstanceLayout(workDir))
-    ds.setCacheService(new CacheService)
-
-    // first load the schema
-    val instanceLayout = ds.getInstanceLayout
-    val schemaPartitionDirectory = new File(instanceLayout.getPartitionsDirectory, "schema")
-    val extractor = new DefaultSchemaLdifExtractor(instanceLayout.getPartitionsDirectory)
-    extractor.extractOrCopy
-
-    val loader = new LdifSchemaLoader(schemaPartitionDirectory)
-    val schemaManager = new DefaultSchemaManager(loader)
-    schemaManager.loadAllEnabled()
-    ds.setSchemaManager(schemaManager)
-    // Init the LdifPartition with schema
-    val schemaLdifPartition = new LdifPartition(schemaManager, ds.getDnFactory)
-    schemaLdifPartition.setPartitionPath(schemaPartitionDirectory.toURI)
-
-    // The schema partition
-    val schemaPartition = new SchemaPartition(schemaManager)
-    schemaPartition.setWrappedPartition(schemaLdifPartition)
-    ds.setSchemaPartition(schemaPartition)
-
-    val systemPartition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
-    systemPartition.setId("system")
-    systemPartition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, systemPartition.getId).toURI)
-    systemPartition.setSuffixDn(new Dn(ServerDNConstants.SYSTEM_DN))
-    systemPartition.setSchemaManager(ds.getSchemaManager)
-    ds.setSystemPartition(systemPartition)
-
-    ds.getChangeLog.setEnabled(false)
-    ds.setDenormalizeOpAttrsEnabled(true)
-    ds.addLast(new KeyDerivationInterceptor)
-
-    // create one partition
-    val orgName = config.getProperty(MiniKdc.OrgName).toLowerCase(Locale.ENGLISH)
-    val orgDomain = config.getProperty(MiniKdc.OrgDomain).toLowerCase(Locale.ENGLISH)
-    val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
-    partition.setId(orgName)
-    partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
-    val dn = new Dn(s"dc=$orgName,dc=$orgDomain")
-    partition.setSuffixDn(dn)
-    ds.addPartition(partition)
-
-    // indexes
-    val indexedAttributes = Set[Index[_, String]](
-      new JdbmIndex[Entry]("objectClass", false),
-      new JdbmIndex[Entry]("dc", false),
-      new JdbmIndex[Entry]("ou", false)
-    ).asJava
-    partition.setIndexedAttributes(indexedAttributes)
-
-    // And start the ds
-    ds.setInstanceId(config.getProperty(MiniKdc.Instance))
-    ds.setShutdownHookEnabled(false)
-    ds.startup()
-
-    // context entry, after ds.startup()
-    val entry = ds.newEntry(dn)
-    entry.add("objectClass", "top", "domain")
-    entry.add("dc", orgName)
-    ds.getAdminSession.add(entry)
-  }
-
   private def initKdcServer(): Unit = {
+    val kdcConfig = new KdcConfig()
+    kdcConfig.setLong(KdcConfigKey.MAXIMUM_RENEWABLE_LIFETIME, config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
+    kdcConfig.setLong(KdcConfigKey.MAXIMUM_TICKET_LIFETIME,
+      config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
+    kdcConfig.setString(KdcConfigKey.KDC_REALM, realm)
+    kdcConfig.setString(KdcConfigKey.KDC_HOST, host)
+    kdcConfig.setBoolean(KdcConfigKey.PA_ENC_TIMESTAMP_REQUIRED, false)
+    kdcConfig.setString(KdcConfigKey.KDC_SERVICE_NAME, config.getProperty(MiniKdc.Instance))
+    kdc = new SimpleKdcServer(kdcConfig, new BackendConfig)
+    kdc.setWorkDir(workDir)
+    if (port == 0)
+      _port = NetworkUtil.getServerPort
 
-    def addInitialEntriesToDirectoryService(bindAddress: String): Unit = {
-      val map = Map (
-        "0" -> orgName.toLowerCase(Locale.ENGLISH),
-        "1" -> orgDomain.toLowerCase(Locale.ENGLISH),
-        "2" -> orgName.toUpperCase(Locale.ENGLISH),
-        "3" -> orgDomain.toUpperCase(Locale.ENGLISH),
-        "4" -> bindAddress
-      )
-      val reader = new BufferedReader(new InputStreamReader(MiniKdc.getResourceAsStream("minikdc.ldiff")))
-      try {
-        var line: String = null
-        val builder = new StringBuilder
-        while ({line = reader.readLine(); line != null})
-          builder.append(line).append("\n")
-        addEntriesToDirectoryService(StrSubstitutor.replace(builder, map.asJava))
-      }
-      finally CoreUtils.swallow(reader.close(), this)
-    }
-
-    val bindAddress = config.getProperty(MiniKdc.KdcBindAddress)
-    addInitialEntriesToDirectoryService(bindAddress)
-
-    val kerberosConfig = new KerberosConfig
-    kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
-    kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
-    kerberosConfig.setSearchBaseDn(s"dc=$orgName,dc=$orgDomain")
-    kerberosConfig.setPaEncTimestampRequired(false)
-    kdc = new KdcServer(kerberosConfig)
-    kdc.setDirectoryService(ds)
-
-    // transport
     val transport = config.getProperty(MiniKdc.Transport)
-    val absTransport = transport.trim match {
-      case "TCP" => new TcpTransport(bindAddress, port, 3, 50)
-      case "UDP" => new UdpTransport(port)
+    transport.trim match {
+      case "TCP" =>
+        kdc.setKdcTcpPort(port)
+        kdc.setAllowUdp(false)
+        kdc.setAllowTcp(true)
+      case "UDP" =>
+        kdc.setKdcUdpPort(port)
+        kdc.setAllowTcp(false)
+        kdc.setAllowUdp(true)
      case _ => throw new IllegalArgumentException(s"Invalid transport: $transport")
     }
-    kdc.addTransports(absTransport)
-    kdc.setServiceName(config.getProperty(MiniKdc.Instance))
-    kdc.start()
 
-    // if using ephemeral port, update port number for binding

Review Comment:
   Please port this comment to the new implementation, as it is still useful there.
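   
   A rough sketch of how it could be carried over into the new Kerby-based `initKdcServer()` (placement and wording are only a suggestion; this is a fragment of the code added in this PR, not a standalone example):
   ```scala
   kdc = new SimpleKdcServer(kdcConfig, new BackendConfig)
   kdc.setWorkDir(workDir)
   // if using ephemeral port (0), resolve a free port up front so callers can read the bound port back
   if (port == 0)
     _port = NetworkUtil.getServerPort
   ```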



##########
core/src/test/scala/kafka/security/minikdc/MiniKdc.scala:
##########
@@ -19,38 +19,22 @@
 package kafka.security.minikdc
 
 import java.io._
-import java.net.InetSocketAddress
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.text.MessageFormat
 import java.util.{Locale, Properties, UUID}
-
 import kafka.utils.{CoreUtils, Exit, Logging}
+import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory
 
 import scala.jdk.CollectionConverters._
-import org.apache.commons.lang.text.StrSubstitutor
-import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry}
-import org.apache.directory.api.ldap.model.ldif.LdifReader
-import org.apache.directory.api.ldap.model.name.Dn
-import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor
-import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader
-import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager
-import org.apache.directory.server.constants.ServerDNConstants
-import org.apache.directory.server.core.DefaultDirectoryService
-import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout}
-import org.apache.directory.server.core.api.schema.SchemaPartition
-import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor
-import org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition}
-import org.apache.directory.server.core.partition.ldif.LdifPartition
-import org.apache.directory.server.kerberos.KerberosConfig
-import org.apache.directory.server.kerberos.kdc.KdcServer
-import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory
-import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
-import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
-import org.apache.directory.server.xdbm.Index
-import org.apache.directory.shared.kerberos.KerberosTime
+import org.apache.kerby.kerberos.kerb.KrbException
+import org.apache.kerby.kerberos.kerb.identity.backend.BackendConfig
+import org.apache.kerby.kerberos.kerb.server.{KdcConfig, KdcConfigKey, SimpleKdcServer}
 import org.apache.kafka.common.utils.{Java, Utils}
-
+import org.apache.kerby.kerberos.kerb.`type`.KerberosTime
+import org.apache.kerby.kerberos.kerb.`type`.base.{EncryptionKey, PrincipalName}
+import org.apache.kerby.kerberos.kerb.keytab.{Keytab, KeytabEntry}
+import org.apache.kerby.util.NetworkUtil
 /**
  * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone

Review Comment:
   This javadoc needs to change, I believe? It still says the mini KDC is based on Apache Directory Server, but the new implementation uses Apache Kerby.
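   
   For example, the opening line could be reworded along these lines (only a suggestion; the rest of the doc comment would stay as-is):
   ```scala
   /**
     * Mini KDC based on Apache Kerby that can be embedded in tests or used from command line as a standalone
     * ... (rest of the existing doc comment unchanged)
     */
   ```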



##########
gradle/dependencies.gradle:
##########
@@ -88,8 +88,8 @@ else
 // See https://issues.apache.org/jira/browse/KAFKA-12622 for steps to verify the LICENSE-binary file is correct.
 versions += [
   activation: "1.1.1",
-  apacheda: "1.0.2",
-  apacheds: "2.0.0-M24",
+  apacheda: "2.1.5",
+  apacheds: "2.0.0.AM27",

Review Comment:
   Your PR description mentions that you removed apacheds, but we still seem to be using it here. Does the description need an update?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
