This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c16c2e712ccc98c74bed7368d44fedcd05a66584
Author: Jiale He <35652389+jial...@users.noreply.github.com>
AuthorDate: Fri Dec 9 17:18:10 2022 +0800

    KYLIN-5442 Optimized for loading Kafka Kerberos keyTab
---
 .../common/exception/code/ErrorCodeServer.java     |   3 +
 .../resources/kylin_error_msg_conf_cn.properties   |   3 +
 .../resources/kylin_error_msg_conf_en.properties   |   3 +
 .../kylin_error_suggestion_conf_cn.properties      |   5 +
 .../kylin_error_suggestion_conf_en.properties      |   6 +
 .../main/resources/kylin_errorcode_conf.properties |   4 +
 .../org/apache/kylin/kafka/util/KafkaUtils.java    |   5 +-
 .../streaming/constants/StreamingConstants.java    |   6 +
 .../kylin/streaming/jobs/StreamingJobListener.java |  20 +--
 .../kylin/streaming/jobs/StreamingJobUtils.java    | 145 ++++++++++++++++++---
 .../streaming/jobs/impl/StreamingJobLauncher.java  | 132 ++++++++++---------
 .../kylin/streaming/CreateStreamingFlatTable.scala |   3 -
 .../apache/kylin/kafka/util/KafkaUtilsTest.java    |   6 +-
 .../streaming/jobs/StreamingJobListenerTest.java   |  32 ++---
 .../streaming/jobs/StreamingJobUtilsTest.java      |  98 ++++++++++++--
 .../jobs/impl/StreamingJobLauncherTest.java        |  33 ++++-
 16 files changed, 368 insertions(+), 136 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
index 7b60e33ab2..b5eb0fd98c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
@@ -145,6 +145,9 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
 
     // 100352XX Streaming
     STREAMING_PARSE_MESSAGE_ERROR("KE-010035202"),
+    READ_KAFKA_JAAS_FILE_ERROR("KE-010035215"),
+    KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS("KE-010035216"),
+    KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS("KE-010035217"),
 
     // 100422XX CUSTOM PARSER
     CUSTOM_PARSER_NOT_JAR("KE-010042201"),
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index cf284ffba4..be8bca221a 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -136,6 +136,9 @@ KE-010043221=参数 “%s” 已存在。请检查后重试。
 
 ## Streaming
 KE-010035202=使用解析器 “%s” 解析Topic “%s” 的消息时发生异常,请检查后重试。
+KE-010035215=无法正确读取 Kafka 认证文件,请检查后再试。
+KE-010035216=Kafka 认证文件中的 keyTab 文件不存在,请检查后重试。
+KE-010035217=Kafka 认证文件中不存在 “KafkaClient”,请检查后重试。
 
 ## XX 100422 Custom Parser
 
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index f8a170e52e..b44c0ef422 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -133,6 +133,9 @@ KE-010043221=The parameter “%s” already exists. Please check and try again.
 
 ## Streaming
 KE-010035202=An exception occurred while parsing the messages of Topic "%2$s" with parser "%1$s". Please check and try again.
+KE-010035215=Can't read Kafka authentication file correctly. Please check and try again.
+KE-010035216=The keyTab file in the Kafka authentication file does not exist, please check and try again.
+KE-010035217="KafkaClient" does not exist in the Kafka authentication file, please check and try again.
 
 ## XX 100422 Custom Parser
 
 KE-010042201=The file format is invalid. Please use jar format file.
diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
index 10d5e8e0e4..059fa231e7 100644
--- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
@@ -128,6 +128,11 @@ KE-010043219=
 KE-010043220=
 KE-010043221=
 
+## Streaming
+KE-010035215=
+KE-010035216=
+KE-010035217=
+
 # System
 ## 400052XX password
 KE-040005201=
diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
index 1b5ffcc1c2..05ce078ac1 100644
--- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
@@ -128,6 +128,12 @@ KE-010043219=
 KE-010043220=
 KE-010043221=
 
+
+## Streaming
+KE-010035215=
+KE-010035216=
+KE-010035217=
+
 # System
 ## 400052XX password
 KE-040005201=
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index a80a9c19a9..074fb6550e 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -139,6 +139,10 @@ KE-010043221
 
 ## Streaming
 KE-010035202
+KE-010035215
+KE-010035216
+KE-010035217
+
 ## Custom Parser
 KE-010042201
 
diff --git a/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java b/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
index a0cd5905c4..26bb14d07d 100644
--- a/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
+++ b/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
@@ -67,8 +67,7 @@ public class KafkaUtils {
         return getKafkaConsumer(brokers, groupId, new Properties());
     }
 
-    public static Consumer<String, ByteBuffer> getKafkaConsumer(String brokers, String groupId,
-            Properties properties) {
+    public static Consumer<String, ByteBuffer> getKafkaConsumer(String brokers, String groupId, Properties properties) {
         Properties props = getConsumerProperties(brokers, groupId, properties);
         if (mockup != null) {
             return mockup;
@@ -106,7 +105,7 @@ public class KafkaUtils {
         props.putAll(KylinConfig.getInstanceFromEnv().getStreamingKafkaConfigOverride());
         synchronized (kafkaJaasTextPair) {
             if (Boolean.FALSE.equals(kafkaJaasTextPair.getFirst())) {
-                kafkaJaasTextPair.setSecond(StreamingJobUtils.extractKafkaSaslJaasConf());
+                kafkaJaasTextPair.setSecond(StreamingJobUtils.extractKafkaJaasConf(true));
                 kafkaJaasTextPair.setFirst(true);
             }
         }
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java b/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
index 1181f22c01..479c496e69 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
@@ -21,6 +21,9 @@ package org.apache.kylin.streaming.constants;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.metadata.HDFSMetadataStore;
 
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
 public class StreamingConstants {
 
     // spark job conf
@@ -38,6 +41,9 @@ public class StreamingConstants {
     public static final String SPARK_DRIVER_OVERHEAD = "spark.driver.memoryOverhead";
     public static final String SPARK_DRIVER_OVERHEAD_DEFAULT = "1g";
 
+    public static final String SPARK_KERBEROS_KEYTAB = "spark.kerberos.keytab";
+    public static final String SPARK_KERBEROS_PRINCIPAL = "spark.kerberos.principal";
+
     public static final String SPARK_YARN_DIST_JARS = "spark.yarn.dist.jars";
     public static final String SPARK_DRIVER_OPTS = "spark.driver.extraJavaOptions";
     public static final String SPARK_EXECUTOR_OPTS = "spark.executor.extraJavaOptions";
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
index c0b295154b..d2070f7987 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
@@ -86,40 +86,32 @@ public class StreamingJobListener implements SparkAppHandle.Listener {
     }
 
     private boolean isFailed(SparkAppHandle.State state) {
-        if (SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state
-                || SparkAppHandle.State.LOST == state) {
-            return true;
-        }
-        return false;
+        return SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state
+                || SparkAppHandle.State.LOST == state;
     }
 
     private boolean isFinished(SparkAppHandle.State state) {
-        if (SparkAppHandle.State.FINISHED == state) {
-            return true;
-        }
-        return false;
+        return SparkAppHandle.State.FINISHED == state;
     }
 
     @Override
     public void infoChanged(SparkAppHandle handler) {
-
+        // just Override
     }
 
     @Subscribe
     public void onStreamingJobKill(StreamingJobKillEvent streamingJobKillEvent) {
-        val project = streamingJobKillEvent.getProject();
         val modelId = streamingJobKillEvent.getModelId();
-        StreamingScheduler scheduler = StreamingScheduler.getInstance(project);
+        StreamingScheduler scheduler = StreamingScheduler.getInstance(streamingJobKillEvent.getProject());
         scheduler.killJob(modelId, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.STOPPED);
         scheduler.killJob(modelId, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.STOPPED);
     }
 
     @Subscribe
     public void onStreamingJobDrop(StreamingJobDropEvent streamingJobDropEvent) {
-        val project = streamingJobDropEvent.getProject();
         val modelId = streamingJobDropEvent.getModelId();
         val config = KylinConfig.getInstanceFromEnv();
-        val mgr = StreamingJobManager.getInstance(config, project);
+        val mgr = StreamingJobManager.getInstance(config, streamingJobDropEvent.getProject());
         val buildJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_BUILD.toString());
         val mergeJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_MERGE.toString());
         mgr.deleteStreamingJob(buildJobId);
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
index f79dd9bec9..7b6e29bd25 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
@@ -18,47 +18,53 @@
 
 package org.apache.kylin.streaming.jobs;
 
-import static org.apache.kylin.common.exception.ServerErrorCode.READ_KAFKA_JAAS_FILE_ERROR;
+import static org.apache.commons.lang3.StringUtils.INDEX_NOT_FOUND;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR;
 import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_CONFIG_PREFIX;
 import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_KAFKA_CONFIG_PREFIX;
 import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_TABLE_REFRESH_INTERVAL;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.JaasContext;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
 
+import lombok.SneakyThrows;
 import lombok.val;
+import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
+@UtilityClass
 public class StreamingJobUtils {
-    private static final Logger logger = Logger.getLogger(StreamingJobUtils.class);
 
     /**
      * kylin.properties config -> model config -> job config
-     *
-     * @return
      */
     public static KylinConfig getStreamingKylinConfig(final KylinConfig originalConfig, Map<String, String> jobParams,
             String modelId, String project) {
         KylinConfigExt kylinConfigExt;
-        val dataflowId = modelId;
-        if (StringUtils.isNotBlank(dataflowId)) {
+        if (StringUtils.isNotBlank(modelId)) {
             val dataflowManager = NDataflowManager.getInstance(originalConfig, project);
-            kylinConfigExt = dataflowManager.getDataflow(dataflowId).getConfig();
+            kylinConfigExt = dataflowManager.getDataflow(modelId).getConfig();
         } else {
             val projectInstance = NProjectManager.getInstance(originalConfig).getProject(project);
             kylinConfigExt = projectInstance.getConfig();
@@ -89,22 +95,121 @@ public class StreamingJobUtils {
         return KylinConfigExt.createInstance(kylinConfigExt, streamingJobOverrides);
     }
 
-    public static String extractKafkaSaslJaasConf() {
+    public static String extractKafkaJaasConf(boolean useAbsKeyTabPath) {
         val kapConfig = KapConfig.getInstanceFromEnv();
         if (!kapConfig.isKafkaJaasEnabled()) {
             return null;
         }
-        File file = new File(kapConfig.getKafkaJaasConfPath());
+        String jaasOriginText = extractJaasText();
+        if (StringUtils.isEmpty(jaasOriginText)) {
+            return null;
+        }
+        String jaasTextRewrite = rewriteJaasConf(jaasOriginText);
+        return rewriteKeyTab(jaasTextRewrite, useAbsKeyTabPath);
+    }
+
+    /**
+     * extract keytab abs path in kafka jaas
+     */
+    public static String getJaasKeyTabAbsPath() {
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        if (!kapConfig.isKafkaJaasEnabled()) {
+            return null;
+        }
+        String jaasOriginText = extractJaasText();
+        if (StringUtils.isEmpty(jaasOriginText)) {
+            return null;
+        }
+        String jaasRewriteText = rewriteJaasConf(jaasOriginText);
+        String keyTabPath = getKeyTabPathFromJaas(jaasRewriteText);
+        if (StringUtils.isEmpty(keyTabPath)) {
+            return null;
+        }
+        return FileUtils.getFile(keyTabPath).getAbsolutePath();
+    }
+
+    @SneakyThrows
+    public static void createExecutorJaas() {
+        // extract origin kafka jaas file, rewrite keytab path if exists
+        // write it into {KYLIN_HOME}/hadoop_conf
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        if (!kapConfig.isKafkaJaasEnabled()) {
+            return;
+        }
+        String jaasConf = extractKafkaJaasConf(false);
+        if (StringUtils.isEmpty(jaasConf)) {
+            return;
+        }
+        String jaasResultText = "KafkaClient { " + jaasConf + " };";
+        String jaasPath = getExecutorJaasPath();
+        File executorJaasConfFile = FileUtils.getFile(jaasPath);
+        FileUtils.write(executorJaasConfFile, jaasResultText, StandardCharsets.UTF_8, false);
+        log.info("extract kafka jaas file to {}", jaasPath);
+    }
+
+    public static String getExecutorJaasPath() {
+        // {KYLIN_HOME}/hadoop_conf/kafka_jaas.conf
+        return HadoopUtil.getHadoopConfDir() + File.separator + getExecutorJaasName();
+    }
+
+    public static String getExecutorJaasName() {
+        return KapConfig.getInstanceFromEnv().getKafkaJaasConf();
+    }
+
+    /**
+     * read kafka jaas conf
+     */
+    private static String extractJaasText() {
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        File jaasFile = new File(kapConfig.getKafkaJaasConfPath());
+        String jaasOriginText;
         try {
-            val text = FileUtils.readFileToString(file);
-            int kafkaClientIdx = text.indexOf("KafkaClient");
-            if (StringUtils.isNotEmpty(text) && kafkaClientIdx != -1) {
-                return text.substring(text.indexOf("{") + 1, text.indexOf("}")).trim();
-            }
-        } catch (Exception e) {
-            logger.error("read kafka jaas file error ", e);
+            jaasOriginText = FileUtils.readFileToString(jaasFile, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new KylinException(READ_KAFKA_JAAS_FILE_ERROR, e);
+        }
+        if (StringUtils.indexOf(jaasOriginText, "KafkaClient") == INDEX_NOT_FOUND) {
+            throw new KylinException(KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS);
+        }
+        return jaasOriginText;
+    }
+
+    /**
+     * input:  KafkaClient { *****; };
+     * output: *****;
+     */
+    private static String rewriteJaasConf(String jaasText) {
+        int start = StringUtils.indexOf(jaasText, '{') + 1;
+        int end = StringUtils.indexOf(jaasText, '}');
+        return StringUtils.substring(jaasText, start, end).trim();
+    }
+
+    private static String rewriteKeyTab(String jaasText, boolean useAbsKeyTabPath) {
+        String keyTabPath = getKeyTabPathFromJaas(jaasText);
+        if (StringUtils.isEmpty(keyTabPath)) {
+            return jaasText;
+        }
+        File keyTabFile = FileUtils.getFile(keyTabPath);
+        String replacement = keyTabFile.getName();
+        if (useAbsKeyTabPath) {
+            replacement = keyTabFile.getAbsolutePath();
+        }
+        log.info("kafka jaas replace {} -> {}", keyTabPath, replacement);
+        return StringUtils.replace(jaasText, keyTabPath, replacement);
+    }
+
+    public static String getKeyTabPathFromJaas(String jaasStr) {
+        Map<String, Password> map = Maps.newHashMap();
+        map.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasStr));
+        val configEntry = JaasContext.loadClientContext(map).configurationEntries().get(0);
+        String keyTabPath = (String) configEntry.getOptions().getOrDefault("keyTab", null);
+        if (StringUtils.isEmpty(keyTabPath)) {
+            return null;
+        }
+        if (!FileUtils.getFile(keyTabPath).exists()) {
+            throw new KylinException(KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS);
         }
-        throw new KylinException(READ_KAFKA_JAAS_FILE_ERROR, MsgPicker.getMsg().getReadKafkaJaasFileError());
+        return keyTabPath;
     }
 }
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
index 499e7c92c3..d748f0b4fb 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
@@ -34,6 +34,8 @@ import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXEC
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_MEM;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_MEM_DEFAULT;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_OPTS;
+import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_KERBEROS_KEYTAB;
+import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_KERBEROS_PRINCIPAL;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_MASTER;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_MASTER_DEFAULT;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_SHUFFLE_PARTITIONS;
@@ -64,7 +66,7 @@ import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
@@ -108,6 +110,7 @@ import lombok.extern.slf4j.Slf4j;
 public class StreamingJobLauncher extends AbstractSparkJobLauncher {
     private static final String KRB5CONF_PROPS = "java.security.krb5.conf";
     private static final String JAASCONF_PROPS = "java.security.auth.login.config";
+    private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";
     private Map<String, String> jobParams;
     private String mainClazz;
     private String[] appArgs;
@@ -138,6 +141,8 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
                     jobParams.getOrDefault(STREAMING_DURATION, STREAMING_DURATION_DEFAULT),
                     jobParams.getOrDefault(STREAMING_WATERMARK, STREAMING_WATERMARK_DEFAULT),
                     distMetaStorageUrl.toString() };
+            // build job extract and create kafka jaas file
+            StreamingJobUtils.createExecutorJaas();
             break;
         }
         case STREAMING_MERGE: {
@@ -267,32 +272,6 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
         }
     }
 
-    private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
-        val driverJavaOptsConfigStr = sparkConf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
-
-        Preconditions.checkNotNull(driverJavaOptsConfigStr, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS + " is empty");
-        StringBuilder driverJavaOptionsSB = new StringBuilder(driverJavaOptsConfigStr);
-        val kapConfig = KapConfig.getInstanceFromEnv();
-        if (kapConfig.isKerberosEnabled() && !driverJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
-            val krb5conf = " -Djava.security.krb5.conf=" + kapConfig.getKerberosKrb5ConfPath();
-            driverJavaOptionsSB.append(krb5conf);
-        }
-        if (kapConfig.isKafkaJaasEnabled() && !driverJavaOptsConfigStr.contains(JAASCONF_PROPS)) {
-            val jaasConf = " -Djava.security.auth.login.config=" + kapConfig.getKafkaJaasConfPath();
-            driverJavaOptionsSB.append(jaasConf);
-        }
-        driverJavaOptionsSB.append(javaPropertyFormatter(REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
-        driverJavaOptionsSB.append(javaPropertyFormatter("kylin.hdfs.working.dir", config.getHdfsWorkingDirectory()));
-        driverJavaOptionsSB
-                .append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
-        driverJavaOptionsSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
-
-        final String driverLog4jXmlFile = config.getLogSparkStreamingDriverPropertiesFile();
-        generateLog4jConfiguration(false, driverJavaOptionsSB, driverLog4jXmlFile);
-
-        return driverJavaOptionsSB.toString();
-    }
-
     private void generateLog4jConfiguration(boolean isExecutor, StringBuilder log4jJavaOptionsSB, String log4jXmlFile) {
         String log4jConfigStr = "file:" + log4jXmlFile;
 
@@ -304,47 +283,74 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
         log4jJavaOptionsSB.append(javaPropertyFormatter("log4j.configurationFile", log4jConfigStr));
     }
 
-    private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
-        val executorJavaOptsConfigStr = sparkConf.get(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS);
+    private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
+        val existOptStr = sparkConf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
+        Preconditions.checkNotNull(existOptStr, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS + " is empty");
+
+        StringBuilder driverJavaOptSB = new StringBuilder(existOptStr);
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        rewriteKrb5Conf(driverJavaOptSB, existOptStr, kapConfig.getKerberosKrb5ConfPath());
+        // client driver kafka_jaas use local file
+        // cluster driver use remote kafka_jaas file
+        rewriteKafkaJaasConf(driverJavaOptSB, existOptStr, kapConfig.getKafkaJaasConfPath());
+        driverJavaOptSB.append(javaPropertyFormatter(REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
+        driverJavaOptSB.append(javaPropertyFormatter("kylin.hdfs.working.dir", config.getHdfsWorkingDirectory()));
+        driverJavaOptSB.append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
+        driverJavaOptSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
 
-        Preconditions.checkNotNull(executorJavaOptsConfigStr, SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS + " is empty");
+        final String driverLog4jXmlFile = config.getLogSparkStreamingDriverPropertiesFile();
+        generateLog4jConfiguration(false, driverJavaOptSB, driverLog4jXmlFile);
+        return driverJavaOptSB.toString();
+    }
+
+    private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
+        val existOptionsStr = sparkConf.get(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS);
+        Preconditions.checkNotNull(existOptionsStr, SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS + " is empty");
 
-        StringBuilder executorJavaOptionsSB = new StringBuilder(executorJavaOptsConfigStr);
         val kapConfig = KapConfig.getInstanceFromEnv();
-        if (kapConfig.isKerberosEnabled() && !executorJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
-            val krb5Conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/"
-                    + kapConfig.getKerberosKrb5Conf();
-            executorJavaOptionsSB.append(krb5Conf);
-        }
-        if (kapConfig.isKafkaJaasEnabled() && !executorJavaOptsConfigStr.contains(JAASCONF_PROPS)) {
-            val jaasConf = " -Djava.security.auth.login.config=./" + kapConfig.getKafkaJaasConf();
-            executorJavaOptionsSB.append(jaasConf);
-        }
-        executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.identifier", jobId));
-        executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.jobTimeStamp", currentTimestamp.toString()));
-        executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.project", project));
-        executorJavaOptionsSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
+        StringBuilder executorJavaOptSB = new StringBuilder(existOptionsStr);
+        rewriteKrb5Conf(executorJavaOptSB, existOptionsStr, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
+        // executor always use remote kafka_jaas file
+        rewriteKafkaJaasConf(executorJavaOptSB, existOptionsStr,
+                HADOOP_CONF_PATH + StreamingJobUtils.getExecutorJaasName());
+        executorJavaOptSB.append(javaPropertyFormatter("kap.spark.identifier", jobId));
+        executorJavaOptSB.append(javaPropertyFormatter("kap.spark.jobTimeStamp", currentTimestamp.toString()));
+        executorJavaOptSB.append(javaPropertyFormatter("kap.spark.project", project));
+        executorJavaOptSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
         if (StringUtils.isNotBlank(config.getMountSparkLogDir())) {
-            executorJavaOptionsSB.append(javaPropertyFormatter("job.mountDir", config.getMountSparkLogDir()));
+            executorJavaOptSB.append(javaPropertyFormatter("job.mountDir", config.getMountSparkLogDir()));
        }
 
        final String executorLog4jXmlFile = config.getLogSparkStreamingExecutorPropertiesFile();
-        generateLog4jConfiguration(true, executorJavaOptionsSB, executorLog4jXmlFile);
-
-        return executorJavaOptionsSB.toString();
+        generateLog4jConfiguration(true, executorJavaOptSB, executorLog4jXmlFile);
+        return executorJavaOptSB.toString();
     }
 
     private String wrapYarnAmJavaOptions(Map<String, String> sparkConf) {
-        val yarnAmJavaOptsConfigStr = sparkConf.getOrDefault(SPARK_YARN_AM_OPTS, "");
+        val existOptStr = sparkConf.getOrDefault(SPARK_YARN_AM_OPTS, "");
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        StringBuilder yarnAmJavaOptSB = new StringBuilder(existOptStr);
+        rewriteKrb5Conf(yarnAmJavaOptSB, existOptStr, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
+        return yarnAmJavaOptSB.toString();
+    }
+
+    private void rewriteKafkaJaasConf(StringBuilder sb, String existOptStr, String value) {
+        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
+        if (!kapConfig.isKafkaJaasEnabled() || !jobType.equals(JobTypeEnum.STREAMING_BUILD)
+                || existOptStr.contains(JAASCONF_PROPS)) {
+            return;
+        }
+        String jaasConf = javaPropertyFormatter(JAASCONF_PROPS, value);
+        sb.append(jaasConf);
+    }
 
-        StringBuilder yarnAmJavaOptionsSB = new StringBuilder(yarnAmJavaOptsConfigStr);
+    private void rewriteKrb5Conf(StringBuilder sb, String existConfStr, String value) {
         val kapConfig = KapConfig.getInstanceFromEnv();
-        if (kapConfig.isKerberosEnabled() && !yarnAmJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
-            val krb5Conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/"
-                    + kapConfig.getKerberosKrb5Conf();
-            yarnAmJavaOptionsSB.append(krb5Conf);
+        if (!kapConfig.isKerberosEnabled() || existConfStr.contains(KRB5CONF_PROPS)) {
+            return;
         }
-        return yarnAmJavaOptionsSB.toString();
+        val krb5Conf = javaPropertyFormatter(KRB5CONF_PROPS, value);
+        sb.append(krb5Conf);
     }
 
     private void addParserJar(SparkLauncher sparkLauncher) {
@@ -361,18 +367,22 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
         Map<String, String> sparkConf = getStreamingSparkConfig(config);
         sparkConf.forEach((key, value) -> launcher.setConf(key, value));
 
-        val numberOfExecutor = sparkConf.getOrDefault(SPARK_EXECUTOR_INSTANCES, SPARK_EXECUTOR_INSTANCES_DEFAULT);
-        val numberOfCore = sparkConf.getOrDefault(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_CORES_DEFAULT);
         val sparkLauncher = launcher.setAppName(jobId).setSparkHome(KylinConfig.getSparkHome());
         val kapConfig = KapConfig.getInstanceFromEnv();
         if (kapConfig.isKerberosEnabled()) {
-            sparkLauncher.setConf("spark.kerberos.keytab", kapConfig.getKerberosKeytabPath());
-            sparkLauncher.setConf("spark.kerberos.principal", kapConfig.getKerberosPrincipal());
+            sparkLauncher.setConf(SPARK_KERBEROS_KEYTAB, kapConfig.getKerberosKeytabPath());
+            sparkLauncher.setConf(SPARK_KERBEROS_PRINCIPAL, kapConfig.getKerberosPrincipal());
         }
-        if (kapConfig.isKafkaJaasEnabled()) {
-            sparkLauncher.addFile(kapConfig.getKafkaJaasConfPath());
+        if (kapConfig.isKafkaJaasEnabled() && jobType.equals(JobTypeEnum.STREAMING_BUILD)) {
+            String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
+            if (StringUtils.isNotEmpty(keyTabAbsPath)) {
+                // upload keytab in kafka jaas
+                sparkLauncher.addFile(keyTabAbsPath);
+            }
         }
         addParserJar(sparkLauncher);
+        val numberOfExecutor = sparkConf.getOrDefault(SPARK_EXECUTOR_INSTANCES, SPARK_EXECUTOR_INSTANCES_DEFAULT);
+        val numberOfCore = sparkConf.getOrDefault(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_CORES_DEFAULT);
         sparkLauncher.setMaster(sparkConf.getOrDefault(SPARK_MASTER, SPARK_MASTER_DEFAULT))
                 .setConf(SPARK_DRIVER_MEM, sparkConf.getOrDefault(SPARK_DRIVER_MEM, SPARK_DRIVER_MEM_DEFAULT))
                 .setConf(SPARK_EXECUTOR_INSTANCES, numberOfExecutor).setConf(SPARK_EXECUTOR_CORES, numberOfCore)
diff --git a/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala b/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
index 80653ed87c..13fb54a288 100644
--- a/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
+++ b/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
@@ -71,9 +71,6 @@ class CreateStreamingFlatTable(entry: CreateFlatTableEntry) extends
       kafkaJobParams.remove(SASL_MECHANISM);
       kafkaJobParams.put("kafka." + SASL_MECHANISM, saslMechanism.get)
     }
-    val text = StreamingJobUtils.extractKafkaSaslJaasConf
-    if (StringUtils.isNotEmpty(text)) kafkaJobParams.put(SaslConfigs.SASL_JAAS_CONFIG, text)
-
     kafkaJobParams.foreach { param =>
       param._1 match {
         case MAX_OFFSETS_PER_TRIGGER => if (param._2.toInt > 0) {
diff --git a/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java b/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
index 13514fca7e..8f86a53c31 100644
--- a/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.kafka.util;
 import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Properties;
 
@@ -109,8 +110,9 @@ public class KafkaUtilsTest extends StreamingTestCase {
         val kapConfig = KapConfig.getInstanceFromEnv();
 
         FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
-                "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}");
-        val text = StreamingJobUtils.extractKafkaSaslJaasConf();
+                "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;};",
+                StandardCharsets.UTF_8);
+        val text = StreamingJobUtils.extractKafkaJaasConf(true);
         Assert.assertNull(text);
         Pair<Boolean, String> kafkaJaasTextPair = (Pair<Boolean, String>) ReflectionUtils.getField(KafkaUtils.class,
                 "kafkaJaasTextPair");
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
index bb419f7510..5d0ce9de37 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
@@ -27,10 +27,10 @@ import java.util.Optional;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.metadata.cube.utils.StreamingUtils;
 import org.apache.kylin.streaming.event.StreamingJobDropEvent;
 import org.apache.kylin.streaming.event.StreamingJobKillEvent;
@@ -52,15 +52,15 @@ import lombok.var;
 
 public class StreamingJobListenerTest extends StreamingTestCase {
 
-    private static String PROJECT = "streaming_test";
-    private static String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
+    private static final String PROJECT = "streaming_test";
+    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
 
     @Rule
     public ExpectedException thrown = ExpectedException.none();
     @Rule
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
     @Rule
     public TestName testName = new TestName();
-    private StreamingJobListener eventListener = new StreamingJobListener();
+    private final StreamingJobListener eventListener = new StreamingJobListener();
 
     @Before
     public void setUp() throws Exception {
@@ -81,9 +81,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         val listener = new StreamingJobListener(PROJECT, jobId);
         val testConfig = getTestConfig();
         var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
-        mgr.updateStreamingJob(jobId, copyForWrite -> {
-            copyForWrite.setSkipListener(true);
-        });
+        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setSkipListener(true));
         listener.stateChanged(mockRunningState());
         var jobMeta = mgr.getStreamingJobByUuid(jobId);
         Assert.assertEquals(JobStatusEnum.RUNNING, jobMeta.getCurrentStatus());
@@ -95,9 +93,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         val jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
         val testConfig = getTestConfig();
         var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
-        mgr.updateStreamingJob(jobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
+        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
         val listener = new StreamingJobListener(PROJECT, jobId);
         listener.stateChanged(mockFailedState());
         var jobMeta = mgr.getStreamingJobByUuid(jobId);
@@ -128,9 +124,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         val jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
         val testConfig = getTestConfig();
         var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
-        mgr.updateStreamingJob(jobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
+        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
         val listener = new StreamingJobListener(PROJECT, jobId);
         listener.stateChanged(mockKilledState());
         var jobMeta = mgr.getStreamingJobByUuid(jobId);
@@ -172,12 +166,8 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         var mgr = StreamingJobManager.getInstance(config, project);
         val buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
         val mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
-        mgr.updateStreamingJob(buildJobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
-        mgr.updateStreamingJob(mergeJobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
+        mgr.updateStreamingJob(buildJobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
+        mgr.updateStreamingJob(mergeJobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
         var buildJobMeta = mgr.getStreamingJobByUuid(buildJobId);
         var mergeJobMeta = mgr.getStreamingJobByUuid(mergeJobId);
         Assert.assertEquals(JobStatusEnum.RUNNING, buildJobMeta.getCurrentStatus());
@@ -308,7 +298,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
 
             @Override
             public Optional<Throwable> getError() {
-                return null;
+                return Optional.empty();
             }
-        };
+        }
     }
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
index 6c27a82cc4..8a92cdc62b 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
@@ -17,27 +17,29 @@
  */
 
 package org.apache.kylin.streaming.jobs;
 
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR;
+
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.streaming.manager.StreamingJobManager;
 import org.apache.kylin.streaming.util.StreamingTestCase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import lombok.val;
 
 public class StreamingJobUtilsTest extends StreamingTestCase {
     private static final String PROJECT = "streaming_test";
     private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
-    @Rule
-    public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setUp() throws Exception {
@@ -88,24 +90,100 @@ public class StreamingJobUtilsTest extends StreamingTestCase {
     @Test
     public void testExtractKafkaSaslJaasConf() throws Exception {
         val kapConfig = KapConfig.getInstanceFromEnv();
-        Assert.assertNull(StreamingJobUtils.extractKafkaSaslJaasConf());
+        Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
         getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
         FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
-                "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}");
-        val text = StreamingJobUtils.extractKafkaSaslJaasConf();
+                "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+                StandardCharsets.UTF_8);
+        val text = StreamingJobUtils.extractKafkaJaasConf(true);
         Assert.assertNotNull(text);
 
         getTestConfig().setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
         File file = new File(kapConfig.getKafkaJaasConfPath());
-        FileUtils.write(file, "}4{");
+        FileUtils.write(file, "}4{", StandardCharsets.UTF_8);
         try {
-            StreamingJobUtils.extractKafkaSaslJaasConf();
+            StreamingJobUtils.extractKafkaJaasConf(true);
         } catch (Exception e) {
             Assert.assertTrue(e instanceof KylinException);
-            Assert.assertEquals("KE-010035015", ((KylinException) e).getErrorCode().getCodeString());
+            Assert.assertEquals("KE-010035217", ((KylinException) e).getErrorCode().getCodeString());
         } finally {
             FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
         }
     }
+
+    @Test
+    public void testCheckKeyTabFileUnderJaas() throws Exception {
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
+        Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
+        KylinConfig kylinConfig = getTestConfig();
+        kylinConfig.setProperty("kylin.kafka-jaas.enabled", "true");
+        File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
+
+        // jaas not exist
+        Assert.assertThrows(READ_KAFKA_JAAS_FILE_ERROR.getMsg(), KylinException.class,
+                () -> StreamingJobUtils.extractKafkaJaasConf(true));
+
+        // jaas keytab key not exist
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient { " + "com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true "
+                        + "storeKey=true " + "principal=\"ky...@dev.com\" " + "serviceName=\"kafka\";" + " };",
+                StandardCharsets.UTF_8);
+        Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
+
+        // jaas exist but keytab not exist
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient { " + "com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true "
+                        + "storeKey=true " + "keyTab=\"" + testKeyTab + "\" " + "principal=\"ky...@dev.com\" "
+                        + "serviceName=\"kafka\";" + " };",
+                StandardCharsets.UTF_8);
+        Assert.assertThrows(KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS.getMsg(), KylinException.class,
+                () -> StreamingJobUtils.extractKafkaJaasConf(true));
+
+        // all exist
+        FileUtils.write(testKeyTab, "test", StandardCharsets.UTF_8);
+        val text = StreamingJobUtils.extractKafkaJaasConf(true);
+        Assert.assertNotNull(text);
+        String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
+        Assert.assertEquals(testKeyTab.getAbsolutePath(), keyTabAbsPath);
+        String executorJaasName = StreamingJobUtils.getExecutorJaasName();
+        Assert.assertEquals(kapConfig.getKafkaJaasConf(), executorJaasName);
+        String executorJaasPath = StreamingJobUtils.getExecutorJaasPath();
+        Assert.assertEquals(HadoopUtil.getHadoopConfDir() + File.separator + executorJaasName, executorJaasPath);
+        kylinConfig.setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
+    }
+
+    @Test
+    public void testCreateExecutorJaas() throws Exception {
+        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
+        String executorJaasPath = HadoopUtil.getHadoopConfDir() + File.separator + kapConfig.getKafkaJaasConf();
+        File executorJaasFile = new File(executorJaasPath);
+        executorJaasFile.deleteOnExit();
+        StreamingJobUtils.createExecutorJaas();
+        Assert.assertFalse(executorJaasFile.exists());
+        getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
+
+        {
+            String jaasContext = "KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required; };";
+            FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()), jaasContext, StandardCharsets.UTF_8);
+            StreamingJobUtils.createExecutorJaas();
+            Assert.assertTrue(executorJaasFile.exists());
+            Assert.assertEquals(jaasContext, FileUtils.readFileToString(executorJaasFile, StandardCharsets.UTF_8));
+        }
+
+        {
+            File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
+            FileUtils.write(testKeyTab, "test", StandardCharsets.UTF_8);
+            String jaasContext = "KafkaClient { " + "com.sun.security.auth.module.Krb5LoginModule required "
+                    + "useKeyTab=true " + "storeKey=true " + "keyTab=\"" + testKeyTab.getAbsolutePath() + "\" "
+                    + "principal=\"ky...@dev.com\" " + "serviceName=\"kafka\";" + " };";
+            FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()), jaasContext, StandardCharsets.UTF_8);
+            StreamingJobUtils.createExecutorJaas();
+            Assert.assertTrue(executorJaasFile.exists());
+            Assert.assertEquals(jaasContext.replace(testKeyTab.getAbsolutePath(), testKeyTab.getName()),
+                    FileUtils.readFileToString(executorJaasFile, StandardCharsets.UTF_8));
+        }
+        executorJaasFile.deleteOnExit();
+    }
 }
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
index 43251502dd..11640895c5 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
@@ -22,6 +22,7 @@ import static org.apache.kylin.streaming.constants.StreamingConstants.DEFAULT_PA
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -186,6 +187,9 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
                 "-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf");
         val mockup = new MockupSparkLauncher();
         ReflectionUtils.setField(launcher, "launcher", mockup);
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+                StandardCharsets.UTF_8);
         launcher.startYarnJob();
         Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
         Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
@@ -205,14 +209,15 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
             val kapConfig = KapConfig.getInstanceFromEnv();
             config.setProperty("kylin.kafka-jaas.enabled", "true");
             FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
-                    "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}");
+                    "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}",
+                    StandardCharsets.UTF_8);
 
             val mockup = new MockupSparkLauncher();
             ReflectionUtils.setField(launcher, "launcher", mockup);
             launcher.startYarnJob();
             Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.keytab"));
             Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.principal"));
-            Assert.assertTrue(mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
+            Assert.assertFalse(mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
         } finally {
             FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
         }
@@ -495,6 +500,30 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
         Assert.assertTrue(mockup.jars.contains("default"));
     }
 
+    @Test
+    public void testStartYarnBuildJobWithoutExtraOpts() throws Exception {
+        val config = getTestConfig();
+        val modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
+
+        val launcher = new StreamingJobLauncher();
+        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
+        config.setProperty("kylin.kerberos.enabled", "true");
+        config.setProperty("kylin.tool.mount-spark-log-dir", ".");
+        val kapConfig = KapConfig.getInstanceFromEnv();
+
+        config.setProperty("kylin.kerberos.enabled", "true");
+        config.setProperty("kylin.kafka-jaas.enabled", "true");
+        val mockup = new MockupSparkLauncher();
+        ReflectionUtils.setField(launcher, "launcher", mockup);
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+                StandardCharsets.UTF_8);
+        launcher.startYarnJob();
+        Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
+        Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
+        Assert.assertNotNull(mockup.sparkConf.get("spark.yarn.am.extraJavaOptions"));
+    }
+
     static class MockupSparkLauncher extends SparkLauncher {
         private Map<String, String> sparkConf;
        private List<String> files;
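
The checks this commit introduces in StreamingJobUtils assume a JAAS file containing a "KafkaClient" section whose keyTab option, if present, names a file that actually exists. For reference, a well-formed kafka_jaas.conf of that shape looks like the following (keytab path and principal are illustrative placeholders, not values from this commit):

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka.keytab"
        principal="kylin@EXAMPLE.COM"
        serviceName="kafka";
    };

getKeyTabPathFromJaas() parses the entry body with Kafka's own JaasContext rather than ad-hoc string matching. Below is a minimal standalone sketch of that parsing step against the same kafka-clients dependency; the class name and sample values are made up for illustration:

    import java.util.HashMap;
    import java.util.Map;

    import javax.security.auth.login.AppConfigurationEntry;

    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.types.Password;
    import org.apache.kafka.common.security.JaasContext;

    public class JaasKeyTabProbe {
        public static void main(String[] args) {
            // The entry body as rewriteJaasConf() produces it: the text between
            // the braces of "KafkaClient { ... };", ending with a semicolon.
            String jaasEntry = "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useKeyTab=true storeKey=true "
                    + "keyTab=\"/etc/security/keytabs/kafka.keytab\" "
                    + "principal=\"kylin@EXAMPLE.COM\" serviceName=\"kafka\";";

            // Feed the entry to JaasContext the same way a Kafka client consumes
            // sasl.jaas.config, then read the "keyTab" option back out.
            Map<String, Password> conf = new HashMap<>();
            conf.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasEntry));
            AppConfigurationEntry entry = JaasContext.loadClientContext(conf).configurationEntries().get(0);
            System.out.println(entry.getOptions().get("keyTab"));
            // expected output: /etc/security/keytabs/kafka.keytab
        }
    }

This also explains the ordering of the new error codes: KE-010035217 fires before any parsing, when no "KafkaClient" marker is found in the file, while KE-010035216 fires only after a keyTab option was parsed but the file it names does not exist.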