This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c0b27873f7ff6cfcaa4d404214c41288bd81a4f4
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Thu Nov 19 10:24:18 2020 +0800

    KYLIN-4813 Add spark executor log4j

    KYLIN-4813 Minor fix
---
 build/conf/spark-executor-log4j.properties           |  46 +++++
 .../org/apache/kylin/common/KylinConfigBase.java     |   6 +
 .../src/main/resources/kylin-defaults.properties     |   2 +
 .../kylin/job/execution/AbstractExecutable.java      |   2 +-
 .../kylin/job/execution/ExecutableManager.java       |   1 -
 .../logging/SparkExecutorHdfsLogAppender.java        | 243 +++++++++++++++++++
 6 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
new file mode 100644
index 0000000..7cc5b04
--- /dev/null
+++ b/build/conf/spark-executor-log4j.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# It's called spark-executor-log4j.properties so that it won't distract users from the other more important log4j config file: kylin-server-log4j.properties
+# enable this by -Dlog4j.configuration=spark-executor-log4j.properties
+log4j.rootLogger=INFO,stderr,hdfs
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.stderr.target=System.err
+#Don't add line number (%L) as it's too costly!
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender
+
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
+log4j.appender.hdfs.category=${kylin.spark.category}
+log4j.appender.hdfs.identifier=${kylin.spark.identifier}
+log4j.appender.hdfs.jobName=${kylin.spark.jobName}
+log4j.appender.hdfs.project=${kylin.spark.project}
+
+log4j.appender.hdfs.rollingPeriod=5
+log4j.appender.hdfs.logQueueCapacity=5000
+#flushInterval is in milliseconds
+log4j.appender.hdfs.flushInterval=5000
+
+log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
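For context, the ${...} placeholders above are resolved by log4j 1.x against JVM system properties, which is why the spark.executor.extraJavaOptions entries added further down pass -Dkylin.hdfs.working.dir and friends to every executor JVM. A minimal, self-contained sketch of that substitution (the property values here are made up):

import org.apache.log4j.helpers.OptionConverter;

public class Log4jSubstitutionDemo {
    public static void main(String[] args) {
        // Simulates the -D flags that spark.executor.extraJavaOptions passes
        // to each executor JVM (values are illustrative).
        System.setProperty("kylin.hdfs.working.dir", "hdfs:///kylin/kylin_metadata");
        System.setProperty("kylin.spark.identifier", "application_1605751458_0001");

        // log4j 1.x resolves ${...} against system properties while parsing
        // spark-executor-log4j.properties, so the appender receives concrete
        // values for hdfsWorkingDir, identifier, and the rest.
        String resolved = OptionConverter.substVars(
                "${kylin.hdfs.working.dir}/${kylin.spark.identifier}", null);
        System.out.println(resolved);
        // hdfs:///kylin/kylin_metadata/application_1605751458_0001
    }
}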
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 081d23f..e2727fa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -260,6 +260,7 @@ public abstract class KylinConfigBase implements Serializable {
         this.properties = BCC.check(properties);
         setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
         setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
+        setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
     }
 
     private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -2829,6 +2830,11 @@ public abstract class KylinConfigBase implements Serializable {
         return getLogPropertyFile("spark-driver-log4j.properties");
     }
 
+    public String getLogSparkExecutorPropertiesFile() {
+        return getLogPropertyFile("spark-executor-log4j.properties");
+    }
+
+
     private String getLogPropertyFile(String filename) {
         if (isDevEnv()) {
             return Paths.get(getKylinHomeWithoutWarn(), "build", "conf").toString() + File.separator + filename;
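The dev-mode branch of getLogPropertyFile() visible in this hunk resolves the file under $KYLIN_HOME/build/conf. A standalone sketch of that resolution; note the non-dev branch is truncated out of the hunk above, so the else case here is an assumption:

import java.io.File;
import java.nio.file.Paths;

public class LogPropertyFileSketch {
    // Mirrors the visible dev branch of getLogPropertyFile(); the non-dev
    // fallback to $KYLIN_HOME/conf is an assumption, as that branch is not
    // shown in the hunk.
    static String resolve(String kylinHome, boolean devEnv, String filename) {
        if (devEnv) {
            return Paths.get(kylinHome, "build", "conf").toString() + File.separator + filename;
        }
        return Paths.get(kylinHome, "conf").toString() + File.separator + filename; // assumed
    }

    public static void main(String[] args) {
        System.out.println(resolve("/opt/kylin", true, "spark-executor-log4j.properties"));
        // /opt/kylin/build/conf/spark-executor-log4j.properties
    }
}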
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 3d8268f..8fba3bc 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,6 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -286,6 +287,7 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
 #kylin.query.spark-conf.spark.sql.shuffle.partitions=40
 #kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
 
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
 # uncomment for HDP
 #kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d815e5b..a2fe7e5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -596,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     public final String getProject() {
         if (project == null) {
-            logger.error("project is not set for abstract executable " + getId());
+            throw new IllegalStateException("project is not set for abstract executable " + getId());
         }
         return project;
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index c9ae8a4..590276d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -382,7 +382,6 @@ public class ExecutableManager {
 
     /**
      * get the last N_LINES lines from the end of hdfs file input stream;
-     * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
      *
      * @param hdfsDin
      * @param startPos
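The javadoc touched above describes fetching the last N_LINES of a job log from an HDFS input stream. As an illustration of the underlying idea (this is a sketch, not the ExecutableManager implementation), one can seek an FSDataInputStream to a start offset and read forward to the end of the file:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsTailSketch {
    // Reads from startPos to EOF; a real tail would first scan backwards to
    // find the offset just after the (N_LINES + 1)-th last newline and pass
    // that offset here.
    static String readFrom(Configuration conf, Path file, long startPos) throws IOException {
        FileSystem fs = file.getFileSystem(conf);
        StringBuilder sb = new StringBuilder();
        try (FSDataInputStream in = fs.open(file);
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8))) {
            in.seek(startPos); // FSDataInputStream is seekable
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append('\n');
            }
        }
        return sb.toString();
    }
}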
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsLogAppender.java
new file mode 100644
index 0000000..4a76022
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsLogAppender.java
@@ -0,0 +1,243 @@
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+
+public class SparkExecutorHdfsLogAppender extends AbstractHdfsLogAppender {
+
+    private static final long A_DAY_MILLIS = 24 * 60 * 60 * 1000L;
+    private static final long A_HOUR_MILLIS = 60 * 60 * 1000L;
+
+    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault());
+    private SimpleDateFormat hourFormat = new SimpleDateFormat("HH");
+
+    @VisibleForTesting
+    String outPutPath;
+    @VisibleForTesting
+    String executorId;
+
+    @VisibleForTesting
+    long startTime = 0;
+    @VisibleForTesting
+    boolean rollingByHour = false;
+    @VisibleForTesting
+    int rollingPeriod = 5;
+
+    //log appender configurable
+    private String metadataIdentifier;
+    private String category;
+
+    private String identifier;
+
+    // only cubing job
+    private String jobName;
+    private String project;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public void setIdentifier(String identifier) {
+        this.identifier = identifier;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setMetadataIdentifier(String metadataIdentifier) {
+        this.metadataIdentifier = metadataIdentifier;
+    }
+
+    public String getMetadataIdentifier() {
+        return metadataIdentifier;
+    }
+
+    @Override
+    void init() {
+        if (StringUtils.isBlank(this.identifier)) {
+            this.identifier = YarnSparkHadoopUtil.getContainerId().getApplicationAttemptId().getApplicationId()
+                    .toString();
+        }
+
+        LogLog.warn("metadataIdentifier -> " + getMetadataIdentifier());
+        LogLog.warn("category -> " + getCategory());
+        LogLog.warn("identifier -> " + getIdentifier());
+
+        if (null != getProject()) {
+            LogLog.warn("project -> " + getProject());
+        }
+
+        if (null != getJobName()) {
+            LogLog.warn("jobName -> " + getJobName());
+        }
+    }
+
+    @Override
+    String getAppenderName() {
+        return "SparkExecutorHdfsLogAppender";
+    }
+
+    @Override
+    boolean isSkipCheckAndFlushLog() {
+        if (SparkEnv.get() == null && StringUtils.isBlank(executorId)) {
+            LogLog.warn("Waiting for spark executor to start");
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LogLog.error("Waiting for spark executor starting is interrupted!", e);
+                Thread.currentThread().interrupt();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    void doWriteLog(int size, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+        while (size > 0) {
+            final LoggingEvent loggingEvent = getLogBufferQue().take();
+            if (isTimeChanged(loggingEvent)) {
+                updateOutPutDir(loggingEvent);
+
+                final Path file = new Path(outPutPath);
+
+                String sparkuser = System.getenv("SPARK_USER");
+                String user = System.getenv("USER");
+                LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
+                        + " USER is " + user);
+                if (!initHdfsWriter(file, new Configuration())) {
+                    LogLog.error("Failed to init the hdfs writer!");
+                }
+                doRollingClean(loggingEvent);
+            }
+
+            transaction.add(loggingEvent);
+            writeLogEvent(loggingEvent);
+            size--;
+        }
+    }
+
+    @VisibleForTesting
+    void updateOutPutDir(LoggingEvent event) {
+        if (rollingByHour) {
+            String rollingDir = dateFormat.format(new Date(event.getTimeStamp())) + "/"
+                    + hourFormat.format(new Date(event.getTimeStamp()));
+            outPutPath = getOutPutDir(rollingDir);
+        } else {
+            String rollingDir = dateFormat.format(new Date(event.getTimeStamp()));
+            outPutPath = getOutPutDir(rollingDir);
+        }
+    }
+
+    private String getOutPutDir(String rollingDir) {
+        if (StringUtils.isBlank(executorId)) {
+            executorId = SparkEnv.get() != null ? SparkEnv.get().executorId() : UUID.randomUUID().toString();
+            LogLog.warn("executorId set to " + executorId);
+        }
+
+        if ("job".equals(getCategory())) {
+            return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + getJobName() + "/"
+                    + "executor-" + executorId + ".log";
+        }
+        return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + "executor-" + executorId + ".log";
+    }
+
+    @VisibleForTesting
+    void doRollingClean(LoggingEvent event) throws IOException {
+        FileSystem fileSystem = getFileSystem();
+
+        String rootPathName = getRootPathName();
+        Path rootPath = new Path(rootPathName);
+
+        if (!fileSystem.exists(rootPath))
+            return;
+
+        FileStatus[] logFolders = fileSystem.listStatus(rootPath);
+
+        if (logFolders == null)
+            return;
+
+        String thresholdDay = dateFormat.format(new Date(event.getTimeStamp() - A_DAY_MILLIS * rollingPeriod));
+
+        for (FileStatus fs : logFolders) {
+            String fileName = fs.getPath().getName();
+            if (fileName.compareTo(thresholdDay) < 0) {
+                Path fullPath = new Path(rootPathName + File.separator + fileName);
+                if (!fileSystem.exists(fullPath))
+                    continue;
+                fileSystem.delete(fullPath, true);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    String getRootPathName() {
+        if ("job".equals(getCategory())) {
+            return parseHdfsWorkingDir() + "/" + getProject() + "/spark_logs";
+        } else if ("sparder".equals(getCategory())) {
+            return parseHdfsWorkingDir() + "/_sparder_logs";
+        } else {
+            throw new IllegalArgumentException("illegal category: " + getCategory());
+        }
+    }
+
+    @VisibleForTesting
+    boolean isTimeChanged(LoggingEvent event) {
+        if (rollingByHour) {
+            return isNeedRolling(event, A_HOUR_MILLIS);
+        } else {
+            return isNeedRolling(event, A_DAY_MILLIS);
+        }
+    }
+
+    private boolean isNeedRolling(LoggingEvent event, Long timeInterval) {
+        if (0 == startTime || ((event.getTimeStamp() / timeInterval) - (startTime / timeInterval)) > 0) {
+            startTime = event.getTimeStamp();
+            return true;
+        }
+        return false;
+    }
+
+    private String parseHdfsWorkingDir() {
+        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
+                + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+    }
+}
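The appender's rolling check above maps each event timestamp to a fixed-size time bucket (a day, or an hour when rollingByHour is set) and switches to a new output file whenever an event lands in a later bucket than startTime. A self-contained sketch of the same arithmetic (the timestamps are illustrative):

public class RollingBucketSketch {
    private static final long A_HOUR_MILLIS = 60 * 60 * 1000L;
    private static final long A_DAY_MILLIS = 24 * A_HOUR_MILLIS;

    private long startTime = 0;

    // Same arithmetic as isNeedRolling(): integer-dividing epoch millis by the
    // interval yields a bucket index; a strictly later bucket index means the
    // appender must roll to a new file.
    boolean isNeedRolling(long eventTs, long timeInterval) {
        if (startTime == 0 || (eventTs / timeInterval) - (startTime / timeInterval) > 0) {
            startTime = eventTs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RollingBucketSketch sketch = new RollingBucketSketch();
        long t0 = 1605751458000L; // 2020-11-19, same day as the commit
        System.out.println(sketch.isNeedRolling(t0, A_DAY_MILLIS));                // true: first event
        System.out.println(sketch.isNeedRolling(t0 + 1000, A_DAY_MILLIS));         // false: same day bucket
        System.out.println(sketch.isNeedRolling(t0 + A_DAY_MILLIS, A_DAY_MILLIS)); // true: next day bucket
    }
}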