[ https://issues.apache.org/jira/browse/SPARK-17549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15511459#comment-15511459 ]
Apache Spark commented on SPARK-17549:
--------------------------------------

User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/15189

> InMemoryRelation doesn't scale to large tables
> ----------------------------------------------
>
>                 Key: SPARK-17549
>                 URL: https://issues.apache.org/jira/browse/SPARK-17549
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: Marcelo Vanzin
>             Fix For: 2.0.1, 2.1.0
>
>         Attachments: create_parquet.scala, example_1.6_post_patch.png,
> example_1.6_pre_patch.png, spark-1.6-2.patch, spark-1.6.patch, spark-2.0.patch
>
>
> An {{InMemoryRelation}} is created when you cache a table; but if the table
> is large, meaning it has a very large number of columns, a very large number
> of partitions (in the file-split sense, not the "table partition" sense), or
> both, caching it uses an immense amount of memory in the driver.
>
> The reason is that {{InMemoryRelation}} uses an accumulator to collect
> statistics about each partition, and instead of summarizing that data in the
> driver, it keeps *all* the entries in memory.
>
> I'm attaching a script I used to create a Parquet file with 20,000 columns
> and a single row, which I then copied 500 times so I'd have 500 partitions.
> When doing the following:
> {code}
> sqlContext.read.parquet(...).count()
> {code}
> everything works fine, both in Spark 1.6 and 2.0. (It's super slow with the
> settings I used, but it works.)
>
> I ran spark-shell like this:
> {code}
> ./bin/spark-shell --master 'local-cluster[4,1,4096]' --driver-memory 2g \
>   --conf spark.executor.memory=2g
> {code}
> And ran:
> {code}
> sqlContext.read.parquet(...).cache().count()
> {code}
> You'll see the results in screenshot {{example_1.6_pre_patch.png}}. After 40
> partitions were processed, there were 40 GenericInternalRow objects with
> 100,000 items each (5 stat info fields * 20,000 columns). So, memory usage
> was:
> {code}
> 40 * 100000 * (4 * 20 + 24) = 416000000 =~ 400MB
> {code}
> (Note: Integer = 20 bytes, Long = 24 bytes.)
> If I had waited until the end, there would have been 500 partitions, so
> ~5GB of memory to hold the stats.
>
> I'm also attaching a patch I made on top of 1.6 that uses just a long
> accumulator to capture the table size; with that patch, memory usage on the
> driver doesn't keep growing. Also note in the patch that I'm multiplying the
> column size by the row count, which I think fixes a different bug in the
> existing code (those stats should cover the whole batch, not just a single
> row, right?). I also added {{example_1.6_post_patch.png}} to show the
> {{InMemoryRelation}} with the patch.
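> (For illustration only, not the attached patch: a minimal sketch of the
> long-accumulator idea against the Spark 2.x API. The
> {{SparkContext.longAccumulator}} call is real; the accumulator name, the
> 500-task stand-in for batch building, and the 1 KB batch size are made up.)
> {code}
> import org.apache.spark.sql.SparkSession
>
> object AccumulatorSketch {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().master("local[*]")
>       .appName("acc-sketch").getOrCreate()
>     val sc = spark.sparkContext
>
>     // O(1) driver-side state: a single running byte count, instead of one
>     // stats row per partition (each 100,000 fields wide) held on the driver.
>     val batchSizeBytes = sc.longAccumulator("batchSizeBytes")
>
>     // Stand-in for building 500 cached batches: each task adds only the
>     // byte size of the batch it produced (1 KB here as a placeholder).
>     sc.parallelize(1 to 500, 500).foreach(_ => batchSizeBytes.add(1024L))
>
>     // The relation's size estimate can then read the summarized value.
>     println(s"estimated cached size: ${batchSizeBytes.value} bytes")
>     spark.stop()
>   }
> }
> {code}
> With that shape, the driver holds one long per cached table rather than one
> stats row per partition, so memory no longer grows with partitions * columns.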
> I also applied a very similar patch on top of Spark 2.0, but there things
> blow up even more spectacularly when I try to run the count on the cached
> table. It starts with this error:
> {noformat}
> 14:19:43 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, vanzin-st1-3.gce.cloudera.com): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
> (lots of generated code here...)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
>     at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>     at java.util.ArrayList.get(ArrayList.java:411)
>     at org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
>     at org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
>     at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
>     at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
>     at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
>     at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:913)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:911)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:911)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
>     ... 54 more
> {noformat}
> There was basically a lot of that going on, making the output unreadable, so
> I just killed the shell. Anyway, I believe the same fix should work there,
> but I can't be sure because the test doesn't work, seemingly for unrelated
> reasons.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org