[ https://issues.apache.org/jira/browse/SPARK-43991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shuyouZZ updated SPARK-43991:
-----------------------------
    Description: 
Currently, if rolling event logs are enabled in SHS, only {{originalFilePath}} is used to 
determine the path of the compacted file.
{code:java}
override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED
{code}
If the user sets {{spark.eventLog.compression.codec}} in SparkConf to a value 
different from the default, then when log compaction is triggered, the old 
event log files are compacted using the default codec instead of the 
user-configured one.
{code:java}
protected val compressionCodec =
    if (shouldCompress) {
      Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
    } else {
      None
    }

private[history] val compressionCodecName = compressionCodec.map { c =>
    CompressionCodec.getShortName(c.getClass.getName)
  }
{code}
However, the compression codec used by {{EventLogFileReader}} is parsed from the 
log file name's extension, so {{EventLogFileReader}} cannot read the compacted 
log file correctly when the two codecs differ.
{code:java}
def codecName(log: Path): Option[String] = {
    // Compression codec is encoded as an extension, e.g. app_123.lzf
    // Since we sanitize the app ID to not include periods, it is safe to split on it
    val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS)
    logName.split("\\.").tail.lastOption
  }
{code}
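The failure mode can be exercised with the same parsing logic in isolation (a minimal standalone sketch; the file names are made up for illustration):
{code:java}
// Same suffix/extension handling as EventLogFileReader.codecName above,
// reduced to plain string operations on a file name.
val COMPACTED = ".compact"
val IN_PROGRESS = ".inprogress"

def codecName(fileName: String): Option[String] = {
  val logName = fileName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS)
  logName.split("\\.").tail.lastOption
}

// The reader will pick "lzf" from the name, even if the compact file's
// bytes were actually written with the SHS default codec (e.g. zstd).
println(codecName("app_123.lzf.compact")) // Some(lzf)
println(codecName("app_123.compact"))     // None (no codec extension)
{code}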
So we should override the {{shouldCompress}} and {{compressionCodec}} members 
in class {{CompactedEventLogFileWriter}}, and use the compression codec set by 
the user.
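A rough sketch of such an override (constructor parameters mirror the existing {{CompactedEventLogFileWriter}}; the exact member signatures are assumptions, not the final patch):
{code:java}
class CompactedEventLogFileWriter(
    originalFilePath: Path,
    appId: String,
    appAttemptId: Option[String],
    logBaseDir: URI,
    sparkConf: SparkConf,
    hadoopConf: Configuration)
  extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {

  override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED

  // Reuse the reader's parsing so the codec used for writing always matches
  // the codec encoded in the compacted file's name.
  private val originalCodecName: Option[String] = EventLogFileReader.codecName(originalFilePath)

  override protected val shouldCompress: Boolean = originalCodecName.isDefined

  override protected val compressionCodec: Option[CompressionCodec] =
    originalCodecName.map(name => CompressionCodec.createCodec(sparkConf, name))
}
{code}
This way the compact file is written with the same codec its file name advertises, so the reader's extension-based lookup stays consistent.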

  was:
Currently, if rolling event logs are enabled in SHS, only {{originalFilePath}} is used to 
determine the path of the compacted file.
{code:java}
override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED
{code}
If the user sets {{spark.eventLog.compression.codec}} in the Spark conf, then when 
log compaction is triggered, the old event log files are compacted using the 
compression codec from the Spark default config file.
{code:java}
protected val compressionCodec =
    if (shouldCompress) {
      Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
    } else {
      None
    }

private[history] val compressionCodecName = compressionCodec.map { c =>
    CompressionCodec.getShortName(c.getClass.getName)
  }
{code}
However, the compression codec used by {{EventLogFileReader}} is parsed from the 
log file name's extension, so {{EventLogFileReader}} cannot read the compacted 
log file correctly when the two codecs differ.
{code:java}
def codecName(log: Path): Option[String] = {
    // Compression codec is encoded as an extension, e.g. app_123.lzf
    // Since we sanitize the app ID to not include periods, it is safe to split on it
    val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS)
    logName.split("\\.").tail.lastOption
  }
{code}
So we should improve the {{logPath}} method in class 
{{CompactedEventLogFileWriter}} to use the compression codec set by the Spark 
default config.


> Use the compression codec set by the spark config file when write compact log
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-43991
>                 URL: https://issues.apache.org/jira/browse/SPARK-43991
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Web UI
>    Affects Versions: 3.4.0
>            Reporter: shuyouZZ
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
