PySpark OOM when running PCA
Hi list,

I am having trouble running a PCA with PySpark. I am trying to reduce the matrix size, since after one-hot encoding my features get ~40k wide.

Setup:
- Spark 2.2.0 stand-alone (Oracle JVM)
- pyspark 2.2.0 from a docker image (OpenJDK)

I start the Spark session from the notebook, but I make sure to set:
- PYSPARK_SUBMIT_ARGS: "--packages ... --driver-memory 20G pyspark-shell"
- sparkConf.set("spark.executor.memory", "24G")
- sparkConf.set("spark.driver.memory", "20G")

My executors get 24 GB per node, and my driver process starts with:

/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/* -Xmx20G org.apache.spark.deploy.SparkSubmit --conf spark.executor.memory=24G ... pyspark-shell

So I should have plenty of memory to play with. However, when running PCA.fit I see this in the Spark driver logs:

19/02/08 01:02:43 WARN TaskSetManager: Stage 29 contains a task of very large size (142 KB). The maximum recommended task size is 100 KB.
19/02/08 01:02:43 WARN RowMatrix: 34771 columns will require at least 9672 megabytes of memory!
19/02/08 01:02:46 WARN RowMatrix: 34771 columns will require at least 9672 megabytes of memory!

Eventually it fails:

Py4JJavaError: An error occurred while calling o287.fit.
: java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
...
at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:122)
at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:344)
at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:387)
at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:48)
...

What am I missing? Any hints much appreciated.
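[Editor's note, not from the original thread: the RowMatrix warning is reproducible with simple arithmetic. mllib's PCA materializes a dense n x n Gramian/covariance matrix and serializes it to the driver, so for ~35k columns the matrix alone is ~9.7 GB; since that is likely above Java's ~2 GB single-byte-array limit, the ByteArrayOutputStream failure can occur regardless of heap size. A sketch, assuming 8-byte doubles and Spark's bytes/1e6 notion of a megabyte:]

```python
# The RowMatrix warning reflects the driver holding a dense n x n matrix
# of 8-byte doubles (the Gramian / covariance matrix computed by PCA.fit).
def gramian_memory_mb(num_cols: int) -> int:
    """Approximate driver memory in MB (bytes / 1e6) for the dense Gramian."""
    return num_cols * num_cols * 8 // 1_000_000

# Reproduces the figure from the log for the 34771-column OHE matrix:
print(gramian_memory_mb(34771))  # -> 9672
```

[Since the cost is quadratic in the number of columns, shrinking the feature vector before PCA (e.g. dropping rare OHE categories) helps far more than adding driver memory.]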
Re: Aws
Hi Pedro,

It seems that you disabled maximize resource allocation in 5.16 but enabled it in 5.20. This config can differ depending on how you start the EMR cluster (via the quick wizard, the advanced wizard in the console, or the CLI/API). You can see it in the EMR console's Configuration tab.

Please compare the Spark properties (especially spark.executor.cores, spark.executor.memory, spark.dynamicAllocation.enabled, etc.) between your two clusters with different EMR versions. You can see them in the Spark web UI's Environment tab or in the log files. Then please try the same properties against the same dataset with the same deployment mode (cluster or client). Even on EMR, you can configure the number of cores and the memory of the driver/executors in config files, in spark-submit arguments, and inside the Spark app if you need to.

Warm regards,
Nori

On Fri, Feb 8, 2019 at 8:16, Hiroyuki Nagata wrote:
> Hi, thank you Pedro
>
> I tested the maximizeResourceAllocation option. When it's enabled, Spark seems to utilize the cores fully. However, the performance is not so different from the default setting.
>
> I am considering s3-distcp for uploading files, and I think table (dataframe) caching is also effective.
>
> Regards,
> Hiroyuki
>
> On Sat, Feb 2, 2019 at 1:12, Pedro Tuero wrote:
>> Hi Hiroyuki, thanks for the answer.
>>
>> I found a solution for the cores-per-executor configuration: I set this configuration to true:
>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
>> Probably it was true by default in version 5.16, but I didn't find when it changed.
>> The same link says that dynamic allocation is true by default. I thought that would do the trick, but reading it again, I think it relates to the number of executors rather than the number of cores.
>>
>> But the jobs are still taking longer than before.
>> Watching the application history, I see these differences for the same job, the same instance types, and the default (AWS-managed) configuration for executors, cores, and memory:
>>
>> Instances: 6 r5.xlarge: 4 vCPUs, 32 GB of memory each (so 24 cores in total: 6 instances * 4 cores).
>>
>> With 5.16:
>> - 24 executors (4 in each instance, including the one that also hosts the driver).
>> - 4 cores each.
>> - 2.7 * 2 (storage + on-heap storage) memory each.
>> - 1 executor per core, but at the same time 4 cores per executor (?).
>> - Total executor memory per instance: 21.6 (2.7 * 2 * 4).
>> - Total elapsed time: 6 minutes.
>> With 5.20:
>> - 5 executors (1 in each instance, 0 in the instance with the driver).
>> - 4 cores each.
>> - 11.9 * 2 (storage + on-heap storage) memory each.
>> - Total executor memory per instance: 23.8 (11.9 * 2 * 1).
>> - Total elapsed time: 8 minutes.
>>
>> I don't understand the 5.16 configuration, but it works better. It seems that in 5.20 a full instance is wasted on the driver alone, while it could also host an executor.
>>
>> Regards,
>> Pedro.
>>
>> On Thu, Jan 31, 2019 at 20:16, Hiroyuki Nagata wrote:
>>> Hi, Pedro
>>>
>>> I also started using AWS EMR, with Spark 2.4.0, and I'm looking for performance tuning methods.
>>>
>>> Do you configure dynamic allocation?
>>>
>>> FYI: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>>
>>> I've not tested it yet. I guess spark-submit needs to specify the number of executors.
>>>
>>> Regards,
>>> Hiroyuki
>>>
>>> On Fri, Feb 1, 2019 at 5:23, Pedro Tuero (tuerope...@gmail.com) wrote:
>>>> Hi guys,
>>>> I used to run Spark jobs on AWS EMR. Recently I switched from EMR label 5.16 to 5.20 (which uses Spark 2.4.0), and I've noticed that a lot of steps are taking longer than before. I think it is related to the automatic configuration of cores per executor.
>>>> In version 5.16, some executors took more cores if the instance allowed it. Let's say an instance had 8 cores and 40 GB of RAM, and the memory configured per executor was 10 GB; then EMR automatically assigned 2 cores per executor. Now in label 5.20, unless I configure the number of cores manually, only one core is assigned per executor.
>>>> I don't know if this is related to Spark 2.4.0 or if it is something managed by AWS. Does anyone know if there is a way to automatically use more cores when it is physically possible?
>>>> Thanks,
>>>> Peter.
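[Editor's note: the comparison above boils down to how many executors get packed onto each instance. A toy model of that packing (a hypothetical helper, not EMR's or YARN's actual placement logic; the reported 5.16 numbers don't fit a simple model like this, which may be why they look inconsistent):]

```python
# Toy model: executors per instance are limited by whichever resource
# (cores or memory) runs out first for the chosen executor size.
def executors_per_instance(instance_cores: int, instance_mem_gb: float,
                           executor_cores: int, executor_mem_gb: float) -> int:
    by_cores = instance_cores // executor_cores
    by_mem = int(instance_mem_gb // executor_mem_gb)
    return min(by_cores, by_mem)

# r5.xlarge: 4 vCPUs, 32 GB. One fat executor per instance (like the 5.20 layout):
print(executors_per_instance(4, 32, 4, 23.8))  # -> 1
# Several thin executors per instance (closer to what 5.16 reportedly did):
print(executors_per_instance(4, 32, 1, 5.4))   # -> 4
```

[Under this model, shrinking executor memory and cores is what lets multiple executors share an instance with the driver.]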
Re: Aws
Hi, thank you Pedro

I tested the maximizeResourceAllocation option. When it's enabled, Spark seems to utilize the cores fully. However, the performance is not so different from the default setting.

I am considering s3-distcp for uploading files, and I think table (dataframe) caching is also effective.

Regards,
Hiroyuki
Spark 2.4 partitions and tasks
Hi,

I am running a job in Spark (on AWS EMR) and some stages are taking a lot longer with Spark 2.4 than with Spark 2.3.1:

Spark 2.4: [image: image.png]
Spark 2.3.1: [image: image.png]

With Spark 2.4, the keyBy operation takes more than 10x as long as it did with Spark 2.3.1. It seems to be related to the number of tasks / partitions.

Questions:
- Isn't the number of tasks in a stage supposed to match the number of partitions of the RDD left by the previous stage? Did that change in version 2.4?
- Which tools/configuration may I try to fix this dramatic performance regression?

Thanks.
Pedro.
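[Editor's note: partition counts drive task counts, which is why the question above matters. A plain-Python stand-in for Spark's HashPartitioner (an illustration only, not Spark's actual hash function) shows how the same keyed data fans out into however many partitions are configured:]

```python
# Each (key, value) pair lands in partition hash(key) % num_partitions;
# each non-empty partition becomes one task in the stage that consumes it.
from collections import defaultdict

def hash_partition(pairs, num_partitions):
    parts = defaultdict(list)
    for key, value in pairs:
        parts[hash(key) % num_partitions].append((key, value))
    return parts

data = [(k, k * k) for k in range(100)]
print(len(hash_partition(data, 8)))    # 8 partitions used -> 8 tasks
print(len(hash_partition(data, 200)))  # more partitions than keys leaves many empty
```

[In real Spark code, rdd.getNumPartitions() before and after keyBy on both versions would show whether the regression comes from a changed default partition count.]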
Re: java.lang.IllegalArgumentException: Unsupported class file major version 55
ASM 6 doesn't support Java 11. The master branch (for Spark 3.0) upgrades the dependency to ASM 7, and there are also some efforts (if my understanding is right) to support Java 11. So you may need to use a lower JDK version (8 is safest) with Spark 2.4.0, and try out the master branch to prepare for Java 11.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Feb 7, 2019 at 9:18 PM, Gabor Somogyi wrote:
Re: java.lang.IllegalArgumentException: Unsupported class file major version 55
Hi Hande,

"Unsupported class file major version 55" means a Java incompatibility. This error means you're trying to load a Java class file that was compiled with a newer version of Java than you have installed. For example, your .class file could have been compiled for JDK 8, and you're trying to run it with JDK 7. Are you sure 11 is the only, and default, JDK?

Only a small number of people are experimenting with JDK 11, and it is not heavily tested or used. Spark may or may not work, but it is not suggested for production in general.

BR,
G

On Thu, Feb 7, 2019 at 12:53 PM, Hande, Ranjit Dilip (Ranjit) <ha...@avaya.com> wrote:
java.lang.IllegalArgumentException: Unsupported class file major version 55
Hi,

I am developing a Java process which will consume data from Kafka using Apache Spark Streaming. For this I am using the following:

Java:
openjdk version "11.0.1" 2018-10-16 LTS
OpenJDK Runtime Environment Zulu11.2+3 (build 11.0.1+13-LTS)
OpenJDK 64-Bit Server VM Zulu11.2+3 (build 11.0.1+13-LTS, mixed mode)

Maven (Spark Streaming):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

I am able to compile the project successfully, but when I try to run it I get the following error:

{"@timestamp":"2019-02-07T11:54:30.624+05:30","@version":"1","message":"Application run failed","logger_name":"org.springframework.boot.SpringApplication","thread_name":"main","level":"ERROR","level_value":4,"stack_trace":"java.lang.IllegalStateException: Failed to execute CommandLineRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:816)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
at com.avaya.measures.AgentMeasures.AgentMeasuresApplication.main(AgentMeasuresApplication.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51)
Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 55
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
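[Editor's note: the "major version 55" in this thread is the class-file version number, which maps to a Java release as major - 44 (so 55 is Java 11 and 52 is Java 8). A small sketch that decodes it from a class-file header (a hypothetical helper, not part of Spark):]

```python
import struct

# Every .class file starts with the magic number 0xCAFEBABE, followed by
# the minor and major version as big-endian unsigned 16-bit integers.
def class_file_java_version(header: bytes) -> int:
    magic, minor, major = struct.unpack(">IHH", header[:8])
    assert magic == 0xCAFEBABE, "not a Java class file"
    return major - 44  # class-file major version 45 corresponds to Java 1.1

# First 8 bytes of a class compiled by JDK 11 (major version 0x37 = 55):
jdk11_header = bytes.fromhex("cafebabe00000037")
print(class_file_java_version(jdk11_header))  # -> 11
```

[Compiling with JDK 8 (major 52), as the replies suggest, keeps the classes within what Spark 2.4.0's bundled ASM 6 can read.]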