[
https://issues.apache.org/jira/browse/SPARK-43991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
shuyouZZ updated SPARK-43991:
-
Description:
Currently, if rolling event logs are enabled in the SHS, only {{originalFilePath}} is used to
determine the path of the compact file.
{code:java}
override val logPath: String = originalFilePath.toUri.toString +
EventLogFileWriter.COMPACTED
{code}
If the user sets {{spark.eventLog.compression.codec}} in their SparkConf and it differs
from the SHS's default value, then when log compaction is triggered, the old event log
file is compacted with the default codec of the SHS's Spark conf instead of the
user-specified one.
{code:java}
protected val compressionCodec =
  if (shouldCompress) {
    Some(CompressionCodec.createCodec(sparkConf,
      sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
  } else {
    None
  }

private[history] val compressionCodecName = compressionCodec.map { c =>
  CompressionCodec.getShortName(c.getClass.getName)
}
{code}
However, the compression codec that EventLogFileReader uses to read a log is parsed
out of the log path, so EventLogFileReader cannot read the compacted log file
correctly.
{code:java}
def codecName(log: Path): Option[String] = {
  // Compression codec is encoded as an extension, e.g. app_123.lzf
  // Since we sanitize the app ID to not include periods, it is safe to split on it
  val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS)
  logName.split("\\.").tail.lastOption
}
{code}
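A minimal standalone sketch of the mismatch (not Spark's actual classes; the file names and codec names below are hypothetical): the reader derives the codec from the file name, which still carries the user's codec extension, while the compact file's bytes were written with the SHS default codec.

```scala
// Standalone sketch mirroring the extension-based lookup in
// EventLogFileReader.codecName. File names here are hypothetical examples.
object CodecMismatchSketch {
  val COMPACTED = ".compact"
  val IN_PROGRESS = ".inprogress"

  // Same splitting logic as codecName: the codec is the last extension
  // after stripping the .compact / .inprogress suffixes.
  def codecFromName(name: String): Option[String] = {
    val logName = name.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS)
    logName.split("\\.").tail.lastOption
  }

  def main(args: Array[String]): Unit = {
    // Suppose the user wrote the original log with zstd...
    val original = "events_1_app-123.zstd"
    // ...compaction only appends the suffix, keeping the old extension,
    // even though the compact file's content was written with the SHS
    // default codec (e.g. lz4).
    val compacted = original + COMPACTED
    println(codecFromName(compacted)) // Some(zstd) -- but the bytes are lz4
  }
}
```

The reader then instantiates a zstd codec for lz4-compressed bytes, and reading fails.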
So we should override the {{shouldCompress}} and {{compressionCodec}} variables
in class {{CompactedEventLogFileWriter}} so that they use the compression codec
set by the user.
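One possible shape of that override (a sketch under assumptions, not the final patch; the constructor signature and surrounding API are taken from the snippets above and may differ from the actual code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// Hypothetical sketch: re-derive the codec from the original log path so
// that the compact file's content matches the extension the reader will see.
class CompactedEventLogFileWriter(
    appId: String,
    appAttemptId: Option[String],
    originalFilePath: Path,
    sparkConf: SparkConf,
    hadoopConf: Configuration)
  extends SingleEventLogFileWriter(
    appId, appAttemptId, originalFilePath.getParent.toUri, sparkConf, hadoopConf) {

  override val logPath: String =
    originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED

  // Codec short name parsed back from the original path, e.g. "zstd",
  // using the same extension convention as EventLogFileReader.codecName.
  private val codecFromPath: Option[String] =
    EventLogFileReader.codecName(originalFilePath)

  // Compress iff the original file was compressed, and with the same codec,
  // instead of consulting the SHS's own SparkConf defaults.
  override protected val shouldCompress: Boolean = codecFromPath.isDefined
  override protected val compressionCodec: Option[CompressionCodec] =
    codecFromPath.map(CompressionCodec.createCodec(sparkConf, _))
}
```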
was:
Currently, if rolling event logs are enabled in the SHS, only {{originalFilePath}} is used to
determine the path of the compact file.
{code:java}
override val logPath: String = originalFilePath.toUri.toString +
EventLogFileWriter.COMPACTED
{code}
If the user sets {{spark.eventLog.compression.codec}} in their Spark conf, then when
log compaction is triggered, the old event log file is compacted with the compression
codec from the Spark default config file.
{code:java}
protected val compressionCodec =
  if (shouldCompress) {
    Some(CompressionCodec.createCodec(sparkConf,
      sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
  } else {
    None
  }

private[history] val compressionCodecName = compressionCodec.map { c =>
  CompressionCodec.getShortName(c.getClass.getName)
}
{code}
However, the compression codec that EventLogFileReader uses to read a log is parsed
out of the log path, so EventLogFileReader cannot read the compacted log file
correctly.
{code:java}
def codecName(log: Path): Option[String] = {
  // Compression codec is encoded as an extension, e.g. app_123.lzf
  // Since we sanitize the app ID to not include periods, it is safe to split on it
  val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS)
  logName.split("\\.").tail.lastOption
}
{code}
So we should improve the {{logPath}} method in class
{{CompactedEventLogFileWriter}} to use the compression codec set by the Spark
default config.
> Use the compression codec set by the spark config file when write compact log
> -
>
> Key: SPARK-43991
> URL: https://issues.apache.org/jira/browse/SPARK-43991
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Web UI
>Affects Versions: 3.4.0
>Reporter: shuyouZZ
>Priority: Major
>