This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c16c2e712ccc98c74bed7368d44fedcd05a66584
Author: Jiale He <35652389+jial...@users.noreply.github.com>
AuthorDate: Fri Dec 9 17:18:10 2022 +0800

    KYLIN-5442 Optimize loading of the Kafka Kerberos keyTab
---
 .../common/exception/code/ErrorCodeServer.java     |   3 +
 .../resources/kylin_error_msg_conf_cn.properties   |   3 +
 .../resources/kylin_error_msg_conf_en.properties   |   3 +
 .../kylin_error_suggestion_conf_cn.properties      |   5 +
 .../kylin_error_suggestion_conf_en.properties      |   6 +
 .../main/resources/kylin_errorcode_conf.properties |   4 +
 .../org/apache/kylin/kafka/util/KafkaUtils.java    |   5 +-
 .../streaming/constants/StreamingConstants.java    |   6 +
 .../kylin/streaming/jobs/StreamingJobListener.java |  20 +--
 .../kylin/streaming/jobs/StreamingJobUtils.java    | 145 ++++++++++++++++++---
 .../streaming/jobs/impl/StreamingJobLauncher.java  | 132 ++++++++++---------
 .../kylin/streaming/CreateStreamingFlatTable.scala |   3 -
 .../apache/kylin/kafka/util/KafkaUtilsTest.java    |   6 +-
 .../streaming/jobs/StreamingJobListenerTest.java   |  32 ++---
 .../streaming/jobs/StreamingJobUtilsTest.java      |  98 ++++++++++++--
 .../jobs/impl/StreamingJobLauncherTest.java        |  33 ++++-
 16 files changed, 368 insertions(+), 136 deletions(-)
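
At a glance: the patch moves all Kafka JAAS handling into StreamingJobUtils. The JAAS
file is read once, only the body of its KafkaClient { ... } block is kept, and the
keyTab path inside that body is rewritten (absolute path for the driver, bare file
name for executors, which receive the keytab via sparkLauncher.addFile). A minimal
sketch of the extraction step, assuming a made-up /tmp/kafka_jaas.conf and plain JDK
file I/O instead of the commons-io calls the patch uses:

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class JaasBodyDemo {
        public static void main(String[] args) throws Exception {
            // Hypothetical path; the real code resolves it via KapConfig.getKafkaJaasConfPath().
            byte[] raw = Files.readAllBytes(Paths.get("/tmp/kafka_jaas.conf"));
            String text = new String(raw, StandardCharsets.UTF_8);
            // input:  KafkaClient { com.sun.security...Krb5LoginModule required ...; };
            // output: com.sun.security...Krb5LoginModule required ...;
            String body = text.substring(text.indexOf('{') + 1, text.indexOf('}')).trim();
            System.out.println(body);
        }
    }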

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
index 7b60e33ab2..b5eb0fd98c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
@@ -145,6 +145,9 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
 
     // 100352XX Streaming
     STREAMING_PARSE_MESSAGE_ERROR("KE-010035202"),
+    READ_KAFKA_JAAS_FILE_ERROR("KE-010035215"),
+    KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS("KE-010035216"),
+    KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS("KE-010035217"),
 
     // 100422XX CUSTOM PARSER
     CUSTOM_PARSER_NOT_JAR("KE-010042201"),
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index cf284ffba4..be8bca221a 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -136,6 +136,9 @@ KE-010043221=参数 “%s” 已存在。请检查后重试。
 
 ## Streaming
 KE-010035202=使用解析器 “%s” 解析Topic “%s” 的消息时发生异常,请检查后重试。
+KE-010035215=无法正确读取 Kafka 认证文件,请检查后再试。
+KE-010035216=Kafka 认证文件中的 keyTab 文件不存在,请检查后重试。
+KE-010035217=Kafka 认证文件中不存在 “KafkaClient”,请检查后重试。
 
 
 ## XX 100422 Custom Parser
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index f8a170e52e..b44c0ef422 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -133,6 +133,9 @@ KE-010043221=The parameter “%s” already exists. Please check and try again.
 
 ## Streaming
 KE-010035202=An exception occurred while parsing the messages of Topic "%2$s" with parser "%1$s". Please check and try again.
+KE-010035215=Can't read Kafka authentication file correctly. Please check and try again.
+KE-010035216=The keyTab file in the Kafka authentication file does not exist, please check and try again.
+KE-010035217="KafkaClient" does not exist in the Kafka authentication file, please check and try again.
 
 ## XX 100422 Custom Parser
 KE-010042201=The file format is invalid. Please use jar format file.
diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
index 10d5e8e0e4..059fa231e7 100644
--- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
@@ -128,6 +128,11 @@ KE-010043219=
 KE-010043220=
 KE-010043221=
 
+## Streaming
+KE-010035215=
+KE-010035216=
+KE-010035217=
+
 # System
 ## 400052XX password
 KE-040005201=
diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
index 1b5ffcc1c2..05ce078ac1 100644
--- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
@@ -128,6 +128,12 @@ KE-010043219=
 KE-010043220=
 KE-010043221=
 
+
+## Streaming
+KE-010035215=
+KE-010035216=
+KE-010035217=
+
 # System
 ## 400052XX password
 KE-040005201=
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index a80a9c19a9..074fb6550e 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -139,6 +139,10 @@ KE-010043221
 
 ## Streaming
 KE-010035202
+KE-010035215
+KE-010035216
+KE-010035217
+
 
 ## Custom Parser
 KE-010042201
diff --git a/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java b/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
index a0cd5905c4..26bb14d07d 100644
--- a/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
+++ b/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
@@ -67,8 +67,7 @@ public class KafkaUtils {
         return getKafkaConsumer(brokers, groupId, new Properties());
     }
 
-    public static Consumer<String, ByteBuffer> getKafkaConsumer(String brokers, String groupId,
-            Properties properties) {
+    public static Consumer<String, ByteBuffer> getKafkaConsumer(String brokers, String groupId, Properties properties) {
         Properties props = getConsumerProperties(brokers, groupId, properties);
         if (mockup != null) {
             return mockup;
@@ -106,7 +105,7 @@ public class KafkaUtils {
         props.putAll(KylinConfig.getInstanceFromEnv().getStreamingKafkaConfigOverride());
         synchronized (kafkaJaasTextPair) {
             if (Boolean.FALSE.equals(kafkaJaasTextPair.getFirst())) {
-                kafkaJaasTextPair.setSecond(StreamingJobUtils.extractKafkaSaslJaasConf());
+                kafkaJaasTextPair.setSecond(StreamingJobUtils.extractKafkaJaasConf(true));
                 kafkaJaasTextPair.setFirst(true);
             }
         }
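
Context for the pair guarded above: it caches the extracted JAAS text so the file is
parsed at most once per JVM (first = loaded flag, second = cached value). A stripped-down
sketch of that read-once pattern, with a hypothetical loadJaas() standing in for
StreamingJobUtils.extractKafkaJaasConf(true):

    public class JaasCacheDemo {
        private static Boolean loaded = Boolean.FALSE; // "first" in the real pair
        private static String jaasText = null;         // "second" in the real pair
        private static final Object LOCK = new Object();

        // Hypothetical stand-in for the real extractor.
        static String loadJaas() {
            return "com.sun.security.auth.module.Krb5LoginModule required;";
        }

        static String getJaas() {
            synchronized (LOCK) {
                if (Boolean.FALSE.equals(loaded)) {
                    jaasText = loadJaas(); // runs only on the first call
                    loaded = Boolean.TRUE;
                }
            }
            return jaasText;
        }

        public static void main(String[] args) {
            System.out.println(getJaas());
        }
    }
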
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java b/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
index 1181f22c01..479c496e69 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
@@ -21,6 +21,9 @@ package org.apache.kylin.streaming.constants;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.metadata.HDFSMetadataStore;
 
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
 public class StreamingConstants {
 
     // spark job conf
@@ -38,6 +41,9 @@ public class StreamingConstants {
     public static final String SPARK_DRIVER_OVERHEAD = "spark.driver.memoryOverhead";
     public static final String SPARK_DRIVER_OVERHEAD_DEFAULT = "1g";
 
+    public static final String SPARK_KERBEROS_KEYTAB = "spark.kerberos.keytab";
+    public static final String SPARK_KERBEROS_PRINCIPAL = "spark.kerberos.principal";
+
     public static final String SPARK_YARN_DIST_JARS = "spark.yarn.dist.jars";
     public static final String SPARK_DRIVER_OPTS = "spark.driver.extraJavaOptions";
     public static final String SPARK_EXECUTOR_OPTS = "spark.executor.extraJavaOptions";
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
index c0b295154b..d2070f7987 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
@@ -86,40 +86,32 @@ public class StreamingJobListener implements SparkAppHandle.Listener {
     }
 
     private boolean isFailed(SparkAppHandle.State state) {
-        if (SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state
-                || SparkAppHandle.State.LOST == state) {
-            return true;
-        }
-        return false;
+        return SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state
+                || SparkAppHandle.State.LOST == state;
     }
 
     private boolean isFinished(SparkAppHandle.State state) {
-        if (SparkAppHandle.State.FINISHED == state) {
-            return true;
-        }
-        return false;
+        return SparkAppHandle.State.FINISHED == state;
     }
 
     @Override
     public void infoChanged(SparkAppHandle handler) {
-
+        // no-op: required by the SparkAppHandle.Listener interface
     }
 
     @Subscribe
     public void onStreamingJobKill(StreamingJobKillEvent streamingJobKillEvent) {
-        val project = streamingJobKillEvent.getProject();
         val modelId = streamingJobKillEvent.getModelId();
-        StreamingScheduler scheduler = StreamingScheduler.getInstance(project);
+        StreamingScheduler scheduler = StreamingScheduler.getInstance(streamingJobKillEvent.getProject());
         scheduler.killJob(modelId, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.STOPPED);
         scheduler.killJob(modelId, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.STOPPED);
     }
 
     @Subscribe
     public void onStreamingJobDrop(StreamingJobDropEvent streamingJobDropEvent) {
-        val project = streamingJobDropEvent.getProject();
         val modelId = streamingJobDropEvent.getModelId();
         val config = KylinConfig.getInstanceFromEnv();
-        val mgr = StreamingJobManager.getInstance(config, project);
+        val mgr = StreamingJobManager.getInstance(config, streamingJobDropEvent.getProject());
         val buildJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_BUILD.toString());
         val mergeJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_MERGE.toString());
         mgr.deleteStreamingJob(buildJobId);
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
index f79dd9bec9..7b6e29bd25 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
@@ -18,47 +18,53 @@
 
 package org.apache.kylin.streaming.jobs;
 
-import static org.apache.kylin.common.exception.ServerErrorCode.READ_KAFKA_JAAS_FILE_ERROR;
+import static org.apache.commons.lang3.StringUtils.INDEX_NOT_FOUND;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR;
 import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_CONFIG_PREFIX;
 import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_KAFKA_CONFIG_PREFIX;
 import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_TABLE_REFRESH_INTERVAL;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.JaasContext;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
 
+import lombok.SneakyThrows;
 import lombok.val;
+import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
+@UtilityClass
 public class StreamingJobUtils {
-    private static final Logger logger = Logger.getLogger(StreamingJobUtils.class);
 
     /**
      * kylin.properties config -> model config -> job config
-     *
-     * @return
      */
     public static KylinConfig getStreamingKylinConfig(final KylinConfig originalConfig, Map<String, String> jobParams,
             String modelId, String project) {
         KylinConfigExt kylinConfigExt;
-        val dataflowId = modelId;
-        if (StringUtils.isNotBlank(dataflowId)) {
+        if (StringUtils.isNotBlank(modelId)) {
             val dataflowManager = NDataflowManager.getInstance(originalConfig, project);
-            kylinConfigExt = dataflowManager.getDataflow(dataflowId).getConfig();
+            kylinConfigExt = dataflowManager.getDataflow(modelId).getConfig();
         } else {
             val projectInstance = NProjectManager.getInstance(originalConfig).getProject(project);
             kylinConfigExt = projectInstance.getConfig();
@@ -89,22 +95,121 @@ public class StreamingJobUtils {
         return KylinConfigExt.createInstance(kylinConfigExt, streamingJobOverrides);
     }
 
-    public static String extractKafkaSaslJaasConf() {
+    public static String extractKafkaJaasConf(boolean useAbsKeyTabPath) {
         val kapConfig = KapConfig.getInstanceFromEnv();
         if (!kapConfig.isKafkaJaasEnabled()) {
             return null;
         }
-        File file = new File(kapConfig.getKafkaJaasConfPath());
+        String jaasOriginText = extractJaasText();
+        if (StringUtils.isEmpty(jaasOriginText)) {
+            return null;
+        }
+        String jaasTextRewrite = rewriteJaasConf(jaasOriginText);
+        return rewriteKeyTab(jaasTextRewrite, useAbsKeyTabPath);
+    }
+
+    /**
+     * extract keytab abs path in kafka jaas
+     */
+    public static String getJaasKeyTabAbsPath() {
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        if (!kapConfig.isKafkaJaasEnabled()) {
+            return null;
+        }
+        String jaasOriginText = extractJaasText();
+        if (StringUtils.isEmpty(jaasOriginText)) {
+            return null;
+        }
+        String jaasRewriteText = rewriteJaasConf(jaasOriginText);
+        String keyTabPath = getKeyTabPathFromJaas(jaasRewriteText);
+        if (StringUtils.isEmpty(keyTabPath)) {
+            return null;
+        }
+        return FileUtils.getFile(keyTabPath).getAbsolutePath();
+    }
+
+    @SneakyThrows
+    public static void createExecutorJaas() {
+        // extract origin kafka jaas file, rewrite keytab path if exists
+        // write it into {KYLIN_HOME}/hadoop_conf
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        if (!kapConfig.isKafkaJaasEnabled()) {
+            return;
+        }
+        String jaasConf = extractKafkaJaasConf(false);
+        if (StringUtils.isEmpty(jaasConf)) {
+            return;
+        }
+        String jaasResultText = "KafkaClient { " + jaasConf + " };";
+        String jaasPath = getExecutorJaasPath();
+        File executorJaasConfFile = FileUtils.getFile(jaasPath);
+        FileUtils.write(executorJaasConfFile, jaasResultText, StandardCharsets.UTF_8, false);
+        log.info("extract kafka jaas file to {}", jaasPath);
+    }
+
+    public static String getExecutorJaasPath() {
+        // {KYLIN_HOME}/hadoop_conf/kafka_jaas.conf
+        return HadoopUtil.getHadoopConfDir() + File.separator + getExecutorJaasName();
+    }
+
+    public static String getExecutorJaasName() {
+        return KapConfig.getInstanceFromEnv().getKafkaJaasConf();
+    }
+
+    /**
+     * read kafka jaas conf
+     */
+    private static String extractJaasText() {
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        File jaasFile = new File(kapConfig.getKafkaJaasConfPath());
+        String jaasOriginText;
         try {
-            val text = FileUtils.readFileToString(file);
-            int kafkaClientIdx = text.indexOf("KafkaClient");
-            if (StringUtils.isNotEmpty(text) && kafkaClientIdx != -1) {
-                return text.substring(text.indexOf("{") + 1, text.indexOf("}")).trim();
-            }
-        } catch (Exception e) {
-            logger.error("read kafka jaas file error ", e);
+            jaasOriginText = FileUtils.readFileToString(jaasFile, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new KylinException(READ_KAFKA_JAAS_FILE_ERROR, e);
+        }
+        if (StringUtils.indexOf(jaasOriginText, "KafkaClient") == INDEX_NOT_FOUND) {
+            throw new KylinException(KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS);
+        }
+        return jaasOriginText;
+    }
+
+    /**
+     * input:  KafkaClient { *****; };
+     * output: *****;
+     */
+    private static String rewriteJaasConf(String jaasText) {
+        int start = StringUtils.indexOf(jaasText, '{') + 1;
+        int end = StringUtils.indexOf(jaasText, '}');
+        return StringUtils.substring(jaasText, start, end).trim();
+    }
+
+    private static String rewriteKeyTab(String jaasText, boolean useAbsKeyTabPath) {
+        String keyTabPath = getKeyTabPathFromJaas(jaasText);
+        if (StringUtils.isEmpty(keyTabPath)) {
+            return jaasText;
+        }
+        File keyTabFile = FileUtils.getFile(keyTabPath);
+        String replacement = keyTabFile.getName();
+        if (useAbsKeyTabPath) {
+            replacement = keyTabFile.getAbsolutePath();
+        }
+        log.info("kafka jaas replace {} -> {}", keyTabPath, replacement);
+        return StringUtils.replace(jaasText, keyTabPath, replacement);
+    }
+
+    public static String getKeyTabPathFromJaas(String jaasStr) {
+        Map<String, Password> map = Maps.newHashMap();
+        map.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasStr));
+        val configEntry = JaasContext.loadClientContext(map).configurationEntries().get(0);
+        String keyTabPath = (String) configEntry.getOptions().getOrDefault("keyTab", null);
+        if (StringUtils.isEmpty(keyTabPath)) {
+            return null;
+        }
+        if (!FileUtils.getFile(keyTabPath).exists()) {
+            throw new KylinException(KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS);
         }
-        throw new KylinException(READ_KAFKA_JAAS_FILE_ERROR, MsgPicker.getMsg().getReadKafkaJaasFileError());
+        return keyTabPath;
     }
 
 }
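
The new getKeyTabPathFromJaas leans on Kafka's own JAAS parser rather than string
matching. A self-contained sketch of that call pattern, using a fabricated JAAS entry
body (the kafka-clients classes and methods are the same ones the patch imports):

    import java.util.HashMap;
    import java.util.Map;
    import javax.security.auth.login.AppConfigurationEntry;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.types.Password;
    import org.apache.kafka.common.security.JaasContext;

    public class KeyTabLookupDemo {
        public static void main(String[] args) {
            // Fabricated entry body, in the form getKeyTabPathFromJaas() receives.
            String jaas = "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useKeyTab=true keyTab=\"/tmp/test.keytab\" principal=\"demo@EXAMPLE.COM\";";
            Map<String, Password> conf = new HashMap<>();
            conf.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaas));
            AppConfigurationEntry entry = JaasContext.loadClientContext(conf).configurationEntries().get(0);
            // Kafka exposes the module options as a map; "keyTab" holds the path.
            System.out.println(entry.getOptions().get("keyTab")); // /tmp/test.keytab
        }
    }
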
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
index 499e7c92c3..d748f0b4fb 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
@@ -34,6 +34,8 @@ import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXEC
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_MEM;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_MEM_DEFAULT;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_OPTS;
+import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_KERBEROS_KEYTAB;
+import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_KERBEROS_PRINCIPAL;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_MASTER;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_MASTER_DEFAULT;
 import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_SHUFFLE_PARTITIONS;
@@ -64,7 +66,7 @@ import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
@@ -108,6 +110,7 @@ import lombok.extern.slf4j.Slf4j;
 public class StreamingJobLauncher extends AbstractSparkJobLauncher {
     private static final String KRB5CONF_PROPS = "java.security.krb5.conf";
     private static final String JAASCONF_PROPS = "java.security.auth.login.config";
+    private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";
     private Map<String, String> jobParams;
     private String mainClazz;
     private String[] appArgs;
@@ -138,6 +141,8 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
                     jobParams.getOrDefault(STREAMING_DURATION, STREAMING_DURATION_DEFAULT),
                     jobParams.getOrDefault(STREAMING_WATERMARK, STREAMING_WATERMARK_DEFAULT),
                     distMetaStorageUrl.toString() };
+            // build job extract and create kafka jaas file
+            StreamingJobUtils.createExecutorJaas();
             break;
         }
         case STREAMING_MERGE: {
@@ -267,32 +272,6 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
         }
     }
 
-    private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
-        val driverJavaOptsConfigStr = sparkConf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
-
-        Preconditions.checkNotNull(driverJavaOptsConfigStr, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS + " is empty");
-        StringBuilder driverJavaOptionsSB = new StringBuilder(driverJavaOptsConfigStr);
-        val kapConfig = KapConfig.getInstanceFromEnv();
-        if (kapConfig.isKerberosEnabled() && !driverJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
-            val krb5conf = " -Djava.security.krb5.conf=" + kapConfig.getKerberosKrb5ConfPath();
-            driverJavaOptionsSB.append(krb5conf);
-        }
-        if (kapConfig.isKafkaJaasEnabled() && !driverJavaOptsConfigStr.contains(JAASCONF_PROPS)) {
-            val jaasConf = " -Djava.security.auth.login.config=" + kapConfig.getKafkaJaasConfPath();
-            driverJavaOptionsSB.append(jaasConf);
-        }
-        driverJavaOptionsSB.append(javaPropertyFormatter(REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
-        driverJavaOptionsSB.append(javaPropertyFormatter("kylin.hdfs.working.dir", config.getHdfsWorkingDirectory()));
-        driverJavaOptionsSB
-                .append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
-        driverJavaOptionsSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
-
-        final String driverLog4jXmlFile = config.getLogSparkStreamingDriverPropertiesFile();
-        generateLog4jConfiguration(false, driverJavaOptionsSB, driverLog4jXmlFile);
-
-        return driverJavaOptionsSB.toString();
-    }
-
     private void generateLog4jConfiguration(boolean isExecutor, StringBuilder log4jJavaOptionsSB, String log4jXmlFile) {
         String log4jConfigStr = "file:" + log4jXmlFile;
 
@@ -304,47 +283,74 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
         log4jJavaOptionsSB.append(javaPropertyFormatter("log4j.configurationFile", log4jConfigStr));
     }
 
-    private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
-        val executorJavaOptsConfigStr = sparkConf.get(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS);
+    private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
+        val existOptStr = sparkConf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
+        Preconditions.checkNotNull(existOptStr, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS + " is empty");
+
+        StringBuilder driverJavaOptSB = new StringBuilder(existOptStr);
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        rewriteKrb5Conf(driverJavaOptSB, existOptStr, kapConfig.getKerberosKrb5ConfPath());
+        // client driver kafka_jaas use local file
+        // cluster driver use remote kafka_jaas file
+        rewriteKafkaJaasConf(driverJavaOptSB, existOptStr, kapConfig.getKafkaJaasConfPath());
+        driverJavaOptSB.append(javaPropertyFormatter(REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
+        driverJavaOptSB.append(javaPropertyFormatter("kylin.hdfs.working.dir", config.getHdfsWorkingDirectory()));
+        driverJavaOptSB.append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
+        driverJavaOptSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
 
-        Preconditions.checkNotNull(executorJavaOptsConfigStr, SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS + " is empty");
+        final String driverLog4jXmlFile = config.getLogSparkStreamingDriverPropertiesFile();
+        generateLog4jConfiguration(false, driverJavaOptSB, driverLog4jXmlFile);
+        return driverJavaOptSB.toString();
+    }
+
+    private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
+        val existOptionsStr = sparkConf.get(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS);
+        Preconditions.checkNotNull(existOptionsStr, SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS + " is empty");
 
-        StringBuilder executorJavaOptionsSB = new StringBuilder(executorJavaOptsConfigStr);
         val kapConfig = KapConfig.getInstanceFromEnv();
-        if (kapConfig.isKerberosEnabled() && !executorJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
-            val krb5Conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/"
-                    + kapConfig.getKerberosKrb5Conf();
-            executorJavaOptionsSB.append(krb5Conf);
-        }
-        if (kapConfig.isKafkaJaasEnabled() && !executorJavaOptsConfigStr.contains(JAASCONF_PROPS)) {
-            val jaasConf = " -Djava.security.auth.login.config=./" + kapConfig.getKafkaJaasConf();
-            executorJavaOptionsSB.append(jaasConf);
-        }
-        executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.identifier", jobId));
-        executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.jobTimeStamp", currentTimestamp.toString()));
-        executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.project", project));
-        executorJavaOptionsSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
+        StringBuilder executorJavaOptSB = new StringBuilder(existOptionsStr);
+        rewriteKrb5Conf(executorJavaOptSB, existOptionsStr, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
+        // executor always use remote kafka_jaas file
+        rewriteKafkaJaasConf(executorJavaOptSB, existOptionsStr,
+                HADOOP_CONF_PATH + StreamingJobUtils.getExecutorJaasName());
+        executorJavaOptSB.append(javaPropertyFormatter("kap.spark.identifier", jobId));
+        executorJavaOptSB.append(javaPropertyFormatter("kap.spark.jobTimeStamp", currentTimestamp.toString()));
+        executorJavaOptSB.append(javaPropertyFormatter("kap.spark.project", project));
+        executorJavaOptSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
         if (StringUtils.isNotBlank(config.getMountSparkLogDir())) {
-            executorJavaOptionsSB.append(javaPropertyFormatter("job.mountDir", config.getMountSparkLogDir()));
+            executorJavaOptSB.append(javaPropertyFormatter("job.mountDir", config.getMountSparkLogDir()));
         }
 
         final String executorLog4jXmlFile = config.getLogSparkStreamingExecutorPropertiesFile();
-        generateLog4jConfiguration(true, executorJavaOptionsSB, executorLog4jXmlFile);
-
-        return executorJavaOptionsSB.toString();
+        generateLog4jConfiguration(true, executorJavaOptSB, executorLog4jXmlFile);
+        return executorJavaOptSB.toString();
     }
 
     private String wrapYarnAmJavaOptions(Map<String, String> sparkConf) {
-        val yarnAmJavaOptsConfigStr = sparkConf.getOrDefault(SPARK_YARN_AM_OPTS, "");
+        val existOptStr = sparkConf.getOrDefault(SPARK_YARN_AM_OPTS, "");
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        StringBuilder yarnAmJavaOptSB = new StringBuilder(existOptStr);
+        rewriteKrb5Conf(yarnAmJavaOptSB, existOptStr, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
+        return yarnAmJavaOptSB.toString();
+    }
+
+    private void rewriteKafkaJaasConf(StringBuilder sb, String existOptStr, String value) {
+        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
+        if (!kapConfig.isKafkaJaasEnabled() || !jobType.equals(JobTypeEnum.STREAMING_BUILD)
+                || existOptStr.contains(JAASCONF_PROPS)) {
+            return;
+        }
+        String jaasConf = javaPropertyFormatter(JAASCONF_PROPS, value);
+        sb.append(jaasConf);
+    }
 
-        StringBuilder yarnAmJavaOptionsSB = new StringBuilder(yarnAmJavaOptsConfigStr);
+    private void rewriteKrb5Conf(StringBuilder sb, String existConfStr, String value) {
         val kapConfig = KapConfig.getInstanceFromEnv();
-        if (kapConfig.isKerberosEnabled() && !yarnAmJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
-            val krb5Conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/"
-                    + kapConfig.getKerberosKrb5Conf();
-            yarnAmJavaOptionsSB.append(krb5Conf);
+        if (!kapConfig.isKerberosEnabled() || existConfStr.contains(KRB5CONF_PROPS)) {
+            return;
         }
-        return yarnAmJavaOptionsSB.toString();
+        val krb5Conf = javaPropertyFormatter(KRB5CONF_PROPS, value);
+        sb.append(krb5Conf);
     }
 
     private void addParserJar(SparkLauncher sparkLauncher) {
@@ -361,18 +367,22 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
         Map<String, String> sparkConf = getStreamingSparkConfig(config);
         sparkConf.forEach((key, value) -> launcher.setConf(key, value));
 
-        val numberOfExecutor = sparkConf.getOrDefault(SPARK_EXECUTOR_INSTANCES, SPARK_EXECUTOR_INSTANCES_DEFAULT);
-        val numberOfCore = sparkConf.getOrDefault(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_CORES_DEFAULT);
         val sparkLauncher = launcher.setAppName(jobId).setSparkHome(KylinConfig.getSparkHome());
         val kapConfig = KapConfig.getInstanceFromEnv();
         if (kapConfig.isKerberosEnabled()) {
-            sparkLauncher.setConf("spark.kerberos.keytab", kapConfig.getKerberosKeytabPath());
-            sparkLauncher.setConf("spark.kerberos.principal", kapConfig.getKerberosPrincipal());
+            sparkLauncher.setConf(SPARK_KERBEROS_KEYTAB, kapConfig.getKerberosKeytabPath());
+            sparkLauncher.setConf(SPARK_KERBEROS_PRINCIPAL, kapConfig.getKerberosPrincipal());
         }
-        if (kapConfig.isKafkaJaasEnabled()) {
-            sparkLauncher.addFile(kapConfig.getKafkaJaasConfPath());
+        if (kapConfig.isKafkaJaasEnabled() && jobType.equals(JobTypeEnum.STREAMING_BUILD)) {
+            String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
+            if (StringUtils.isNotEmpty(keyTabAbsPath)) {
+                // upload keytab in kafka jaas
+                sparkLauncher.addFile(keyTabAbsPath);
+            }
         }
         addParserJar(sparkLauncher);
+        val numberOfExecutor = sparkConf.getOrDefault(SPARK_EXECUTOR_INSTANCES, SPARK_EXECUTOR_INSTANCES_DEFAULT);
+        val numberOfCore = sparkConf.getOrDefault(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_CORES_DEFAULT);
         sparkLauncher.setMaster(sparkConf.getOrDefault(SPARK_MASTER, SPARK_MASTER_DEFAULT))
                 .setConf(SPARK_DRIVER_MEM, sparkConf.getOrDefault(SPARK_DRIVER_MEM, SPARK_DRIVER_MEM_DEFAULT))
                 .setConf(SPARK_EXECUTOR_INSTANCES, numberOfExecutor).setConf(SPARK_EXECUTOR_CORES, numberOfCore)
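
The three wrap*JavaOptions methods above now share one rule: append a -D property only
when the caller has not already set it. A reduced sketch of that pattern (names here
are illustrative, not the launcher's actual fields):

    public class JavaOptAppendDemo {
        // Approximates the launcher's javaPropertyFormatter(...) helper.
        static String javaPropertyFormatter(String key, String value) {
            return " -D" + key + "=" + value;
        }

        // Append the property only if the existing option string lacks it.
        static void appendIfAbsent(StringBuilder sb, String existing, String key, String value) {
            if (!existing.contains(key)) {
                sb.append(javaPropertyFormatter(key, value));
            }
        }

        public static void main(String[] args) {
            String existing = "-Xmx2g";
            StringBuilder sb = new StringBuilder(existing);
            appendIfAbsent(sb, existing, "java.security.krb5.conf",
                    "./__spark_conf__/__hadoop_conf__/krb5.conf");
            System.out.println(sb); // -Xmx2g -Djava.security.krb5.conf=...
        }
    }
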
diff --git a/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala b/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
index 80653ed87c..13fb54a288 100644
--- a/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
+++ b/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
@@ -71,9 +71,6 @@ class CreateStreamingFlatTable(entry: CreateFlatTableEntry) extends
       kafkaJobParams.remove(SASL_MECHANISM);
       kafkaJobParams.put("kafka." + SASL_MECHANISM, saslMechanism.get)
     }
-    val text = StreamingJobUtils.extractKafkaSaslJaasConf
-    if (StringUtils.isNotEmpty(text)) kafkaJobParams.put(SaslConfigs.SASL_JAAS_CONFIG, text)
-
     kafkaJobParams.foreach { param =>
       param._1 match {
         case MAX_OFFSETS_PER_TRIGGER => if (param._2.toInt > 0) {
diff --git a/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java b/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
index 13514fca7e..8f86a53c31 100644
--- a/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.kafka.util;
 import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Properties;
 
@@ -109,8 +110,9 @@ public class KafkaUtilsTest extends StreamingTestCase {
         val kapConfig = KapConfig.getInstanceFromEnv();
 
         FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
-                "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required}");
-        val text = StreamingJobUtils.extractKafkaSaslJaasConf();
+                "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required;};",
+                StandardCharsets.UTF_8);
+        val text = StreamingJobUtils.extractKafkaJaasConf(true);
         Assert.assertNull(text);
         Pair<Boolean, String> kafkaJaasTextPair = (Pair<Boolean, String>) 
ReflectionUtils.getField(KafkaUtils.class,
                 "kafkaJaasTextPair");
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
index bb419f7510..5d0ce9de37 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
@@ -27,10 +27,10 @@ import java.util.Optional;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.metadata.cube.utils.StreamingUtils;
 import org.apache.kylin.streaming.event.StreamingJobDropEvent;
 import org.apache.kylin.streaming.event.StreamingJobKillEvent;
@@ -52,15 +52,15 @@ import lombok.var;
 
 public class StreamingJobListenerTest extends StreamingTestCase {
 
-    private static String PROJECT = "streaming_test";
-    private static String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
+    private static final String PROJECT = "streaming_test";
+    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
     @Rule
     public ExpectedException thrown = ExpectedException.none();
     @Rule
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
     @Rule
     public TestName testName = new TestName();
-    private StreamingJobListener eventListener = new StreamingJobListener();
+    private final StreamingJobListener eventListener = new StreamingJobListener();
 
     @Before
     public void setUp() throws Exception {
@@ -81,9 +81,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         val listener = new StreamingJobListener(PROJECT, jobId);
         val testConfig = getTestConfig();
         var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
-        mgr.updateStreamingJob(jobId, copyForWrite -> {
-            copyForWrite.setSkipListener(true);
-        });
+        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setSkipListener(true));
         listener.stateChanged(mockRunningState());
         var jobMeta = mgr.getStreamingJobByUuid(jobId);
         Assert.assertEquals(JobStatusEnum.RUNNING, jobMeta.getCurrentStatus());
@@ -95,9 +93,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         val jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
         val testConfig = getTestConfig();
         var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
-        mgr.updateStreamingJob(jobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
+        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
         val listener = new StreamingJobListener(PROJECT, jobId);
         listener.stateChanged(mockFailedState());
         var jobMeta = mgr.getStreamingJobByUuid(jobId);
@@ -128,9 +124,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         val jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
         val testConfig = getTestConfig();
         var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
-        mgr.updateStreamingJob(jobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
+        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
         val listener = new StreamingJobListener(PROJECT, jobId);
         listener.stateChanged(mockKilledState());
         var jobMeta = mgr.getStreamingJobByUuid(jobId);
@@ -172,12 +166,8 @@ public class StreamingJobListenerTest extends StreamingTestCase {
         var mgr = StreamingJobManager.getInstance(config, project);
         val buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
         val mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
-        mgr.updateStreamingJob(buildJobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
-        mgr.updateStreamingJob(mergeJobId, copyForWrite -> {
-            copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
-        });
+        mgr.updateStreamingJob(buildJobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
+        mgr.updateStreamingJob(mergeJobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
         var buildJobMeta = mgr.getStreamingJobByUuid(buildJobId);
         var mergeJobMeta = mgr.getStreamingJobByUuid(mergeJobId);
         Assert.assertEquals(JobStatusEnum.RUNNING, buildJobMeta.getCurrentStatus());
@@ -308,7 +298,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
 
         @Override
         public Optional<Throwable> getError() {
-            return null;
+            return Optional.empty();
         }
-    };
+    }
 }
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
index 6c27a82cc4..8a92cdc62b 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
@@ -17,27 +17,29 @@
  */
 package org.apache.kylin.streaming.jobs;
 
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR;
+
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.streaming.manager.StreamingJobManager;
 import org.apache.kylin.streaming.util.StreamingTestCase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import lombok.val;
 
 public class StreamingJobUtilsTest extends StreamingTestCase {
     private static final String PROJECT = "streaming_test";
     private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
-    @Rule
-    public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setUp() throws Exception {
@@ -88,24 +90,100 @@ public class StreamingJobUtilsTest extends StreamingTestCase {
     @Test
     public void testExtractKafkaSaslJaasConf() throws Exception {
         val kapConfig = KapConfig.getInstanceFromEnv();
-        Assert.assertNull(StreamingJobUtils.extractKafkaSaslJaasConf());
+        Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
         getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
         FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
-                "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required}");
-        val text = StreamingJobUtils.extractKafkaSaslJaasConf();
+                "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+                StandardCharsets.UTF_8);
+        val text = StreamingJobUtils.extractKafkaJaasConf(true);
         Assert.assertNotNull(text);
 
         getTestConfig().setProperty("kylin.kafka-jaas-conf", 
"kafka_err_jaas.conf");
         File file = new File(kapConfig.getKafkaJaasConfPath());
 
-        FileUtils.write(file, "}4{");
+        FileUtils.write(file, "}4{", StandardCharsets.UTF_8);
         try {
-            StreamingJobUtils.extractKafkaSaslJaasConf();
+            StreamingJobUtils.extractKafkaJaasConf(true);
         } catch (Exception e) {
             Assert.assertTrue(e instanceof KylinException);
-            Assert.assertEquals("KE-010035015", ((KylinException) 
e).getErrorCode().getCodeString());
+            Assert.assertEquals("KE-010035217", ((KylinException) 
e).getErrorCode().getCodeString());
         } finally {
             FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
         }
     }
+
+    @Test
+    public void testCheckKeyTabFileUnderJaas() throws Exception {
+        val kapConfig = KapConfig.getInstanceFromEnv();
+        Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
+        Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
+        KylinConfig kylinConfig = getTestConfig();
+        kylinConfig.setProperty("kylin.kafka-jaas.enabled", "true");
+        File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
+
+        // jaas not exist
+        Assert.assertThrows(READ_KAFKA_JAAS_FILE_ERROR.getMsg(), KylinException.class,
+                () -> StreamingJobUtils.extractKafkaJaasConf(true));
+
+        // jaas keytab key not exist
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient { " + 
"com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true "
+                        + "storeKey=true " + "principal=\"ky...@dev.com\" " + 
"serviceName=\"kafka\";" + " };",
+                StandardCharsets.UTF_8);
+        Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
+
+        // jaas exist but keytab not exist
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient { " + 
"com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true "
+                        + "storeKey=true " + "keyTab=\"" + testKeyTab + "\" " 
+ "principal=\"ky...@dev.com\" "
+                        + "serviceName=\"kafka\";" + " };",
+                StandardCharsets.UTF_8);
+        Assert.assertThrows(KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS.getMsg(), KylinException.class,
+                () -> StreamingJobUtils.extractKafkaJaasConf(true));
+
+        // all exist
+        FileUtils.write(testKeyTab, "test", StandardCharsets.UTF_8);
+        val text = StreamingJobUtils.extractKafkaJaasConf(true);
+        Assert.assertNotNull(text);
+        String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
+        Assert.assertEquals(testKeyTab.getAbsolutePath(), keyTabAbsPath);
+        String executorJaasName = StreamingJobUtils.getExecutorJaasName();
+        Assert.assertEquals(kapConfig.getKafkaJaasConf(), executorJaasName);
+        String executorJaasPath = StreamingJobUtils.getExecutorJaasPath();
+        Assert.assertEquals(HadoopUtil.getHadoopConfDir() + File.separator + executorJaasName, executorJaasPath);
+        kylinConfig.setProperty("kylin.kafka-jaas-conf", 
"kafka_err_jaas.conf");
+    }
+
+    @Test
+    public void testCreateExecutorJaas() throws Exception {
+        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
+        String executorJaasPath = HadoopUtil.getHadoopConfDir() + File.separator + kapConfig.getKafkaJaasConf();
+        File executorJaasFile = new File(executorJaasPath);
+        executorJaasFile.deleteOnExit();
+        StreamingJobUtils.createExecutorJaas();
+        Assert.assertFalse(executorJaasFile.exists());
+        getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
+
+        {
+            String jaasContext = "KafkaClient { 
org.apache.kafka.common.security.scram.ScramLoginModule required; };";
+            FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()), jaasContext, StandardCharsets.UTF_8);
+            StreamingJobUtils.createExecutorJaas();
+            Assert.assertTrue(executorJaasFile.exists());
+            Assert.assertEquals(jaasContext, FileUtils.readFileToString(executorJaasFile, StandardCharsets.UTF_8));
+        }
+
+        {
+            File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
+            FileUtils.write(testKeyTab, "test", StandardCharsets.UTF_8);
+            String jaasContext = "KafkaClient { " + 
"com.sun.security.auth.module.Krb5LoginModule required "
+                    + "useKeyTab=true " + "storeKey=true " + "keyTab=\"" + 
testKeyTab.getAbsolutePath() + "\" "
+                    + "principal=\"ky...@dev.com\" " + 
"serviceName=\"kafka\";" + " };";
+            FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()), jaasContext, StandardCharsets.UTF_8);
+            StreamingJobUtils.createExecutorJaas();
+            Assert.assertTrue(executorJaasFile.exists());
+            Assert.assertEquals(jaasContext.replace(testKeyTab.getAbsolutePath(), testKeyTab.getName()),
+                    FileUtils.readFileToString(executorJaasFile, StandardCharsets.UTF_8));
+        }
+        executorJaasFile.deleteOnExit();
+    }
 }
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
index 43251502dd..11640895c5 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
@@ -22,6 +22,7 @@ import static org.apache.kylin.streaming.constants.StreamingConstants.DEFAULT_PA
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -186,6 +187,9 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
                 "-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf");
         val mockup = new MockupSparkLauncher();
         ReflectionUtils.setField(launcher, "launcher", mockup);
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+                StandardCharsets.UTF_8);
         launcher.startYarnJob();
         Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
         Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
@@ -205,14 +209,15 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
             val kapConfig = KapConfig.getInstanceFromEnv();
             config.setProperty("kylin.kafka-jaas.enabled", "true");
             FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
-                    "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required}");
+                    "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required}",
+                    StandardCharsets.UTF_8);
 
             val mockup = new MockupSparkLauncher();
             ReflectionUtils.setField(launcher, "launcher", mockup);
             launcher.startYarnJob();
             Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.keytab"));
             Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.principal"));
-            Assert.assertTrue(mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
+            Assert.assertFalse(mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
         } finally {
             FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
         }
@@ -495,6 +500,30 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
         Assert.assertTrue(mockup.jars.contains("default"));
     }
 
+    @Test
+    public void testStartYarnBuildJobWithoutExtraOpts() throws Exception {
+        val config = getTestConfig();
+        val modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
+
+        val launcher = new StreamingJobLauncher();
+        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
+        config.setProperty("kylin.kerberos.enabled", "true");
+        config.setProperty("kylin.tool.mount-spark-log-dir", ".");
+        val kapConfig = KapConfig.getInstanceFromEnv();
+
+        config.setProperty("kylin.kerberos.enabled", "true");
+        config.setProperty("kylin.kafka-jaas.enabled", "true");
+        val mockup = new MockupSparkLauncher();
+        ReflectionUtils.setField(launcher, "launcher", mockup);
+        FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+                "KafkaClient{ 
org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+                StandardCharsets.UTF_8);
+        launcher.startYarnJob();
+        Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
+        Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
+        Assert.assertNotNull(mockup.sparkConf.get("spark.yarn.am.extraJavaOptions"));
+    }
+
     static class MockupSparkLauncher extends SparkLauncher {
         private Map<String, String> sparkConf;
         private List<String> files;
