Hi,

We are hitting an OOM error while accumulating the results of each worker.
We were trying to avoid collecting the data on the driver node, so we used
an accumulator instead, per the code snippet below.

Is there any Spark config to tune accumulator behavior, or are we going
about collecting this huge data set the wrong way?

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.util.CollectionAccumulator;

  // total, binArray, sortBy and sortOrder are defined in our surrounding code.
  CollectionAccumulator<String> accumulate;
  Dataset<Row> bin;

  // Executor side: append one pipe-delimited string per row to the accumulator.
  bin.foreach((ForeachFunction<Row>) row ->
      accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2)));

  // Driver side: materialize all accumulated values and build the payload list.
  accumulate.value().forEach(element -> {
    String[] arr = element.split("\\|");
    String count = arr[2];
    double percentage = (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
    PayloadBin payload = new PayloadBin(arr[0], arr[1], 0, Long.valueOf(count),
        percentage, sortBy, sortOrder);
    binArray.add(payload);
  });
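
For reference, the accumulator is registered on the driver before the
foreach runs. A minimal sketch of that setup, assuming a plain SparkSession
(the app and accumulator names here are placeholders, not our actual code):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionAccumulator;

  SparkSession spark = SparkSession.builder()
      .appName("bin-aggregation")   // placeholder app name
      .getOrCreate();

  // Registering with the SparkContext makes the accumulator writable from
  // the executors; everything added there is shipped back with the task
  // results and held on the driver heap.
  CollectionAccumulator<String> accumulate =
      spark.sparkContext().collectionAccumulator("binRows");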


The job then fails on the driver with the log output and stack trace below:

18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)
18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034 on rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)
18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor 92) (14/200)

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3181)
        at java.util.ArrayList.toArray(ArrayList.java:376)
        at java.util.Collections$SynchronizedCollection.toArray(Collections.java:2024)
        at java.util.ArrayList.<init>(ArrayList.java:177)
        at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:470)
