Hello! I'm running into a very strange issue with pretty much no hits on the internet, and I'm hoping someone here can give me some protips! At this point, I'm at a loss. This is a little long-winded, but hopefully you'll indulge me.
Background: I'm currently trying to port some existing Spark jobs from Scala to Python as part of a larger effort to change owners of our analytics pipeline; both sets of jobs run on AWS EMR. For the most part this has been going reasonably well, aside from some cluster-tuning problems that I've since sorted out, but my last job is consistently failing (and only in prod, not locally with a reduced dataset). Because the jobs are ported from existing Scala code, there are a few odd idioms that I think could be smoothed out with a little work, but I'd rather not rock the boat too much.

I'm using EMR release 5.6.0, which has Spark 2.1.0. I've also run this against EMR 5.7.0/Spark 2.1.1 in the hope that it might magically fix my problem, but no dice. The traceback, at any rate, is from 2.1.0. Also potentially relevant: I have Spark configured to use Python 3.4, which is the version of Python 3 that ships with EMR.

Anyway, the error I get is "unhashable type: 'dict'" from inside shuffle.py's ExternalMerger#mergeValues method. Full stack trace here:

https://gist.github.com/jfhbrook/5864939b498d7d1adb9856c1697cf2e5

According to the history dashboard, this step is failing at:

    max at /mnt/yarn/usercache/hadoop/appcache/application_1500043838599_0001/container_1500043838599_0001_02_000001/kinjaspark.zip/kinjaspark/util.py:91

which in my code looks like this:

    def get_latest_time(rdd):
        return rdd.map(lambda event: event.timestamp).max()

I realized while investigating this that I could rewrite this step to skip the map and use the key argument to max (there's a sketch of that change at the end of this message); I have a trial run of it going while I write this email. Timestamp, for what it's worth, is validated in my code as being a Python datetime.

Search results for this error are pretty scant

https://encrypted.google.com/search?hl=en&q=spark%20unhashable%20type%20dict%20%2Bshuffle%20mergeValues

and mostly have to do with use of reduceByKey, which I'm not doing at this stage, so I think something else is going on. It's probably worth noting, though, that max is a thin wrapper around a regular reduce:

https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L1004-L1006

In the interest of full disclosure, the source data in the rdd variable should be instances of an attrs-wrapped class (https://pypi.python.org/pypi/attrs/17.2.0), and I can share the source for how we load the events if that would help. Another thing potentially worth noting is that the input RDDs are set to .persist(StorageLevel.DISK_ONLY) -- this is ported directly from the old job, and I want to experiment with removing it, but rocking the boat etc. I also execute a reduceByKey at a later stage, though its keys are asserted not to involve dicts either (they're of the form (str, datetime)).

The data in the traceback is *almost* straightforward: max calls reduce, which ultimately creates a new PipelinedRDD (https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L335-L358 -- I don't know what separates a pipelined RDD from a "regular" RDD), and somewhere in this process a shuffle is triggered and, for whatever reason, it tries to use a dict as a key in another dict (https://github.com/apache/spark/blob/branch-2.1/python/pyspark/shuffle.py#L238), which causes Python to explode.
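For reference, the rewrite I'm trialing looks roughly like this -- just a sketch, since the run hasn't finished and I haven't confirmed it behaves identically; note that max(key=...) returns the winning event rather than the timestamp, so I pull the timestamp off afterwards:

    def get_latest_time(rdd):
        # Same intent as the map+max version above, but compares the event
        # objects by timestamp instead of mapping to timestamps first.
        latest_event = rdd.max(key=lambda event: event.timestamp)
        return latest_event.timestamp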
Things I don't know:

- What self.pdata is and how it's populated
- How I can ensure that it doesn't get populated with dicts as keys
- Why this step doesn't fail for other jobs that have extremely similar logic (this job mostly differs in that it later broadcasts a dict lookup table, but that's well after this step and on the face of it seems unrelated; the jobs also pull from the same dataset)

This is the last job I have to port over before we can sunset the old jobs, and I'm at my wits' end, so any suggestions are highly appreciated!

Thanks,

--Josh