Hi all,

Just an update: I ran a variation of the job with the new latest_time code and it failed again, but I think I was misreading the history dashboard. This time it shows 2 attempts, the second of which failed during the max call as before, but the *first* of which appears to be failing during the reduceByKey step:

http://imgur.com/a/ipVMY

so the max call is likely a red herring. The weird thing is that I definitely wrote a variation that called hash() directly on each key at generation of the RDD that reduceByKey() is called on, as an assert/sanity check.
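From memory, the sanity check looked something like this (a sketch, not the exact code -- to_pair, merge_counts, and the events RDD are stand-ins for our real names):

def assert_hashable(pair):
    # hash() raises TypeError on unhashable keys, e.g.
    # hash({'a': 1}) -> TypeError: unhashable type: 'dict',
    # so a bad key should blow up here, at RDD generation
    # time, rather than deep inside shuffle.py's mergeValues
    hash(pair[0])
    return pair

pairs = events.map(to_pair).map(assert_hashable)
totals = pairs.reduceByKey(merge_counts)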
I'll try this again, but I'm not getting my hopes up...

Thanks,

--Josh

On Tue, Jul 18, 2017 at 3:17 PM, Josh Holbrook <josh.holbr...@fusion.net> wrote:

> Hello!
>
> I'm running into a very strange issue with pretty much no hits on the internet, and I'm hoping someone here can give me some protips! At this point, I'm at a loss. This is a little long-winded, but hopefully you'll indulge me.
>
> Background: I'm currently trying to port some existing spark jobs from scala to python as part of a greater effort to change owners of our analytics pipeline, both sets running on AWS EMR. For the most part this has been going reasonably well, except for some cluster tuning problems that I've since sorted out, but my last job is consistently failing (but only in prod, not locally with a reduced dataset). Because the jobs are ported from existing scala code, there are a few odd idioms that I think could be made smoother with a little bit of work, but I'd rather not rock the boat too much. I'm using EMR release 5.6.0, which has spark 2.1.0. I've also run this against EMR 5.7.0/Spark 2.1.1, with the hope that this might magically fix my problem, but no dice. The traceback, at any rate, is from 2.1.0. Also potentially relevant is that I have spark configured to use python 3.4, which is the version of python 3 that ships with EMR.
>
> Anyway, the error I get is "unhashable type: 'dict'" from inside shuffle.py's ExternalMerger#mergeValues method -- full stack trace at:
>
> https://gist.github.com/jfhbrook/5864939b498d7d1adb9856c1697cf2e5
>
> According to the history dashboard, this step is failing at:
>
> max at /mnt/yarn/usercache/hadoop/appcache/application_1500043838599_0001/container_1500043838599_0001_02_000001/kinjaspark.zip/kinjaspark/util.py:91
>
> Which in my code looks like this:
>
> def get_latest_time(rdd):
>     return rdd.map(lambda event: event.timestamp).max()
>
> I realized while investigating this that I could rewrite this step to skip the map and use the key argument with the max call; I have a trial run of this change ongoing while I write this email.
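> Concretely, the rewrite I'm trialing looks roughly like this (paraphrasing, not copy-pasted from the job):
>
> def get_latest_time(rdd):
>     # let max() do the projection via its key argument,
>     # skipping the intermediate mapped RDD entirely
>     return rdd.max(key=lambda event: event.timestamp)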
> Timestamp, for what it's worth, is validated in my code as being a python datetime.
>
> Search results for this error are pretty scant https://encrypted.google.com/search?hl=en&q=spark%20unhashable%20type%20dict%20%2Bshuffle%20mergeValues and they mostly have to do with use of reduceByKey, which I'm not doing at this stage, so I think something else is going on. It's probably worth noting, though, that max is a thin wrapper around regular reduce: https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L1004-L1006
>
> As a matter of full disclosure, the source data in the rdd variable should be instances of an attrs-wrapped class https://pypi.python.org/pypi/attrs/17.2.0 and I can show the source for how we're loading the events if that's helpful. Another thing potentially worth noting is that the input RDDs are set to .persist(StorageLevel.DISK_ONLY) -- this is directly ported from the old job, and I want to experiment with removing it, but rocking the boat etc. I do also execute a reduceByKey at a later stage, though by assertion its keys shouldn't involve dicts either, being of the form (str, datetime).
>
> The data in the traceback is *almost* straightforward: max calls reduce, which ultimately creates a new PipelinedRDD https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L335-L358 (I don't know what separates a pipelined RDD from a "regular" RDD), and somewhere in this process a shuffle is triggered and, for whatever reason, it tries to use a dict as the key of another dict https://github.com/apache/spark/blob/branch-2.1/python/pyspark/shuffle.py#L238 which causes python to explode. Things I don't know: what self.pdata is and how it gets populated, how I can ensure that it doesn't get populated with dicts as keys, and why this step doesn't fail for other jobs with extremely similar logic (this job mostly differs in that it later broadcasts a dict lookup table, but that's well after this step and on the face of it seems unrelated). They also pull from the same dataset.
>
> This is the last job I have to port over before we can sunset the old jobs and I'm at my wits' end, so any suggestions are highly appreciated!
>
> Thanks,
>
> --Josh
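P.S. For anyone who finds this thread later: the proximate mechanism, as far as I can tell, is just that mergeValues stores each record's key as a plain python dict key, and dict keys must be hashable:

>>> d = {}
>>> d[{'a': 1}] = 1
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'dict'

So *something* in this stage is producing dict keys; I just can't see where yet.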