// Patch header: http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
// (new file mode 100644, index 0000000..c1789db)

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.common.security.kerberos;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * An encoding of a rule for translating kerberos names.
 *
 * A rule is either the default rule (built with the one-argument constructor) or an explicit
 * rule made of a component count, a format string, an optional match pattern and an optional
 * substitution, rendered by {@link #toString()} as {@code RULE:[n:format](match)s/from/to/g}.
 */
class KerberosRule {

    /**
     * A pattern that matches a string without '$' and then a single
     * parameter with $n.
     */
    private static final Pattern PARAMETER_PATTERN = Pattern.compile("([^$]*)(\\$(\\d*))?");

    /**
     * A pattern that recognizes simple/non-simple names: a name containing '/' or '@'
     * is treated as non-simple.
     */
    private static final Pattern NON_SIMPLE_PATTERN = Pattern.compile("[/@]");

    private final String defaultRealm;
    private final boolean isDefault;
    private final int numOfComponents;
    private final String format;
    private final Pattern match;
    private final Pattern fromPattern;
    private final String toPattern;
    private final boolean repeat;

    /**
     * Creates the default rule, which only applies to names whose realm equals {@code defaultRealm}.
     *
     * @param defaultRealm the realm that the default rule accepts
     */
    KerberosRule(String defaultRealm) {
        this.defaultRealm = defaultRealm;
        isDefault = true;
        numOfComponents = 0;
        format = null;
        match = null;
        fromPattern = null;
        toPattern = null;
        repeat = false;
    }

    /**
     * Creates an explicit (non-default) rule.
     *
     * @param defaultRealm    the default realm (stored, but only consulted by the default rule)
     * @param numOfComponents number of name components (excluding the realm) this rule applies to
     * @param format          format string with {@code $n} parameter references
     * @param match           regular expression the formatted name must fully match, or null for no filter
     * @param fromPattern     regular expression to substitute in the formatted name, or null for no substitution
     * @param toPattern       replacement used for matches of {@code fromPattern}
     * @param repeat          true to replace every match, false to replace only the first one
     */
    KerberosRule(String defaultRealm, int numOfComponents, String format, String match, String fromPattern,
                 String toPattern, boolean repeat) {
        this.defaultRealm = defaultRealm;
        isDefault = false;
        this.numOfComponents = numOfComponents;
        this.format = format;
        this.match = match == null ? null : Pattern.compile(match);
        this.fromPattern =
            fromPattern == null ? null : Pattern.compile(fromPattern);
        this.toPattern = toPattern;
        this.repeat = repeat;
    }

    /**
     * Renders the rule as {@code DEFAULT} or {@code RULE:[n:format](match)s/from/to/g}, omitting
     * the parts that are not configured.
     */
    @Override
    public String toString() {
        StringBuilder buf = new StringBuilder();
        if (isDefault) {
            buf.append("DEFAULT");
        } else {
            buf.append("RULE:[");
            buf.append(numOfComponents);
            buf.append(':');
            buf.append(format);
            buf.append(']');
            if (match != null) {
                buf.append('(');
                buf.append(match);
                buf.append(')');
            }
            if (fromPattern != null) {
                buf.append("s/");
                buf.append(fromPattern);
                buf.append('/');
                buf.append(toPattern);
                buf.append('/');
                if (repeat) {
                    buf.append('g');
                }
            }
        }
        return buf.toString();
    }

    /**
     * Replace the numbered parameters of the form $n where n is a valid index into
     * {@code params} (0 to {@code params.length - 1}). Normal text is copied directly
     * and $n is replaced by the corresponding parameter.
     *
     * @param format the string in which to replace parameter references
     * @param params the list of parameters
     * @return the generated string with the parameter references replaced.
     * @throws KerberosNameParser.BadFormatString if a reference is not a number or is outside
     *         the valid index range of {@code params}
     */
    static String replaceParameters(String format,
                                    String[] params) throws KerberosNameParser.BadFormatString {
        Matcher match = PARAMETER_PATTERN.matcher(format);
        int start = 0;
        StringBuilder result = new StringBuilder();
        while (start < format.length() && match.find(start)) {
            result.append(match.group(1));
            String paramNum = match.group(3);
            if (paramNum != null) {
                try {
                    int num = Integer.parseInt(paramNum);
                    // The upper bound is exclusive: params[params.length] does not exist, so it must be
                    // rejected here rather than surfacing as an ArrayIndexOutOfBoundsException below.
                    if (num < 0 || num >= params.length) {
                        throw new KerberosNameParser.BadFormatString("index " + num + " from " + format +
                                " is outside of the valid range 0 to " +
                                (params.length - 1));
                    }
                    result.append(params[num]);
                } catch (NumberFormatException nfe) {
                    // Reached for a bare "$" (empty digit group) or a number too large for an int.
                    throw new KerberosNameParser.BadFormatString("bad format in username mapping in " +
                            paramNum, nfe);
                }

            }
            start = match.end();
        }
        return result.toString();
    }

    /**
     * Replace the matches of the from pattern in the base string with the value
     * of the to string.
     *
     * @param base the string to transform
     * @param from the pattern to look for in the base string
     * @param to the string to replace matches of the pattern with; it is handed to
     *           {@link Matcher#replaceAll(String)} / {@link Matcher#replaceFirst(String)} as-is
     * @param repeat whether the substitution should be repeated for every match
     * @return the base string with the first match (or all matches, if {@code repeat}) replaced
     */
    static String replaceSubstitution(String base, Pattern from, String to,
                                      boolean repeat) {
        Matcher match = from.matcher(base);
        if (repeat) {
            return match.replaceAll(to);
        } else {
            return match.replaceFirst(to);
        }
    }

    /**
     * Try to apply this rule to the given name represented as a parameter
     * array.
     *
     * @param params first element is the realm, second and later elements
     *        are the components of the name "a/b@FOO" -> {"FOO", "a", "b"}
     * @return the short name if this rule applies or null
     * @throws IOException throws if something is wrong with the rules; in particular
     *         {@link NoMatchingRule} if the translated name still contains '/' or '@'
     */
    String apply(String[] params) throws IOException {
        String result = null;
        if (isDefault) {
            if (defaultRealm.equals(params[0])) {
                result = params[1];
            }
        } else if (params.length - 1 == numOfComponents) {
            String base = replaceParameters(format, params);
            if (match == null || match.matcher(base).matches()) {
                if (fromPattern == null) {
                    result = base;
                } else {
                    result = replaceSubstitution(base, fromPattern, toPattern, repeat);
                }
            }
        }
        if (result != null && NON_SIMPLE_PATTERN.matcher(result).find()) {
            throw new NoMatchingRule("Non-simple name " + result + " after auth_to_local rule " + this);
        }
        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java new file mode 100644 index 0000000..dd885e5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.kerberos; + +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.kerberos.KerberosTicket; +import javax.security.auth.Subject; + +import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.ClientCallbackHandler; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Shell; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.SystemTime; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Random; +import java.util.Set; +import java.util.Map; + +/** + * This class is responsible for refreshing Kerberos credentials for + * logins for both Kafka client and server. + */ +public class Login { + private static final Logger log = LoggerFactory.getLogger(Login.class); + + private static final Random RNG = new Random(); + + private final Thread t; + private final boolean isKrbTicket; + private final boolean isUsingTicketCache; + + private final String loginContextName; + private final String principal; + private final Time time = new SystemTime(); + private final CallbackHandler callbackHandler = new ClientCallbackHandler(); + + // LoginThread will sleep until 80% of time from last refresh to + // ticket's expiry has been reached, at which time it will wake + // and try to renew the ticket. 
+ private final double ticketRenewWindowFactor; + + /** + * Percentage of random jitter added to the renewal time + */ + private final double ticketRenewJitter; + + // Regardless of ticketRenewWindowFactor setting above and the ticket expiry time, + // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute). + // Change the '1' to e.g. 5, to change this to 5 minutes. + private final long minTimeBeforeRelogin; + + private final String kinitCmd; + + private volatile Subject subject; + + private LoginContext login; + private long lastLogin; + + /** + * Login constructor. The constructor starts the thread used + * to periodically re-login to the Kerberos Ticket Granting Server. + * @param loginContextName + * name of section in JAAS file that will be use to login. + * Passed as first param to javax.security.auth.login.LoginContext(). + * @param configs configure Login with the given key-value pairs. + * @throws javax.security.auth.login.LoginException + * Thrown if authentication fails. 
+ */ + public Login(final String loginContextName, Map<String, ?> configs) throws LoginException { + this.loginContextName = loginContextName; + this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR); + this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); + this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); + this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD); + + this.lastLogin = currentElapsedTime(); + login = login(loginContextName); + subject = login.getSubject(); + isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty(); + + AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); + if (entries.length == 0) { + isUsingTicketCache = false; + principal = null; + } else { + // there will only be a single entry + AppConfigurationEntry entry = entries[0]; + if (entry.getOptions().get("useTicketCache") != null) { + String val = (String) entry.getOptions().get("useTicketCache"); + isUsingTicketCache = val.equals("true"); + } else + isUsingTicketCache = false; + if (entry.getOptions().get("principal") != null) + principal = (String) entry.getOptions().get("principal"); + else + principal = null; + } + + if (!isKrbTicket) { + log.debug("It is not a Kerberos ticket"); + t = null; + // if no TGT, do not bother with ticket management. + return; + } + log.debug("It is a Kerberos ticket"); + + // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the + // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development, + // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running: + // "modprinc -maxlife 3mins <principal>" in kadmin. 
+ t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() { + public void run() { + log.info("TGT refresh thread started."); + while (true) { // renewal thread's main loop. if it exits from here, thread will exit. + KerberosTicket tgt = getTGT(); + long now = currentWallTime(); + long nextRefresh; + Date nextRefreshDate; + if (tgt == null) { + nextRefresh = now + minTimeBeforeRelogin; + nextRefreshDate = new Date(nextRefresh); + log.warn("No TGT found: will try again at {}", nextRefreshDate); + } else { + nextRefresh = getRefreshTime(tgt); + long expiry = tgt.getEndTime().getTime(); + Date expiryDate = new Date(expiry); + if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() >= expiry) { + log.error("The TGT cannot be renewed beyond the next expiry date: {}." + + "This process will not be able to authenticate new SASL connections after that " + + "time (for example, it will not be able to authenticate a new connection with a Kafka " + + "Broker). Ask your system administrator to either increase the " + + "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " + + "kadmin, or instead, to generate a keytab for {}. Because the TGT's " + + "expiry cannot be further extended by refreshing, exiting refresh thread now.", + expiryDate, principal, principal); + return; + } + // determine how long to sleep from looking at ticket's expiry. + // We should not allow the ticket to expire, but we should take into consideration + // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so + // would cause ticket expiration. + if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) { + // expiry is before next scheduled refresh). + log.info("Refreshing now because expiry is before next scheduled refresh time."); + nextRefresh = now; + } else { + if (nextRefresh < (now + minTimeBeforeRelogin)) { + // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN). 
+ Date until = new Date(nextRefresh); + Date newUntil = new Date(now + minTimeBeforeRelogin); + log.warn("TGT refresh thread time adjusted from {} to {} since the former is sooner " + + "than the minimum refresh interval ({} seconds) from now.", + until, newUntil, minTimeBeforeRelogin / 1000); + } + nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin); + } + nextRefreshDate = new Date(nextRefresh); + if (nextRefresh > expiry) { + log.error("Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." + + "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.", + nextRefreshDate, expiryDate); + return; + } + } + if (now < nextRefresh) { + Date until = new Date(nextRefresh); + log.info("TGT refresh sleeping until: {}", until); + try { + Thread.sleep(nextRefresh - now); + } catch (InterruptedException ie) { + log.warn("TGT renewal thread has been interrupted and will exit."); + return; + } + } else { + log.error("NextRefresh: {} is in the past: exiting refresh thread. Check" + + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)." + + " Manual intervention will be required for this client to successfully authenticate." + + " Exiting refresh thread.", nextRefreshDate); + return; + } + if (isUsingTicketCache) { + String kinitArgs = "-R"; + int retry = 1; + while (retry >= 0) { + try { + log.debug("Running ticket cache refresh command: {} {}", kinitCmd, kinitArgs); + Shell.execCommand(kinitCmd, kinitArgs); + break; + } catch (Exception e) { + if (retry > 0) { + --retry; + // sleep for 10 seconds + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ie) { + log.error("Interrupted while renewing TGT, exiting Login thread"); + return; + } + } else { + log.warn("Could not renew TGT due to problem running shell command: '" + kinitCmd + + " " + kinitArgs + "'" + "; exception was: " + e + ". 
Exiting refresh thread.", e); + return; + } + } + } + } + try { + int retry = 1; + while (retry >= 0) { + try { + reLogin(); + break; + } catch (LoginException le) { + if (retry > 0) { + --retry; + // sleep for 10 seconds. + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + log.error("Interrupted during login retry after LoginException:", le); + throw le; + } + } else { + log.error("Could not refresh TGT for principal: " + principal + ".", le); + } + } + } + } catch (LoginException le) { + log.error("Failed to refresh TGT: refresh thread exiting now.", le); + return; + } + } + } + }, false); + } + + public void startThreadIfNeeded() { + // thread object 't' will be null if a refresh thread is not needed. + if (t != null) { + t.start(); + } + } + + public void shutdown() { + if ((t != null) && (t.isAlive())) { + t.interrupt(); + try { + t.join(); + } catch (InterruptedException e) { + log.warn("Error while waiting for Login thread to shutdown: " + e, e); + } + } + } + + public Subject subject() { + return subject; + } + + private synchronized LoginContext login(final String loginContextName) throws LoginException { + String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); + if (jaasConfigFile == null) { + throw new IllegalArgumentException("You must pass " + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + " in secure mode."); + } + + AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); + if (configEntries == null) { + // Forcing a reload of the configuration in case it's been overridden by third-party code. + // Without this, our tests fail sometimes depending on the order the tests are executed. + // Singletons are bad. 
+ Configuration.setConfiguration(null); + configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); + if (configEntries == null) { + String errorMessage = "Could not find a '" + loginContextName + "' entry in `" + jaasConfigFile + "`."; + throw new IllegalArgumentException(errorMessage); + } else { + log.info("Found `" + loginContextName + "` in JAAS configuration after forced reload."); + } + } + + LoginContext loginContext = new LoginContext(loginContextName, callbackHandler); + loginContext.login(); + log.info("Successfully logged in."); + return loginContext; + } + + private long getRefreshTime(KerberosTicket tgt) { + long start = tgt.getStartTime().getTime(); + long expires = tgt.getEndTime().getTime(); + log.info("TGT valid starting at: {}", tgt.getStartTime()); + log.info("TGT expires: {}", tgt.getEndTime()); + long proposedRefresh = start + (long) ((expires - start) * + (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble()))); + + if (proposedRefresh > expires) + // proposedRefresh is too far in the future: it's after ticket expires: simply return now. + return currentWallTime(); + else + return proposedRefresh; + } + + private synchronized KerberosTicket getTGT() { + Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class); + for (KerberosTicket ticket : tickets) { + KerberosPrincipal server = ticket.getServer(); + if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) { + log.debug("Found TGT {}.", ticket); + return ticket; + } + } + return null; + } + + private boolean hasSufficientTimeElapsed() { + long now = currentElapsedTime(); + if (now - lastLogin < minTimeBeforeRelogin) { + log.warn("Not attempting to re-login since the last re-login was attempted less than {} seconds before.", + minTimeBeforeRelogin / 1000); + return false; + } + return true; + } + + /** + * Re-login a principal. 
This method assumes that {@link #login(String)} has happened already. + * @throws javax.security.auth.login.LoginException on a failure + */ + private synchronized void reLogin() + throws LoginException { + if (!isKrbTicket) { + return; + } + if (login == null) { + throw new LoginException("Login must be done first"); + } + if (!hasSufficientTimeElapsed()) { + return; + } + log.info("Initiating logout for {}", principal); + synchronized (Login.class) { + // register most recent relogin attempt + lastLogin = currentElapsedTime(); + //clear up the kerberos state. But the tokens are not cleared! As per + //the Java kerberos login module code, only the kerberos credentials + //are cleared + login.logout(); + //login and also update the subject field of this instance to + //have the new credentials (pass it to the LoginContext constructor) + login = new LoginContext(loginContextName, subject); + log.info("Initiating re-login for {}", principal); + login.login(); + } + } + + private long currentElapsedTime() { + return time.nanoseconds() / 1000000; + } + + private long currentWallTime() { + return time.milliseconds(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java new file mode 100644 index 0000000..18651c8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.kerberos; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginException; +import java.io.IOException; +import java.util.EnumMap; +import java.util.Map; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.security.JaasUtils; + +public class LoginManager { + + private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap(LoginType.class); + + private final Login login; + private final String serviceName; + private final LoginType loginType; + private int refCount; + + private LoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException { + this.loginType = loginType; + String loginContext = loginType.contextName(); + login = new Login(loginContext, configs); + this.serviceName = getServiceName(loginContext, configs); + login.startThreadIfNeeded(); + } + + private static String getServiceName(String loginContext, Map<String, ?> configs) throws IOException { + String jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME); + String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME); + if (jaasServiceName != null && configServiceName != null && jaasServiceName != configServiceName) { + String message = "Conflicting serviceName values 
found in JAAS and Kafka configs " + + "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName; + throw new IllegalArgumentException(message); + } + + if (jaasServiceName != null) + return jaasServiceName; + if (configServiceName != null) + return configServiceName; + + throw new IllegalArgumentException("No serviceName defined in either JAAS or Kafka config"); + } + + /** + * Returns an instance of `LoginManager` and increases its reference count. + * + * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an + * existing `LoginManager` for the provided `mode` if available. However, it expects `configs` to be the same for + * every invocation and it will ignore them in the case where it's returning a cached instance of `LoginManager`. + * + * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and + * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more + * complicated to do the latter without making the consumer API more complex. + * + * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients + * (i.e. 
consumer and producer) + * @param configs configuration as key/value pairs + */ + public static final LoginManager acquireLoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException { + synchronized (LoginManager.class) { + LoginManager loginManager = CACHED_INSTANCES.get(loginType); + if (loginManager == null) { + loginManager = new LoginManager(loginType, configs); + CACHED_INSTANCES.put(loginType, loginManager); + } + return loginManager.acquire(); + } + } + + public Subject subject() { + return login.subject(); + } + + public String serviceName() { + return serviceName; + } + + private LoginManager acquire() { + ++refCount; + return this; + } + + /** + * Decrease the reference count for this instance and release resources if it reaches 0. + */ + public void release() { + synchronized (LoginManager.class) { + if (refCount == 0) + throw new IllegalStateException("release called on LoginManager with refCount == 0"); + else if (refCount == 1) { + CACHED_INSTANCES.remove(loginType); + login.shutdown(); + } + --refCount; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java new file mode 100644 index 0000000..6c2d267 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.common.security.kerberos;

import java.io.IOException;

/**
 * An {@link IOException} carrying only a detail message; the constructor is package-private,
 * so it can only be created from within this package.
 */
public class NoMatchingRule extends IOException {

    /**
     * @param msg detail message, passed unchanged to {@link IOException#IOException(String)}
     */
    NoMatchingRule(String msg) {
        super(msg);
    }
}

/*
 * Patch residue kept verbatim: a partial hunk for a different file (SSLFactory.java) whose
 * class body is not part of this excerpt.
 *
 * http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
 * ----------------------------------------------------------------------
 * diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
 * index b291409..163b8c6 100644
 * --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
 * +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
 * @@ -19,19 +19,19 @@ package org.apache.kafka.common.security.ssl;
 *  import org.apache.kafka.common.Configurable;
 *  import org.apache.kafka.common.KafkaException;
 *  import org.apache.kafka.common.config.SSLConfigs;
 * +import org.apache.kafka.common.network.Mode;
 *  import javax.net.ssl.*;
 *  import java.io.FileInputStream;
 *  import java.io.IOException;
 *  import java.security.GeneralSecurityException;
 *  import java.security.KeyStore;
 * +
 *  import java.util.List;
 *  import java.util.Map;
 * -
 *  public class SSLFactory implements Configurable {
 * -    public enum Mode { CLIENT, SERVER };
 *      private String protocol;
 *      private String provider;
 *      private String kmfAlgorithm;
 */
http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/utils/Shell.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java new file mode 100644 index 0000000..f5db5c3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kafka.common.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A base class for running a Unix command. + * + * <code>Shell</code> can be used to run unix commands like <code>du</code> or + * <code>df</code>. 
+ */ +abstract public class Shell { + + private static final Logger LOG = LoggerFactory.getLogger(Shell.class); + + /** Return an array containing the command name and its parameters */ + protected abstract String[] execString(); + + /** Parse the execution result */ + protected abstract void parseExecResult(BufferedReader lines) throws IOException; + + private final long timeout; + + private int exitCode; + private Process process; // sub process used to execute the command + + /* If or not script finished executing */ + private volatile AtomicBoolean completed; + + /** + * @param timeout Specifies the time in milliseconds, after which the command will be killed. -1 means no timeout. + */ + public Shell(long timeout) { + this.timeout = timeout; + } + + /** get the exit code + * @return the exit code of the process + */ + public int exitCode() { + return exitCode; + } + + /** get the current sub-process executing the given command + * @return process executing the command + */ + public Process process() { + return process; + } + + protected void run() throws IOException { + exitCode = 0; // reset for next run + runCommand(); + } + + /** Run a command */ + private void runCommand() throws IOException { + ProcessBuilder builder = new ProcessBuilder(execString()); + Timer timeoutTimer = null; + completed = new AtomicBoolean(false); + + process = builder.start(); + if (timeout > -1) { + timeoutTimer = new Timer(); + //One time scheduling. 
+ timeoutTimer.schedule(new ShellTimeoutTimerTask(this), timeout); + } + final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); + BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + final StringBuffer errMsg = new StringBuffer(); + + // read error and input streams as this would free up the buffers + // free the error stream buffer + Thread errThread = Utils.newThread("kafka-shell-thread", new Runnable() { + @Override + public void run() { + try { + String line = errReader.readLine(); + while ((line != null) && !Thread.currentThread().isInterrupted()) { + errMsg.append(line); + errMsg.append(System.getProperty("line.separator")); + line = errReader.readLine(); + } + } catch (IOException ioe) { + LOG.warn("Error reading the error stream", ioe); + } + } + }, false); + errThread.start(); + + try { + parseExecResult(inReader); // parse the output + // clear the input stream buffer + String line = null; + while (line != null) { + line = inReader.readLine(); + } + // wait for the process to finish and check the exit code + exitCode = process.waitFor(); + try { + // make sure that the error thread exits + errThread.join(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted while reading the error stream", ie); + } + completed.set(true); + //the timeout thread handling + //taken care in finally block + if (exitCode != 0) { + throw new ExitCodeException(exitCode, errMsg.toString()); + } + } catch (InterruptedException ie) { + throw new IOException(ie.toString()); + } finally { + if (timeoutTimer != null) + timeoutTimer.cancel(); + + // close the input stream + try { + inReader.close(); + } catch (IOException ioe) { + LOG.warn("Error while closing the input stream", ioe); + } + if (!completed.get()) + errThread.interrupt(); + + try { + errReader.close(); + } catch (IOException ioe) { + LOG.warn("Error while closing the error stream", ioe); + } + + process.destroy(); + } + } 
+ + + /** + * This is an IOException with exit code added. + */ + @SuppressWarnings("serial") + public static class ExitCodeException extends IOException { + int exitCode; + + public ExitCodeException(int exitCode, String message) { + super(message); + this.exitCode = exitCode; + } + + public int getExitCode() { + return exitCode; + } + } + + /** + * A simple shell command executor. + * + * <code>ShellCommandExecutor</code>should be used in cases where the output + * of the command needs no explicit parsing and where the command, working + * directory and the environment remains unchanged. The output of the command + * is stored as-is and is expected to be small. + */ + public static class ShellCommandExecutor extends Shell { + + private final String[] command; + private StringBuffer output; + + /** + * Create a new instance of the ShellCommandExecutor to execute a command. + * + * @param execString The command to execute with arguments + * @param timeout Specifies the time in milliseconds, after which the + * command will be killed. -1 means no timeout. + */ + + public ShellCommandExecutor(String[] execString, long timeout) { + super(timeout); + command = execString.clone(); + } + + + /** Execute the shell command. */ + public void execute() throws IOException { + this.run(); + } + + protected String[] execString() { + return command; + } + + protected void parseExecResult(BufferedReader reader) throws IOException { + output = new StringBuffer(); + char[] buf = new char[512]; + int nRead; + while ((nRead = reader.read(buf, 0, buf.length)) > 0) { + output.append(buf, 0, nRead); + } + } + + /** Get the output of the shell command.*/ + public String output() { + return (output == null) ? "" : output.toString(); + } + + /** + * Returns the commands of this instance. + * Arguments with spaces in are presented with quotes round; other + * arguments are presented raw + * + * @return a string representation of the object. 
+ */ + public String toString() { + StringBuilder builder = new StringBuilder(); + String[] args = execString(); + for (String s : args) { + if (s.indexOf(' ') >= 0) { + builder.append('"').append(s).append('"'); + } else { + builder.append(s); + } + builder.append(' '); + } + return builder.toString(); + } + } + + /** + * Static method to execute a shell command. + * Covers most of the simple cases without requiring the user to implement + * the <code>Shell</code> interface. + * @param cmd shell command to execute. + * @return the output of the executed command. + */ + public static String execCommand(String ... cmd) throws IOException { + return execCommand(cmd, -1); + } + + /** + * Static method to execute a shell command. + * Covers most of the simple cases without requiring the user to implement + * the <code>Shell</code> interface. + * @param cmd shell command to execute. + * @param timeout time in milliseconds after which script should be killed. -1 means no timeout. + * @return the output of the executed command. + */ + public static String execCommand(String[] cmd, long timeout) throws IOException { + ShellCommandExecutor exec = new ShellCommandExecutor(cmd, timeout); + exec.execute(); + return exec.output(); + } + + /** + * Timer which is used to timeout scripts spawned off by shell. + */ + private static class ShellTimeoutTimerTask extends TimerTask { + + private final Shell shell; + + public ShellTimeoutTimerTask(Shell shell) { + this.shell = shell; + } + + @Override + public void run() { + Process p = shell.process(); + try { + p.exitValue(); + } catch (Exception e) { + //Process has not terminated. + //So check if it has completed + //if not just destroy it. 
+ if (p != null && !shell.completed.get()) { + p.destroy(); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/utils/Time.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Time.java b/clients/src/main/java/org/apache/kafka/common/utils/Time.java index 66c44de..b2fad7f 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Time.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Time.java @@ -24,16 +24,16 @@ public interface Time { /** * The current time in milliseconds */ - public long milliseconds(); + long milliseconds(); /** * The current time in nanoseconds */ - public long nanoseconds(); + long nanoseconds(); /** * Sleep for the given number of milliseconds */ - public void sleep(long ms); + void sleep(long ms); } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java index f13c21a..4a6d304 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java @@ -45,7 +45,7 @@ class EchoServer extends Thread { this.protocol = configs.containsKey("security.protocol") ? 
SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT; if (protocol == SecurityProtocol.SSL) { - this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER); + this.sslFactory = new SSLFactory(Mode.SERVER); this.sslFactory.configure(configs); SSLContext sslContext = this.sslFactory.sslContext(); this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0); http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java index c60053f..6475ff0 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java @@ -22,7 +22,6 @@ import java.net.InetSocketAddress; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.config.SSLConfigs; -import org.apache.kafka.common.security.ssl.SSLFactory; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestSSLUtils; import org.junit.After; @@ -40,15 +39,15 @@ public class SSLSelectorTest extends SelectorTest { public void setup() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); - Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server"); + Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server"); sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.server = new EchoServer(sslServerConfigs); this.server.start(); this.time = new MockTime(); - Map<String, Object> sslClientConfigs = 
TestSSLUtils.createSSLConfig(false, false, SSLFactory.Mode.SERVER, trustStoreFile, "client"); + Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client"); sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); - this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); + this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); this.metrics = new Metrics(); this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java index 6993f52..987f4bb 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java @@ -73,7 +73,7 @@ public class SSLTransportLayerTest { sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); - this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); + this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); } @@ -270,7 +270,7 @@ public class SSLTransportLayerTest { */ @Test public void testInvalidTruststorePassword() throws Exception { - SSLChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); + 
SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT); try { sslClientConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); @@ -285,7 +285,7 @@ public class SSLTransportLayerTest { */ @Test public void testInvalidKeystorePassword() throws Exception { - SSLChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); + SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT); try { sslClientConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); @@ -437,7 +437,7 @@ public class SSLTransportLayerTest { private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) { - this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT) { + this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT) { @Override protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException { @@ -461,7 +461,7 @@ public class SSLTransportLayerTest { CertStores(boolean server) throws Exception { String name = server ? "server" : "client"; - SSLFactory.Mode mode = server ? SSLFactory.Mode.SERVER : SSLFactory.Mode.CLIENT; + Mode mode = server ? 
Mode.SERVER : Mode.CLIENT; File truststoreFile = File.createTempFile(name + "TS", ".jks"); sslConfig = TestSSLUtils.createSSLConfig(!server, true, mode, truststoreFile, name); sslConfig.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); @@ -549,7 +549,7 @@ public class SSLTransportLayerTest { private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>(); public SSLEchoServer(Map<String, ?> configs, String serverHost) throws Exception { - this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER); + this.sslFactory = new SSLFactory(Mode.SERVER); this.sslFactory.configure(configs); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -557,7 +557,7 @@ public class SSLTransportLayerTest { this.port = serverSocketChannel.socket().getLocalPort(); this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>()); - SSLChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.SERVER); + SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.SERVER); channelBuilder.configure(sslServerConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder); setName("echoserver"); http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java new file mode 100644 index 0000000..9781f6d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java @@ -0,0 +1,59 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.kerberos; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class KerberosNameTest { + + @Test + public void testParse() throws IOException { + List<String> rules = new ArrayList<>(Arrays.asList( + "RULE:[1:$1](App\\..*)s/App\\.(.*)/$1/g", + "RULE:[2:$1](App\\..*)s/App\\.(.*)/$1/g", + "DEFAULT" + )); + KerberosNameParser parser = new KerberosNameParser("REALM.COM", rules); + + KerberosName name = parser.parse("App.service-name/example.com@REALM.COM"); + assertEquals("App.service-name", name.serviceName()); + assertEquals("example.com", name.hostName()); + assertEquals("REALM.COM", name.realm()); + assertEquals("service-name", name.shortName()); + + name = parser.parse("App.service-name@REALM.COM"); + assertEquals("App.service-name", name.serviceName()); + assertNull(name.hostName()); + assertEquals("REALM.COM", name.realm()); + assertEquals("service-name", name.shortName()); + + name = parser.parse("user/host@REALM.COM"); + assertEquals("user",
name.serviceName()); + assertEquals("host", name.hostName()); + assertEquals("REALM.COM", name.realm()); + assertEquals("user", name.shortName()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java index 0aec666..e90ec2b 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java @@ -18,6 +18,7 @@ import java.io.File; import java.util.Map; import org.apache.kafka.test.TestSSLUtils; +import org.apache.kafka.common.network.Mode; import org.junit.Test; import static org.junit.Assert.assertNotNull; @@ -35,8 +36,8 @@ public class SSLFactoryTest { @Test public void testSSLFactoryConfiguration() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); - Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server"); - SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER); + Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server"); + SSLFactory sslFactory = new SSLFactory(Mode.SERVER); sslFactory.configure(serverSSLConfig); //host and port are hints SSLEngine engine = sslFactory.createSSLEngine("localhost", 0); @@ -49,8 +50,8 @@ public class SSLFactoryTest { @Test public void testClientMode() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); - Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.CLIENT, trustStoreFile, "client"); - SSLFactory sslFactory = new 
SSLFactory(SSLFactory.Mode.CLIENT); + Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.CLIENT, trustStoreFile, "client"); + SSLFactory sslFactory = new SSLFactory(Mode.CLIENT); sslFactory.configure(clientSSLConfig); //host and port are hints SSLEngine engine = sslFactory.createSSLEngine("localhost", 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java index eb7fcf0..387e48f 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java index c01cf37..b231692 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java @@ -18,7 +18,7 @@ package org.apache.kafka.test; import org.apache.kafka.common.config.SSLConfigs; -import org.apache.kafka.common.security.ssl.SSLFactory; +import org.apache.kafka.common.network.Mode; import org.apache.kafka.clients.CommonClientConfigs; import java.io.File; @@ -177,13 +177,13 @@ public class TestSSLUtils { return certs; } - public static Map<String, Object> createSSLConfig(SSLFactory.Mode mode, File keyStoreFile, String password, String keyPassword, + public static Map<String, Object> createSSLConfig(Mode mode, File keyStoreFile, String password, String keyPassword, File trustStoreFile, String trustStorePassword) { Map<String, Object> sslConfigs = new HashMap<String, Object>(); sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext - if (mode == SSLFactory.Mode.SERVER || (mode == SSLFactory.Mode.CLIENT && keyStoreFile != null)) { + if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) { sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath()); sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); @@ -203,13 +203,13 @@ public class TestSSLUtils { return sslConfigs; } - public static Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, SSLFactory.Mode mode, 
File trustStoreFile, String certAlias) + public static Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias) throws IOException, GeneralSecurityException { Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>(); File keyStoreFile; String password; - if (mode == SSLFactory.Mode.SERVER) + if (mode == Mode.SERVER) password = "ServerPassword"; else password = "ClientPassword"; http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index c2076a2..3756822 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -21,10 +21,9 @@ import kafka.utils._ import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient} import org.apache.kafka.common.{TopicPartition, Node} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{Selectable, ChannelBuilders, Selector, NetworkReceive} +import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode} import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys} import org.apache.kafka.common.requests._ -import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.utils.Time import collection.mutable.HashMap import kafka.cluster.Broker @@ -97,7 +96,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf "controller-channel", Map("broker-id" -> broker.id.toString).asJava, false, - ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, 
config.channelConfigs) + ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.channelConfigs) ) new NetworkClient( selector, http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 41a3705..1066fbe 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -33,9 +33,8 @@ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{Selector => KSelector, ChannelBuilders, InvalidReceiveException} +import org.apache.kafka.common.network.{Selector => KSelector, LoginType, Mode, ChannelBuilders, InvalidReceiveException} import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException import org.apache.kafka.common.utils.{Time, Utils} @@ -378,7 +377,7 @@ private[kafka] class Processor(val id: Int, private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() private val inflightResponses = mutable.Map[String, RequestChannel.Response]() - private val channelBuilder = ChannelBuilders.create(protocol, SSLFactory.Mode.SERVER, channelConfigs) + private val channelBuilder = ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs) private val metricTags = new util.HashMap[String, String]() metricTags.put("networkProcessor", id.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/server/KafkaConfig.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 913d49b..194ee9c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,6 +26,8 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs + import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol @@ -33,7 +35,6 @@ import org.apache.kafka.common.security.auth.PrincipalBuilder import scala.collection.{Map, immutable} - object Defaults { /** ********* Zookeeper Configuration ***********/ val ZkSessionTimeoutMs = 6000 @@ -168,6 +169,14 @@ object Defaults { val SSLClientAuthNone = "none" val SSLClientAuth = SSLClientAuthNone val SSLCipherSuites = "" + + /** ********* Sasl configuration ***********/ + val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD + val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR + val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER + val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN + val AuthToLocal = SaslConfigs.DEFAULT_AUTH_TO_LOCAL + } object KafkaConfig { @@ -316,6 +325,13 @@ object KafkaConfig { val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG + /** ********* SASL Configuration ****************/ + val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME + val SaslKerberosKinitCmdProp = 
SaslConfigs.SASL_KERBEROS_KINIT_CMD + val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR + val SaslKerberosTicketRenewJitterProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER + val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN + val AuthToLocalProp = SaslConfigs.AUTH_TO_LOCAL /* Documentation */ /** ********* Zookeeper Configuration ***********/ @@ -487,6 +503,14 @@ object KafkaConfig { val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC + /** ********* Sasl Configuration ****************/ + val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC + val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC + val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC + val SaslKerberosTicketRenewJitterDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC + val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC + val AuthToLocalDoc = SaslConfigs.AUTH_TO_LOCAL_DOC + private val configDef = { import ConfigDef.Importance._ import ConfigDef.Range._ @@ -642,6 +666,15 @@ object KafkaConfig { .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc) .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc) .define(SSLCipherSuitesProp, LIST, Defaults.SSLCipherSuites, MEDIUM, SSLCipherSuitesDoc) + + /** ********* Sasl Configuration ****************/ + .define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false) + .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc) + 
.define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc) + .define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc) + .define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc) + .define(AuthToLocalProp, LIST, Defaults.AuthToLocal, MEDIUM, AuthToLocalDoc) + } def configNames() = { @@ -802,6 +835,14 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp) val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp) + /** ********* Sasl Configuration **************/ + val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp) + val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp) + val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp) + val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp) + val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp) + val authToLocal = getList(KafkaConfig.AuthToLocalProp) + /** ********* Quota Configuration **************/ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) @@ -823,7 +864,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val millis: java.lang.Long = Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse( Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match { - case Some(mins) => millisInMinute * mins + case Some(mins) => millisInMinute * mins case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour }) 
@@ -927,20 +968,30 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka def channelConfigs: java.util.Map[String, Object] = { val channelConfigs = new java.util.HashMap[String, Object]() import kafka.server.KafkaConfig._ - channelConfigs.put(PrincipalBuilderClassProp, Class.forName(principalBuilderClass)) - channelConfigs.put(SSLProtocolProp, sslProtocol) - channelConfigs.put(SSLEnabledProtocolsProp, sslEnabledProtocols) - channelConfigs.put(SSLKeystoreTypeProp, sslKeystoreType) - channelConfigs.put(SSLKeystoreLocationProp, sslKeystoreLocation) - channelConfigs.put(SSLKeystorePasswordProp, sslKeystorePassword) - channelConfigs.put(SSLKeyPasswordProp, sslKeyPassword) - channelConfigs.put(SSLTruststoreTypeProp, sslTruststoreType) - channelConfigs.put(SSLTruststoreLocationProp, sslTruststoreLocation) - channelConfigs.put(SSLTruststorePasswordProp, sslTruststorePassword) - channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm) - channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm) - channelConfigs.put(SSLClientAuthProp, sslClientAuth) - channelConfigs.put(SSLCipherSuitesProp, sslCipher) + Seq( + (PrincipalBuilderClassProp, Class.forName(principalBuilderClass)), + (SSLProtocolProp, sslProtocol), + (SSLEnabledProtocolsProp, sslEnabledProtocols), + (SSLKeystoreTypeProp, sslKeystoreType), + (SSLKeystoreLocationProp, sslKeystoreLocation), + (SSLKeystorePasswordProp, sslKeystorePassword), + (SSLKeyPasswordProp, sslKeyPassword), + (SSLTruststoreTypeProp, sslTruststoreType), + (SSLTruststoreLocationProp, sslTruststoreLocation), + (SSLTruststorePasswordProp, sslTruststorePassword), + (SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm), + (SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm), + (SSLClientAuthProp, sslClientAuth), + (SSLCipherSuitesProp, sslCipher), + (SaslKerberosServiceNameProp, saslKerberosServiceName), + (SaslKerberosKinitCmdProp, saslKerberosKinitCmd), + 
(SaslKerberosTicketRenewWindowFactorProp, saslKerberosTicketRenewWindowFactor), + (SaslKerberosTicketRenewJitterProp, saslKerberosTicketRenewJitter), + (SaslKerberosMinTimeBeforeReloginProp, saslKerberosMinTimeBeforeRelogin), + (AuthToLocalProp, authToLocal) + ).foreach { case (key, value) => + if (value != null) channelConfigs.put(key, value) + } channelConfigs } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f50c266..510957b 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -21,7 +21,7 @@ import java.net.{SocketTimeoutException} import java.util import kafka.admin._ -import kafka.api.{KAFKA_090, ApiVersion} +import kafka.api.KAFKA_090 import kafka.log.LogConfig import kafka.log.CleanerConfig import kafka.log.LogManager @@ -34,17 +34,16 @@ import kafka.utils._ import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient} import org.apache.kafka.common.Node import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector} +import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode} import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend} import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.utils.AppInfoParser import scala.collection.mutable import scala.collection.JavaConverters._ -import org.I0Itec.zkclient.{ZkClient, 
ZkConnection} +import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} @@ -308,7 +307,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr "kafka-server-controlled-shutdown", Map.empty.asJava, false, - ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs) + ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.channelConfigs) ) new NetworkClient( selector, @@ -491,7 +490,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue) else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue) - + if (!shutdownSucceeded) warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed") http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 0a17fd0..5aa817d 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -27,7 +27,7 @@ import kafka.api.KAFKA_090 import kafka.common.{KafkaStorageException, TopicAndPartition} import ReplicaFetcherThread._ import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse} -import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector} +import org.apache.kafka.common.network.{LoginType, Selectable, 
ChannelBuilders, NetworkReceive, Selector, Mode} import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest} import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest} import org.apache.kafka.common.{Node, TopicPartition} @@ -74,7 +74,7 @@ class ReplicaFetcherThread(name: String, "replica-fetcher", Map("broker-id" -> sourceBroker.id.toString).asJava, false, - ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, brokerConfig.channelConfigs) + ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.channelConfigs) ) new NetworkClient( selector, http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/resources/kafka_jaas.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/kafka_jaas.conf b/core/src/test/resources/kafka_jaas.conf new file mode 100644 index 0000000..b097e26 --- /dev/null +++ b/core/src/test/resources/kafka_jaas.conf @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required debug=true + useKeyTab=true + storeKey=true + serviceName="kafka" + keyTab="$keytab-location" + principal="client@EXAMPLE.COM"; +}; + +KafkaServer { + com.sun.security.auth.module.Krb5LoginModule required debug=true + useKeyTab=true + storeKey=true + serviceName="kafka" + keyTab="$keytab-location" + principal="kafka/localhost@EXAMPLE.COM"; +};
