Hi all,

Just an update: I ran a variation of the job with the new latest_time code
and it failed again, but I think I was misreading the history dashboard.
This time it shows two attempts: the second failed during the max call as
before, but the *first* appears to be failing during the reduceByKey step:

http://imgur.com/a/ipVMY

so the max call is likely a red herring.

The weird thing is that I definitely wrote a variation that called hash()
directly on each key while generating the RDD that reduceByKey() is called
on, as an assert/sanity check -- something like the sketch below. I'll try
this again, but I'm not getting my hopes up...

Thanks,

--Josh

On Tue, Jul 18, 2017 at 3:17 PM, Josh Holbrook <josh.holbr...@fusion.net>
wrote:

> Hello!
>
> I'm running into a very strange issue with pretty much no hits on the
> internet, and I'm hoping someone here can give me some protips! At this
> point, I'm at a loss. This is a little long-winded, but hopefully you'll
> indulge me.
>
> Background: I'm currently trying to port some existing Spark jobs from
> Scala to Python as part of a greater effort to change owners of our
> analytics pipeline, with both sets running on AWS EMR. For the most part
> this has been going reasonably well, except for some cluster tuning
> problems that I've since sorted out, but my last job is consistently
> failing (though only in prod, not locally with a reduced dataset). Because
> the jobs are ported from existing Scala code, there are a few odd idioms
> that I think could be made smoother with a little bit of work, but I'd
> rather not rock the boat too much. I'm using EMR release 5.6.0, which has
> Spark 2.1.0. I've also run this against EMR 5.7.0/Spark 2.1.1, in the hope
> that it might magically fix my problem, but no dice. The traceback, at any
> rate, is from 2.1.0. Also potentially relevant is that I have Spark
> configured to use Python 3.4, which is the version of Python 3 that ships
> with EMR.
>
> Anyway, the error I get is "unhashable type: 'dict'" from inside
> shuffle.py's ExternalMerger#mergeValues method -- full stack trace at:
>
> https://gist.github.com/jfhbrook/5864939b498d7d1adb9856c1697cf2e5
>
> According to the history dashboard, this step is failing at:
>
>     max at /mnt/yarn/usercache/hadoop/appcache/application_1500043838599_0001/container_1500043838599_0001_02_000001/kinjaspark.zip/kinjaspark/util.py:91
>
> Which in my code looks like this:
>
>     def get_latest_time(rdd):
>         return rdd.map(lambda event: event.timestamp).max()
>
> I realized while investigating this that I could rewrite this step to skip
> the map and pass a key function to the max call; I have a trial run of that
> change going while I write this email. The timestamp, for what it's worth,
> is validated in my code as being a Python datetime.
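>
> Here's a minimal sketch of what I mean by the key-based rewrite
> (get_latest_time is the real function name; the body is just the obvious
> translation, so treat it as approximate):
>
>     def get_latest_time(rdd):
>         # max(key=...) returns the element with the largest timestamp,
>         # so we still pull .timestamp off the result
>         return rdd.max(key=lambda event: event.timestamp).timestamp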
>
> Search results for this error are pretty scant
> (https://encrypted.google.com/search?hl=en&q=spark%20unhashable%20type%20dict%20%2Bshuffle%20mergeValues)
> and mostly have to do with use of reduceByKey, which I'm not doing at this
> stage, so I think something else is going on. It's probably worth noting,
> though, that max is a thin wrapper around regular reduce:
> https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L1004-L1006
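>
> For reference, that wrapper is roughly the following (paraphrased from the
> branch-2.1 source linked above, so approximate):
>
>     def max(self, key=None):
>         if key is None:
>             return self.reduce(max)
>         return self.reduce(lambda a, b: max(a, b, key=key))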
>
> As a matter of full disclosure, the source data in the rdd variable should
> be instances of an attrs-wrapped class
> (https://pypi.python.org/pypi/attrs/17.2.0), and I can show the source for
> how we're loading the events if that's helpful. Another thing potentially
> worth noting is that the input RDDs are set to
> .persist(StorageLevel.DISK_ONLY) -- this is directly ported from the old
> job, and I want to experiment with removing it, but again, rocking the
> boat. I also execute a reduceByKey at a later stage, though per an assert
> its keys shouldn't involve dicts either, being of the form (str, datetime).
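>
> To illustrate why keys of that shape should be safe while a dict key blows
> up -- plain Python, nothing Spark-specific:
>
>     >>> from datetime import datetime
>     >>> hash(("some-id", datetime(2017, 7, 18)))  # fine: a tuple of hashables
>     >>> hash({"some": "dict"})  # TypeError: unhashable type: 'dict'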
>
> The data in the traceback is *almost* straightforward: max calls reduce,
> which ultimately creates a new PipelinedRDD
> (https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L335-L358
> -- I don't know what separates a pipelined RDD from a "regular" RDD), and
> somewhere in this process a shuffle is triggered and, for whatever reason,
> it tries to use a dict as the key of another dict
> (https://github.com/apache/spark/blob/branch-2.1/python/pyspark/shuffle.py#L238),
> which causes Python to explode. Things I don't know: what self.pdata is and
> how it's populated, how I can ensure that it doesn't get populated with
> dicts as keys, and why this step doesn't fail for other jobs with extremely
> similar logic (this job mostly differs in that it later broadcasts a dict
> lookup table, but that's well after this step and on the face of it seems
> unrelated). Those jobs also pull from the same dataset.
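>
> As a point of comparison, this tiny snippet reproduces the same exception
> locally when a dict ends up as a key in the pairs the combiner sees -- an
> illustration, not the actual job code:
>
>     # fails with TypeError: unhashable type: 'dict' during the combine
>     sc.parallelize([({"bad": "key"}, 1), ({"bad": "key"}, 2)]) \
>         .reduceByKey(lambda a, b: a + b) \
>         .collect()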
>
> This is the last job I have to port over before we can sunset the old jobs
> and I'm at my wits' end, so any suggestions are highly appreciated!
>
> Thanks,
>
> --Josh
>
