divijvaidya commented on code in PR #15277: URL: https://github.com/apache/kafka/pull/15277#discussion_r1472892693
########## gradle/dependencies.gradle: ########## @@ -234,6 +227,7 @@ libs += [ kafkaStreams_34: "org.apache.kafka:kafka-streams:$versions.kafka_34", kafkaStreams_35: "org.apache.kafka:kafka-streams:$versions.kafka_35", kafkaStreams_36: "org.apache.kafka:kafka-streams:$versions.kafka_36", + kerbSimpleKDC: "org.apache.kerby:kerb-simplekdc:2.0.3", Review Comment: when we are introducing a new dependency, we need to add it to https://github.com/apache/kafka/blob/cfc8257479c37e02344689405954b01fcc8f24f7/LICENSE-binary#L205 Similarly, we need to modify the version of apacheda and apacheds specified there. Also, please move the version to `versions` dictionary above. ########## core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala: ########## @@ -18,8 +18,7 @@ package kafka.security.minikdc import java.util.Properties - -import kafka.utils.TestUtils +import kafka.utils.{TestUtils} Review Comment: nit Is this change required? ########## core/src/test/scala/kafka/security/minikdc/MiniKdc.scala: ########## @@ -118,122 +101,39 @@ class MiniKdc(config: Properties, workDir: File) extends Logging { throw new RuntimeException("KDC already started") if (closed) throw new RuntimeException("KDC is closed") - initDirectoryService() initKdcServer() initJvmKerberosConfig() } - private def initDirectoryService(): Unit = { - ds = new DefaultDirectoryService - ds.setInstanceLayout(new InstanceLayout(workDir)) - ds.setCacheService(new CacheService) - - // first load the schema - val instanceLayout = ds.getInstanceLayout - val schemaPartitionDirectory = new File(instanceLayout.getPartitionsDirectory, "schema") - val extractor = new DefaultSchemaLdifExtractor(instanceLayout.getPartitionsDirectory) - extractor.extractOrCopy - - val loader = new LdifSchemaLoader(schemaPartitionDirectory) - val schemaManager = new DefaultSchemaManager(loader) - schemaManager.loadAllEnabled() - ds.setSchemaManager(schemaManager) - // Init the LdifPartition with schema - val schemaLdifPartition = new 
LdifPartition(schemaManager, ds.getDnFactory) - schemaLdifPartition.setPartitionPath(schemaPartitionDirectory.toURI) - - // The schema partition - val schemaPartition = new SchemaPartition(schemaManager) - schemaPartition.setWrappedPartition(schemaLdifPartition) - ds.setSchemaPartition(schemaPartition) - - val systemPartition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory) - systemPartition.setId("system") - systemPartition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, systemPartition.getId).toURI) - systemPartition.setSuffixDn(new Dn(ServerDNConstants.SYSTEM_DN)) - systemPartition.setSchemaManager(ds.getSchemaManager) - ds.setSystemPartition(systemPartition) - - ds.getChangeLog.setEnabled(false) - ds.setDenormalizeOpAttrsEnabled(true) - ds.addLast(new KeyDerivationInterceptor) - - // create one partition - val orgName = config.getProperty(MiniKdc.OrgName).toLowerCase(Locale.ENGLISH) - val orgDomain = config.getProperty(MiniKdc.OrgDomain).toLowerCase(Locale.ENGLISH) - val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory) - partition.setId(orgName) - partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI) - val dn = new Dn(s"dc=$orgName,dc=$orgDomain") - partition.setSuffixDn(dn) - ds.addPartition(partition) - - // indexes - val indexedAttributes = Set[Index[_, String]]( - new JdbmIndex[Entry]("objectClass", false), - new JdbmIndex[Entry]("dc", false), - new JdbmIndex[Entry]("ou", false) - ).asJava - partition.setIndexedAttributes(indexedAttributes) - - // And start the ds - ds.setInstanceId(config.getProperty(MiniKdc.Instance)) - ds.setShutdownHookEnabled(false) - ds.startup() - - // context entry, after ds.startup() - val entry = ds.newEntry(dn) - entry.add("objectClass", "top", "domain") - entry.add("dc", orgName) - ds.getAdminSession.add(entry) - } - private def initKdcServer(): Unit = { + val kdcConfig = new KdcConfig() + 
kdcConfig.setLong(KdcConfigKey.MAXIMUM_RENEWABLE_LIFETIME, config.getProperty(MiniKdc.MaxRenewableLifetime).toLong) + kdcConfig.setLong(KdcConfigKey.MAXIMUM_TICKET_LIFETIME, + config.getProperty(MiniKdc.MaxTicketLifetime).toLong) + kdcConfig.setString(KdcConfigKey.KDC_REALM, realm) + kdcConfig.setString(KdcConfigKey.KDC_HOST, host) + kdcConfig.setBoolean(KdcConfigKey.PA_ENC_TIMESTAMP_REQUIRED, false) + kdcConfig.setString(KdcConfigKey.KDC_SERVICE_NAME, config.getProperty(MiniKdc.Instance)) + kdc = new SimpleKdcServer(kdcConfig, new BackendConfig) + kdc.setWorkDir(workDir) + if (port == 0) + _port = NetworkUtil.getServerPort - def addInitialEntriesToDirectoryService(bindAddress: String): Unit = { - val map = Map ( - "0" -> orgName.toLowerCase(Locale.ENGLISH), - "1" -> orgDomain.toLowerCase(Locale.ENGLISH), - "2" -> orgName.toUpperCase(Locale.ENGLISH), - "3" -> orgDomain.toUpperCase(Locale.ENGLISH), - "4" -> bindAddress - ) - val reader = new BufferedReader(new InputStreamReader(MiniKdc.getResourceAsStream("minikdc.ldiff"))) - try { - var line: String = null - val builder = new StringBuilder - while ({line = reader.readLine(); line != null}) - builder.append(line).append("\n") - addEntriesToDirectoryService(StrSubstitutor.replace(builder, map.asJava)) - } - finally CoreUtils.swallow(reader.close(), this) - } - - val bindAddress = config.getProperty(MiniKdc.KdcBindAddress) - addInitialEntriesToDirectoryService(bindAddress) - - val kerberosConfig = new KerberosConfig - kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong) - kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong) - kerberosConfig.setSearchBaseDn(s"dc=$orgName,dc=$orgDomain") - kerberosConfig.setPaEncTimestampRequired(false) - kdc = new KdcServer(kerberosConfig) - kdc.setDirectoryService(ds) - - // transport val transport = config.getProperty(MiniKdc.Transport) - val absTransport = transport.trim match { - case "TCP" => 
new TcpTransport(bindAddress, port, 3, 50) - case "UDP" => new UdpTransport(port) + transport.trim match { + case "TCP" => + kdc.setKdcTcpPort(port) + kdc.setAllowUdp(false) + kdc.setAllowTcp(true) + case "UDP" => + kdc.setKdcUdpPort(port) + kdc.setAllowTcp(false) + kdc.setAllowUdp(true) case _ => throw new IllegalArgumentException(s"Invalid transport: $transport") } - kdc.addTransports(absTransport) - kdc.setServiceName(config.getProperty(MiniKdc.Instance)) - kdc.start() - // if using ephemeral port, update port number for binding Review Comment: Please port this comment ("if using ephemeral port, update port number for binding") to the new implementation; it is useful. ########## core/src/test/scala/kafka/security/minikdc/MiniKdc.scala: ########## @@ -19,38 +19,22 @@ package kafka.security.minikdc import java.io._ -import java.net.InetSocketAddress import java.nio.charset.StandardCharsets import java.nio.file.Files import java.text.MessageFormat import java.util.{Locale, Properties, UUID} - import kafka.utils.{CoreUtils, Exit, Logging} +import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory import scala.jdk.CollectionConverters._ -import org.apache.commons.lang.text.StrSubstitutor -import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry} -import org.apache.directory.api.ldap.model.ldif.LdifReader -import org.apache.directory.api.ldap.model.name.Dn -import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor -import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader -import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager -import org.apache.directory.server.constants.ServerDNConstants -import org.apache.directory.server.core.DefaultDirectoryService -import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout} -import org.apache.directory.server.core.api.schema.SchemaPartition -import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor -import 
org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition} -import org.apache.directory.server.core.partition.ldif.LdifPartition -import org.apache.directory.server.kerberos.KerberosConfig -import org.apache.directory.server.kerberos.kdc.KdcServer -import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory -import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} -import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} -import org.apache.directory.server.xdbm.Index -import org.apache.directory.shared.kerberos.KerberosTime +import org.apache.kerby.kerberos.kerb.KrbException +import org.apache.kerby.kerberos.kerb.identity.backend.BackendConfig +import org.apache.kerby.kerberos.kerb.server.{KdcConfig, KdcConfigKey, SimpleKdcServer} import org.apache.kafka.common.utils.{Java, Utils} - +import org.apache.kerby.kerberos.kerb.`type`.KerberosTime +import org.apache.kerby.kerberos.kerb.`type`.base.{EncryptionKey, PrincipalName} +import org.apache.kerby.kerberos.kerb.keytab.{Keytab, KeytabEntry} +import org.apache.kerby.util.NetworkUtil /** * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone Review Comment: I believe this doc comment needs to change, since it still describes the Mini KDC as being based on Apache Directory Server? ########## gradle/dependencies.gradle: ########## @@ -88,8 +88,8 @@ else // See https://issues.apache.org/jira/browse/KAFKA-12622 for steps to verify the LICENSE-binary file is correct. versions += [ activation: "1.1.1", - apacheda: "1.0.2", - apacheds: "2.0.0-M24", + apacheda: "2.1.5", + apacheds: "2.0.0.AM27", Review Comment: Your PR description mentions that you removed ApacheDS, but we still seem to be using it. Does the description need an update? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org