[ https://issues.apache.org/jira/browse/SPARK-21710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-21710:
------------------------------------

    Assignee:     (was: Apache Spark)

> ConsoleSink causes OOM crashes with large inputs.
> -------------------------------------------------
>
>                 Key: SPARK-21710
>                 URL: https://issues.apache.org/jira/browse/SPARK-21710
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: affects all environments
>            Reporter: Gerard Maas
>              Labels: easyfix
>
> ConsoleSink does a full collect of the streaming dataset in order to show a few
> lines on screen. This is problematic with large inputs, such as a Kafka backlog
> or a file source with files larger than the driver's memory.
> Here's an example:
> {code:java}
> import spark.implicits._
> import org.apache.spark.sql.functions
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.sql.types._
>
> val schema = StructType(StructField("text", StringType, true) :: Nil)
>
> val lines = spark
>   .readStream
>   .format("text")
>   .option("path", "/tmp/data")
>   .schema(schema)
>   .load()
>
> val base = lines.writeStream
>   .outputMode("append")
>   .format("console")
>   .start()
> {code}
> When a file larger than the available driver memory is fed through this
> streaming job, we get:
> {code:java}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> [Stage 0:> (0 + 8) / 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6)
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.Arrays.copyOf(Arrays.java:3236)
>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>   at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
>   at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
>   at java.io.DataOutputStream.write(DataOutputStream.java:107)
>   at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> 17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 6,5,main]
> java.lang.OutOfMemoryError: Java heap space
> {code}
> This issue can be traced back to a `collect` on the source `DataFrame`:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52
> A fairly simple solution would be to do a `take(numRows)` instead of the collect. (PR in progress)
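> For illustration, here is a minimal sketch of the proposed change. It is not the actual
> contents of console.scala; `printBatch`, `numRowsToShow` and `isTruncated` are made-up names
> standing in for the sink's configured row limit and truncation flag.
> {code:java}
> // Sketch only: an approximation of a console-style sink's batch handling.
> import org.apache.spark.sql.DataFrame
>
> def printBatch(data: DataFrame, numRowsToShow: Int = 20, isTruncated: Boolean = true): Unit = {
>   val spark = data.sparkSession
>
>   // Problematic: collect() ships the entire batch to the driver and
>   // triggers the OutOfMemoryError shown above for large inputs.
>   // val rows = data.collect()
>
>   // Proposed: take(numRowsToShow) bounds what reaches the driver to the
>   // handful of rows that will actually be printed.
>   val rows = data.take(numRowsToShow)
>
>   // Rebuild a small local DataFrame only to reuse show()'s formatting.
>   spark.createDataFrame(spark.sparkContext.parallelize(rows), data.schema)
>     .show(numRowsToShow, isTruncated)
> }
> {code}
> With `take`, driver memory use scales with `numRowsToShow` rather than with the batch size.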
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org