Hello, I recently encountered a bug with the results from
JavaRDD#countByValue that does not reproduce when running locally. For
background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.

The code in question is something like this:

JavaRDD<Item> rdd = // ...
rdd.count();  // 75187

// Get the count broken down by type
rdd.map(Item::getType).countByValue();

The resulting Map contains these values:

TypeA: 556
TypeB: 9168
TypeC: 590
TypeD: 205
(total: 10519)

These values are incorrect: every item has a type defined, so the counts
should sum to 75187. When I inspected this stage in the Spark UI, I saw that
it ran on 7 executors, and since the reported total (10519) is roughly 1/7th
of the expected value, I suspect there is some issue with the way the
executors report their results back to the driver. The same code produces
correct results when I run the job in local mode ("local[4]"), so it may also
have something to do with how data is shared across processes.
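
If it helps narrow this down, I could run a per-partition count along these
lines (just a sketch, assuming the same rdd as above; I have not run this
exact code yet) to see whether each executor/partition only contributes its
own slice to the total:

import java.util.Collections;
import java.util.List;

// Count records in each partition so the per-partition totals can be
// compared against the full rdd.count() of 75187.
List<Long> perPartitionCounts = rdd
    .mapPartitions(iter -> {
        long n = 0;
        while (iter.hasNext()) { iter.next(); n++; }
        return Collections.singletonList(n).iterator();
    })
    .collect();

long total = perPartitionCounts.stream().mapToLong(Long::longValue).sum();
// If total matches 75187 while countByValue() only sums to ~10519, the
// partial results are presumably being lost on the way back to the driver.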

For workarounds, I have also tried:

rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L)).reduceByKey(Long::sum).collectAsMap();


These yielded the same (incorrect) result.

I did find that Dataset.groupBy().count() yields the correct results:

TypeA: 3996
TypeB: 65490
TypeC: 4224
TypeD: 1477

So I have an immediate workaround, but it is somewhat awkward since I have to
create a DataFrame from the JavaRDD each time.
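
For reference, the Dataset workaround I am using looks roughly like this (a
sketch; it assumes Item is a JavaBean with a getType() accessor and that
spark is the active SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Build a DataFrame from the JavaRDD (Item as a JavaBean) and group by type
Dataset<Row> df = spark.createDataFrame(rdd, Item.class);
Dataset<Row> countsByType = df.groupBy("type").count();
countsByType.show();  // these are the counts that come out correct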

Am I doing something wrong? Do these methods not work the way that I
expected them to from reading the documentation? Is this a legitimate bug?

I would be happy to provide more details if that would help in debugging
this scenario.

Thank you for your time,
~Stuart Fehr
