[jira] [Comment Edited] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.
[ https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192280#comment-17192280 ]

Avner Livne edited comment on SPARK-24295 at 9/8/20, 3:45 PM:
--

For those looking for a temporary workaround, run this code before you start the streaming:

{code:java}
import scala.language.implicitConversions

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.execution.streaming._

/**
 * regex to find the last compact file
 */
val file_pattern = """(hdfs://.*/_spark_metadata/\d+\.compact)""".r.unanchored

val fs = FileSystem.get(sc.hadoopConfiguration)

/**
 * implicit hadoop RemoteIterator converter
 */
implicit def convertToScalaIterator[T](underlying: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(underlying: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = underlying.hasNext
    override def next: T = underlying.next
  }
  wrapper(underlying)
}

/**
 * delete a file or folder recursively
 */
def removePath(dstPath: String, fs: FileSystem): Unit = {
  val path = new Path(dstPath)
  if (fs.exists(path)) {
    println(s"deleting ${dstPath}...")
    fs.delete(path, true)
  }
}

/**
 * remove json entries older than `days` from the compact file,
 * preserve `v1` at the head of the file,
 * and rewrite the smaller file back to the original destination
 */
def compact(file: Path, days: Int = 20) = {
  val ttl = new java.util.Date().getTime - java.util.concurrent.TimeUnit.DAYS.toMillis(days)
  val compacted_file = s"/tmp/${file.getName.toString}"
  removePath(compacted_file, fs)
  val lines = sc.textFile(file.toString)
  val reduced_lines = lines.mapPartitions({ p =>
    implicit val formats = DefaultFormats
    p.collect({
      case "v1" => "v1"
      case x if { parse(x).extract[SinkFileStatus].modificationTime > ttl } => x
    })
  }).coalesce(1)
  println(s"removing ${lines.count - reduced_lines.count} lines from ${file.toString}...")
  reduced_lines.saveAsTextFile(compacted_file)
  // the single coalesced RDD text partition is written as part-00000
  FileUtil.copy(fs, new Path(compacted_file + "/part-00000"), fs, file, false, sc.hadoopConfiguration)
  removePath(compacted_file, fs)
}

/**
 * get the last compact file, if one exists
 */
def getLastCompactFile(path: Path) = {
  fs.listFiles(path, true).toList.sortBy(_.getModificationTime).reverse.collectFirst({
    case x if (file_pattern.findFirstMatchIn(x.getPath.toString).isDefined) => x.getPath
  })
}

val my_folder = "/my/root/spark/structured/streaming/output/folder"
val metadata_folder = new Path(s"$my_folder/_spark_metadata")
getLastCompactFile(metadata_folder).map(x => compact(x, 20))

val df = spark
  .readStream
  .format("kafka")
  // ... whatever stream you like
  .load()

df.writeStream
  .trigger(Trigger.ProcessingTime(30))
  .format("parquet")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "/my/checkpoint/path")
  .option("path", my_folder)
  .start()
{code}

This example retains SinkFileStatus entries from the last 20 days and purges everything else. I run this code on driver startup, but it can certainly run asynchronously in some sidecar cron job. Tested on Spark 3.0.0 writing Parquet files.
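If you would rather not tie the cleanup to driver startup, here is a minimal sketch of running it periodically from inside the driver rather than from an external cron job. It assumes the `metadata_folder`, `getLastCompactFile` and `compact` definitions above are in scope; the scheduler name and the daily interval are illustrative only, and running it while the query is active can race with Spark's own metadata compaction, so treat it as a starting point rather than a drop-in solution:

{code:java}
import java.util.concurrent.{Executors, TimeUnit}

// illustrative single-thread scheduler: first run after 1 hour, then once a day
val compactionScheduler = Executors.newSingleThreadScheduledExecutor()
compactionScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    try {
      // prune entries older than 20 days from the latest .compact file, if one exists
      getLastCompactFile(metadata_folder).foreach(f => compact(f, 20))
    } catch {
      case e: Exception => println(s"metadata compaction failed: ${e.getMessage}")
    }
  }
}, 1, 24, TimeUnit.HOURS)
{code}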
> Purge Structured streaming FileStreamSinkLog metadata compact file data.
>
>                 Key: SPARK-24295
>                 URL: https://issues.apache.org/jira/browse/SPARK-24295
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Iqbal Singh
>            Priority: Major
>         Attachments: spark_metadatalog_compaction_perfbug_repro.tar.gz
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file after a defined compact interval.
> For long running jobs, the compact file can grow to tens of GBs, causing slowness when reading data from the FileStreamSinkLog dir, as Spark defaults to the "_spark_metadata" dir for the read.
> We need a functionality to purge the compact file data.
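For context on the mechanism described above: the cadence of sink metadata log compaction and the deletion of superseded per-batch log files are governed by internal SQL options (names as defined in Spark's SQLConf; treat them as implementation details that may change between versions). None of them removes old SinkFileStatus entries from inside the compact file itself, which is exactly the gap this ticket describes. A hedged example of setting them before starting a query:

{code:java}
// Internal Spark SQL options governing the file sink metadata log (subject to change):
//   ...fileSink.log.compactInterval - how many batches between compactions of the log
//   ...fileSink.log.deletion        - whether superseded per-batch log files are deleted
//   ...fileSink.log.cleanupDelay    - how long to wait before deleting them
// None of these purges entries from the .compact file itself; the values below are illustrative.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")
{code}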