[jira] [Comment Edited] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2020-09-08 Thread Avner Livne (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192280#comment-17192280
 ] 

Avner Livne edited comment on SPARK-24295 at 9/8/20, 3:45 PM:
--

For those looking for a temporary workaround:

Run this code before you start the streaming query:

{code:java}
// assumes a spark-shell / driver context where `spark` (SparkSession)
// and `sc` (SparkContext) are already available
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.execution.streaming._

/**
  * regex to find the last compact file
  */
val file_pattern = """(hdfs://.*/_spark_metadata/\d+\.compact)""".r.unanchored
val fs = FileSystem.get(sc.hadoopConfiguration)

/**
  * implicit converter from a Hadoop RemoteIterator to a Scala Iterator
  */
implicit def convertToScalaIterator[T](underlying: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(underlying: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = underlying.next()
  }
  wrapper(underlying)
}

/**
  * delete a file or folder recursively
  */
def removePath(dstPath: String, fs: FileSystem): Unit = {
  val path = new Path(dstPath)
  if (fs.exists(path)) {
    println(s"deleting ${dstPath}...")
    fs.delete(path, true)
  }
}

/**
  * remove JSON entries older than `days` from the compact file,
  * preserve the `v1` version header at the head of the file,
  * and write the smaller file back to the original destination
  */
def compact(file: Path, days: Int = 20): Unit = {
  val ttl = new java.util.Date().getTime -
    java.util.concurrent.TimeUnit.DAYS.toMillis(days)
  val compacted_file = s"/tmp/${file.getName}"
  removePath(compacted_file, fs)
  val lines = sc.textFile(file.toString)
  val reduced_lines = lines.mapPartitions({ p =>
    implicit val formats = DefaultFormats
    p.collect({
      case "v1" => "v1"
      case x if parse(x).extract[SinkFileStatus].modificationTime > ttl => x
    })
  }).coalesce(1)
  println(s"removing ${lines.count - reduced_lines.count} lines from ${file.toString}...")
  reduced_lines.saveAsTextFile(compacted_file)
  // coalesce(1) + saveAsTextFile writes a single part-00000 file
  FileUtil.copy(fs, new Path(compacted_file + "/part-00000"), fs, file,
    false, sc.hadoopConfiguration)
  removePath(compacted_file, fs)
}

/**
  * get the last compact file, if one exists
  */
def getLastCompactFile(path: Path): Option[Path] = {
  fs.listFiles(path, true).toList
    .sortBy(_.getModificationTime)
    .reverse
    .collectFirst({
      case x if file_pattern.findFirstMatchIn(x.getPath.toString).isDefined => x.getPath
    })
}

val my_folder = "/my/root/spark/structured/streaming/output/folder"
val metadata_folder = new Path(s"$my_folder/_spark_metadata")
getLastCompactFile(metadata_folder).foreach(compact(_, 20))

val df = spark
  .readStream
  .format("kafka") // ... or whatever stream source you like
  .load()          // plus the source options your stream needs

df.writeStream
  .trigger(Trigger.ProcessingTime(30)) // interval in ms
  .format("parquet")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "/my/checkpoint/path")
  .option("path", my_folder)
  .start()
{code}
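
A quick optional sanity check after the trim (a minimal sketch reusing the helpers above; not required for the workaround itself):

{code:java}
// count the SinkFileStatus entries left in the trimmed compact file
getLastCompactFile(metadata_folder).foreach { f =>
  val entries = sc.textFile(f.toString).count() - 1  // minus the "v1" header line
  println(s"${f.toString} now holds $entries SinkFileStatus entries")
}
{code}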


This example retains SinkFileStatus entries from the last 20 days and purges everything else.
I run this code on driver startup, but it could certainly run asynchronously from a sidecar cron job as well; a driver-side scheduling sketch follows below.
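
If you'd rather not maintain an external cron job, here is a minimal sketch of the same idea run on a fixed schedule from the driver (the interval and error handling are my assumptions, not part of the original workaround, and the same caveat applies: it rewrites the compact file in place, so schedule it for when the sink is quiet):

{code:java}
import java.util.concurrent.{Executors, TimeUnit}

// illustrative only: re-run the compact-file trim every 6 hours from the driver
val cleaner = Executors.newSingleThreadScheduledExecutor()
cleaner.scheduleAtFixedRate(new Runnable {
  override def run(): Unit =
    try getLastCompactFile(metadata_folder).foreach(compact(_, 20))
    catch { case e: Exception => println(s"compact-file cleanup failed: ${e.getMessage}") }
}, 6, 6, TimeUnit.HOURS)
{code}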

Tested on Spark 3.0.0 writing Parquet files.



> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> 
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Iqbal Singh
>Priority: Major
> Attachments: spark_metadatalog_compaction_perfbug_repro.tar.gz
>
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file 
> after a defined compact interval.
> For long running jobs, the compact file can grow to tens of GBs, causing 
> slowness while reading the data from the FileStreamSinkLog dir, as Spark 
> defaults to the "_spark_metadata" dir for the read.
> We need functionality to purge the compact file data.
>  
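
For context, the file sink log already exposes a few knobs (internal configs, so the names and values below are best-effort as of Spark 3.0.x and purely illustrative); none of them drops old entries from inside the compact file itself, which is why the workaround above rewrites it by hand:

{code:java}
// set before the query starts; values shown are illustrative, not recommendations
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10") // batch logs between compactions
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")      // delete superseded batch log files
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")   // keep them visible to readers this long
// none of these purges old SinkFileStatus entries inside the N.compact file itself
{code}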



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
