Hi,

We are getting an OutOfMemoryError while accumulating the results from each worker. To avoid collecting the data to the driver node, we used a CollectionAccumulator instead, as in the snippet below. Is there a Spark config that controls accumulator limits, or is this the wrong approach for collecting a huge data set?

    // Accumulator registered on the driver, e.g. via
    // spark.sparkContext().collectionAccumulator("rows")
    CollectionAccumulator<String> accumulate;
    Dataset<Row> bin;

    // Each executor appends one pipe-delimited string per row
    bin.foreach((ForeachFunction<Row>) row ->
        accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2)));

    // Back on the driver: parse each accumulated entry and build the payload list
    accumulate.value().forEach(element -> {
        String[] arr = element.split("\\|");
        String count = arr[2];
        double percentage = (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
        PayloadBin payload = new PayloadBin(arr[0], arr[1], 0,
            Long.valueOf(count), percentage, sortBy, sortOrder);
        binArray.add(payload);
    });

The job fails with:

    18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)
    18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034 on rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)
    18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor 92) (14/200)
    Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3181)
        at java.util.ArrayList.toArray(ArrayList.java:376)
        at java.util.Collections$SynchronizedCollection.toArray(Collections.java:2024)
        at java.util.ArrayList.<init>(ArrayList.java:177)
        at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:470)
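For reference, here is the driver-side parsing step in isolation as plain Java — a minimal sketch, assuming a hypothetical stand-in PayloadBin with only the fields used here, and with the accumulated strings and total supplied locally. It shows that this loop builds the entire result list in driver heap, so its memory use grows with the full row count regardless of what Spark does on the executors:

```java
import java.util.ArrayList;
import java.util.List;

public class ParseBins {
    // Hypothetical stand-in for the real PayloadBin class,
    // trimmed to the fields this loop actually fills in.
    static class PayloadBin {
        final String name;
        final String label;
        final long count;
        final double percentage;
        PayloadBin(String name, String label, long count, double percentage) {
            this.name = name;
            this.label = label;
            this.count = count;
            this.percentage = percentage;
        }
    }

    // Parses pipe-delimited "name|label|count" entries, mirroring the
    // accumulate.value().forEach(...) loop: every parsed element is
    // retained in a single in-memory list on the driver.
    static List<PayloadBin> parse(List<String> accumulated, double total) {
        List<PayloadBin> binArray = new ArrayList<>();
        for (String element : accumulated) {
            String[] arr = element.split("\\|");
            String count = arr[2];
            double percentage = (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
            binArray.add(new PayloadBin(arr[0], arr[1], Long.valueOf(count), percentage));
        }
        return binArray;
    }

    public static void main(String[] args) {
        List<PayloadBin> bins = parse(List.of("A|low|30", "B|high|70"), 100.0);
        System.out.println(bins.get(0).count + " " + bins.get(0).percentage);
        System.out.println(bins.get(1).count + " " + bins.get(1).percentage);
    }
}
```

Note the stack trace above points at CollectionAccumulator.value() itself, i.e. the copy of the accumulated collection into a new ArrayList on the driver, which happens before this parsing loop even runs.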