Hello!

I'm running into a very strange issue that has pretty much no hits on the
internet, and I'm hoping someone here can give me some protips! At this
point I'm at a loss. This is a little long-winded, but hopefully you'll
indulge me.

Background: I'm currently porting some existing Spark jobs from Scala to
Python as part of a larger effort to change owners of our analytics
pipeline; both the old and new jobs run on AWS EMR. For the most part this
has gone reasonably well, aside from some cluster tuning problems that
I've since sorted out, but my last job is consistently failing (only in
prod, not locally with a reduced dataset). Because the jobs are ported
from existing Scala code, there are a few odd idioms that I think could be
smoothed out with a little work, but I'd rather not rock the boat too
much. I'm using EMR release 5.6.0, which ships Spark 2.1.0. I've also run
this against EMR 5.7.0/Spark 2.1.1, in the hope that it might magically
fix my problem, but no dice. The traceback, at any rate, is from 2.1.0.
Also potentially relevant: I have Spark configured to use Python 3.4,
which is the version of Python 3 that ships with EMR.

Anyway, the error I get is "unhashable type: 'dict'" from inside
shuffle.py's ExternalMerger#mergeValues method; the full stack trace is at:

https://gist.github.com/jfhbrook/5864939b498d7d1adb9856c1697cf2e5

According to the history dashboard, this step is failing at:

    max at
/mnt/yarn/usercache/hadoop/appcache/application_1500043838599_0001/container_1500043838599_0001_02_000001/kinjaspark.zip/kinjaspark/util.py:91

That line in my code looks like this:

    def get_latest_time(rdd):
        return rdd.map(lambda event: event.timestamp).max()

I realized while investigating this that I could rewrite this step to skip
the map and instead pass a key argument to the max call (see the sketch
below); I have a trial run of that change going while I write this email.
Timestamp, for what it's worth, is validated in my code as a Python
datetime.
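
For concreteness, the rewrite I'm trying is roughly this (still unproven
at prod scale, so treat it as a sketch):

    def get_latest_time(rdd):
        # let max compare the events by timestamp directly, then pull the
        # timestamp off the winning event
        return rdd.max(key=lambda event: event.timestamp).timestamp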

Search results for this error are pretty scant
https://encrypted.google.com/search?hl=en&q=spark%20unhashable%20type%20dict%20%2Bshuffle%20mergeValues
and mostly have to do with reduceByKey, which I'm not doing at this stage,
so I think something else is going on. It's probably worth noting, though,
that max is a thin wrapper around regular reduce:
https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L1004-L1006
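
Paraphrasing the linked source from memory, the 2.1 implementation of max
amounts to this:

    def max(self, key=None):
        # no key: reduce with the builtin max over the elements themselves;
        # with a key: compare elements by key(element)
        if key is None:
            return self.reduce(max)
        return self.reduce(lambda a, b: max(a, b, key=key))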

As a matter of full disclosure, the source data in the rdd variable should
be instances of an attrs-wrapped class
https://pypi.python.org/pypi/attrs/17.2.0
(roughly the shape sketched below), and I can share the source for how
we're loading the events if that's helpful. Another thing potentially
worth noting is that the input RDDs are set to
.persist(StorageLevel.DISK_ONLY); this is directly ported from the old
job, and I want to experiment with removing it, but rocking the boat etc.
I also execute a reduceByKey at a later stage, though its keys are
asserted to be of the form (str, datetime), so they shouldn't involve
dicts either.
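
To give a flavor of the data, a stripped-down, illustrative version of the
event class looks something like this (the real one has more fields and
different names):

    import datetime

    import attr

    @attr.s
    class Event(object):
        # timestamp is validated as a real datetime when the event is built
        timestamp = attr.ib(
            validator=attr.validators.instance_of(datetime.datetime))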

The chain in the traceback is *almost* straightforward: max calls reduce,
which ultimately creates a new PipelinedRDD
https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L335-L358
(I don't know what separates a pipelined RDD from a "regular" RDD),
somewhere in this process a shuffle is triggered, and for whatever reason
the merge step tries to use a dict as the key in another dict
https://github.com/apache/spark/blob/branch-2.1/python/pyspark/shuffle.py#L238
which causes python to explode. Things I don't know: what self.pdata is
and how it's populated, how I can ensure it doesn't end up keyed by dicts,
and why this step doesn't fail for other jobs that have extremely similar
logic and pull from the same dataset (this job mostly differs in that it
later broadcasts a dict lookup table, but that's well after this step and
on the face of it seems unrelated).
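
Just to spell out the failure mode, the shuffle code ends up doing the
moral equivalent of this, with something dict-shaped turning up where a
hashable key should be:

    d = {}
    k = {"some": "dict"}  # whatever the shuffle is treating as a "key"
    d[k] = "value"        # TypeError: unhashable type: 'dict'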

This is the last job I have to port over before we can sunset the old jobs
and I'm at my wits' end, so any suggestions are highly appreciated!

Thanks,

--Josh
