Hello,

I recently encountered a bug with the results from JavaRDD#countByValue that does not reproduce when running locally. For background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.
The code in question is something like this:

    JavaRDD<Item> rdd = // ...
    rdd.count();  // 75187

    // Get the count broken down by type
    rdd.map(Item::getType).countByValue();

which gives these results in the resulting Map:

    TypeA: 556
    TypeB: 9168
    TypeC: 590
    TypeD: 205
    (total: 10519)

These values are incorrect: every item has a type defined, so the counts should sum to 75187. When I inspected this stage in the Spark UI, I found that it was using 7 executors. Since the counts here are about 1/7th of the expected values, I suspect there is some issue with how the executors report their results back to the driver. The same code produces correct results when I run the job in local mode ("local[4]"), so it may also have something to do with how data is shared across processes.

As workarounds, I have also tried:

    rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
    rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L)).reduceByKey(Long::sum).collectAsMap();

Both yielded the same (incorrect) results. I did find that using Dataset.groupBy().count() gives the correct results:

    TypeA: 3996
    TypeB: 65490
    TypeC: 4224
    TypeD: 1477
    (total: 75187)

So I have an immediate workaround, but it is somewhat awkward, since I have to create a DataFrame from the JavaRDD each time (see the sketch in the P.S. below).

Am I doing something wrong? Do these methods not work the way I expected from reading the documentation? Is this a legitimate bug? I would be happy to provide more details if that would help in debugging this scenario.

Thank you for your time,
~Stuart Fehr
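
P.S. For reference, here is roughly the shape of the Dataset-based workaround I am using. This is a simplified sketch rather than our actual code: the Item bean below only carries the "type" field, and the countsByType helper is just a name I made up for this example.

    import java.io.Serializable;
    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class TypeCounts {

        // Simplified stand-in for our Item class; only the grouping field is shown.
        public static class Item implements Serializable {
            private String type;

            public Item() {}
            public Item(String type) { this.type = type; }

            public String getType() { return type; }
            public void setType(String type) { this.type = type; }
        }

        // Build a Dataset from the JavaRDD and count rows per type with groupBy().count().
        public static Map<String, Long> countsByType(SparkSession spark, JavaRDD<Item> rdd) {
            Dataset<Item> ds = spark.createDataset(rdd.rdd(), Encoders.bean(Item.class));
            Dataset<Row> counts = ds.groupBy("type").count();
            return counts.collectAsList().stream()
                    .collect(Collectors.toMap(
                            row -> row.getString(0),  // "type" column
                            row -> row.getLong(1)));  // "count" column
        }
    }

Note that Encoders.bean() requires Item to be a proper Java bean (no-arg constructor plus getters/setters), which is part of why this feels heavier than a plain countByValue for what should be a simple count.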