Hello,

I recently encountered a bug in the results of JavaRDD#countByValue that I
cannot reproduce when running locally. For background, we are running a
Spark 3.5.0 job on AWS EMR 7.0.0.
The code in question is something like this:
JavaRDD<Item> rdd = // ...
rdd.count(); // 75187

// Get the count broken down by type
rdd.map(Item::getType).countByValue();
The resulting Map contains:
TypeA: 556
TypeB: 9168
TypeC: 590
TypeD: 205
(total: 10519)
These values are incorrect: every Item has a type defined, so the per-type
counts should sum to 75187. When I inspected this stage in the Spark UI, I
saw that it ran on 7 executors, and the reported total is roughly 1/7th of
the expected value (75187 / 7 = 10741, close to the 10519 observed), so I
suspect an issue with how the executors report their partial results back
to the driver. The same code produces correct results when I run the job in
local mode ("local[4]"), so it may also have something to do with how data
is shared across processes.
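To state explicitly the invariant I am expecting, here is a simplified
sketch (it assumes Item.getType() returns a String, which is close to our
real code):

long total = rdd.count();                          // 75187
Map<String, Long> byType = rdd.map(Item::getType).countByValue();
long sumOfCounts = byType.values().stream().mapToLong(Long::longValue).sum();
// Every Item has a non-null type, so sumOfCounts should equal total.
// On the EMR cluster sumOfCounts comes back as 10519; in local[4] it is 75187.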
As workarounds, I have also tried:

rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();

rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L))
   .reduceByKey(Long::sum)
   .collectAsMap();
These yielded the same (incorrect) result.
I did find that Dataset.groupBy().count() yields the correct results:
TypeA: 3996
TypeB: 65490
TypeC: 4224
TypeD: 1477
(total: 75187)
So I have an immediate workaround, but it is somewhat awkward, since I have
to create a DataFrame from the JavaRDD each time.
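For reference, that workaround currently looks roughly like this (assuming
Item is a bean-style class whose getType() getter maps to a "type" column,
and that spark is the active SparkSession):

// Build a DataFrame from the existing JavaRDD<Item>, then group by the bean's "type" column
Dataset<Row> df = spark.createDataFrame(rdd, Item.class);
df.groupBy("type").count().show();  // these counts sum to 75187 as expected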
Am I doing something wrong? Do these methods not behave the way the
documentation led me to expect? Or is this a genuine bug? I would be happy
to provide more details if that would help in debugging this.
Thank you for your time,
~Stuart Fehr