I specified JavaSerializer in both cases (and attempted to use Kryo in the
shell, but failed due to SPARK-6520), and still get vastly differing
results. Somehow the shell compiler must affect either serialization or
shuffling, but at a level other than the standard REPL API, since Zeppelin
can use that API to create jobs that perform so close to a spark-submit job
that I wouldn't be able to tell the difference (as you would expect).
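
One way to double-check that both launch paths really see the same settings
is to diff the properties listed in the Environment tab of each UI. Below is
a minimal sketch, not a Spark API: `diff_conf` and the two dicts are
hypothetical names, and the values are only the ones mentioned in this
thread (same serializer, FAIR vs FIFO scheduling), copied in by hand.

```python
# Hypothetical helper: compare two sets of Spark properties copied by hand
# from the Environment tab of each application's UI.

def diff_conf(left, right):
    """Return {key: (left_value, right_value)} for keys whose values differ.

    A key missing on one side is reported with None on that side.
    """
    keys = sorted(set(left) | set(right))
    return {
        k: (left.get(k), right.get(k))
        for k in keys
        if left.get(k) != right.get(k)
    }

# Values below reflect only what is stated in this thread: the same
# serializer in both cases, FAIR scheduling in Zeppelin, FIFO in spark-shell.
zeppelin_conf = {
    "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
    "spark.scheduler.mode": "FAIR",
}
shell_conf = {
    "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
    "spark.scheduler.mode": "FIFO",
}

for key, (z, s) in diff_conf(zeppelin_conf, shell_conf).items():
    print(f"{key}: zeppelin={z} spark-shell={s}")
```

With these inputs only the scheduler mode is reported. A diff like this only
covers what SparkConf exposes, so defaults applied elsewhere would not show
up in it.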

I use Hive, left-accumulating lists and leftOuterJoins, amid normal
.map/.reduceByKey in my code, in case that's somehow handled differently in
the spark-shell.
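
For scale, the shuffle-write figures from the first mail in this thread
(quoted at the bottom) can be put in per-record terms. This is a
back-of-the-envelope sketch; it assumes the UI reports binary units
(1 MB = 1024^2 bytes), which is an assumption and not something stated in
the thread.

```python
# Shuffle-write figures as reported in the original post of this thread.
RECORDS = 7314771
ZEPPELIN_SHUFFLE_MB = 673.5
SHELL_SHUFFLE_GB = 9.0

# Assumption: the UI reports binary units (1 MB = 1024**2 bytes).
MB = 1024 ** 2
GB = 1024 ** 3

zeppelin_bytes_per_record = ZEPPELIN_SHUFFLE_MB * MB / RECORDS
shell_bytes_per_record = SHELL_SHUFFLE_GB * GB / RECORDS
ratio = shell_bytes_per_record / zeppelin_bytes_per_record

print(f"Zeppelin:    {zeppelin_bytes_per_record:.1f} bytes/record")
print(f"spark-shell: {shell_bytes_per_record:.1f} bytes/record")
print(f"ratio:       {ratio:.1f}x")
```

Under that assumption this comes to roughly 97 bytes per record in Zeppelin
against about 1.3 KB per record in spark-shell, a factor of about 13.7 for
the same 7314771 records.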

On Mon, Sep 28, 2015 at 8:47 PM, Kartik Mathur <kar...@bluedata.com> wrote:

> OK, that might be possible. To confirm it, you can explicitly specify
> the serializer in both cases (by setting spark.serializer, I guess).
> Then you can be sure that the same serializers are used, and maybe then do
> an analysis.
> Best,
> Kartik
> On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz <rah...@gmail.com> wrote:
>> Hi Kartik,
>> Thanks for the input!
>> Sadly, that's not it - I'm using YARN - the configuration looks
>> identical, and the nodes/memory/cores are deployed identically and exactly
>> as specified.
>> My current hunch is that for some reason different serializers are used
>> in each case, but I can find no documentation on why that could be the
>> case, and the configuration isn't indicative of that either.
>> Nonetheless, the symptom of different shuffle volume for the same number
>> of shuffled tuples could well point to that as the source of my issue.
>> In fact, a colleague pointed out that HIS (Cloudera) installation was
>> defaulting to kryo for the spark-shell, which had an impact for some jobs.
>> I couldn't find the document he was referring to as a source of this
>> information, but the behavior sounds plausible at least.
>> Best,
>> Rick
>> On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur <kar...@bluedata.com>
>> wrote:
>>> Hey Rick,
>>> Not sure on this, but a similar situation happened to me: when starting
>>> spark-shell, it was starting a new cluster instead of using the existing
>>> cluster, and this new cluster was a single-node cluster. That's why jobs
>>> were taking forever to complete from spark-shell and were running much
>>> faster using submit (which reads the conf correctly) or Zeppelin for that
>>> matter.
>>> Thanks,
>>> Kartik
>>> On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:
>>>> I've finally been able to pick this up again, after upgrading to Spark
>>>> 1.4.1, because my code used the HiveContext, which runs fine in the REPL
>>>> (be it via Zeppelin or the shell) but won't work with spark-submit.
>>>> With 1.4.1, I have actually managed to get a result with the Spark
>>>> shell, but only after 3847,802237 seconds; in particular, the last stage
>>>> took 1320,672 seconds.
>>>> This was after I used coalesce to balance the workload initially, since
>>>> a Hive filter I applied would normally make for a skewed distribution of
>>>> the data onto the nodes.
>>>> Nonetheless, the same code (even without the coalesce) would run much
>>>> faster in Zeppelin (around 1200 seconds with 1.4.0), and as a spark-submit
>>>> job the run time was just about a tenth, at 446,657534 seconds for the
>>>> entire job and notably 38,961 seconds for the final stage.
>>>> Again, there is a huge difference in the amount of data that gets
>>>> shuffled/spilled (which leads to much earlier OOM-conditions), when using
>>>> spark-shell.
>>>> What could be the reason for this different behaviour using very
>>>> similar configurations and identical data, machines and code (identical
>>>> DAGs and sources) and identical spark binaries? Why would code launched
>>>> from spark-shell generate more shuffled data for the same number of
>>>> shuffled tuples?
>>>> An analysis would be much appreciated.
>>>> Best,
>>>> Rick
>>>> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:
>>>>> oops, forgot to reply-all on this thread.
>>>>> ---------- Forwarded message ----------
>>>>> From: Rick Moritz <rah...@gmail.com>
>>>>> Date: Wed, Aug 19, 2015 at 2:46 PM
>>>>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>>>>> Spark-shell
>>>>> To: Igor Berman <igor.ber...@gmail.com>
>>>>> Those values are not explicitly set, and attempting to read their
>>>>> values results in 'java.util.NoSuchElementException:
>>>>> spark.shuffle.spill.compress'.
>>>>> What I mean by the volume per element being larger is illustrated in
>>>>> my original post: for each case the number of elements is identical, but
>>>>> the volume of data required to obtain/manage these elements is many times
>>>>> greater.
>>>>> The only difference used to be that Zeppelin had FAIR scheduling, versus
>>>>> FIFO scheduling for spark-shell. I just verified that running spark-shell
>>>>> with FAIR scheduling makes no difference. The only other difference in the
>>>>> environment lies in some class-path variables which should only affect
>>>>> method availability, not actual usage.
>>>>> Another fact to note: Spark assembly (1.4.0-rc4) was built with
>>>>> provided hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6
>>>>> -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver
>>>>> -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while
>>>>> Zeppelin was built with dependencies against 2.6.0 from Maven central.
>>>>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com>
>>>>> wrote:
>>>>>> So what is your case for the version differences?
>>>>>> What do you mean by "in spark-shell the volume per element is much
>>>>>> larger"?
>>>>>> Can you verify that the configuration in the Spark UI (under the
>>>>>> Environment tab) is the same?
>>>>>> If you suspect compression, then check the following properties:
>>>>>> spark.shuffle.compress
>>>>>> spark.shuffle.spill.compress
>>>>>> spark.io.compression.codec
>>>>>> spark.rdd.compress
>>>>>> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>> Number of partitions and even size look relatively similar - except
>>>>>>> in spark-shell the volume per element is much larger, especially in
>>>>>>> later stages. That's when shuffles start to spill. Zeppelin creates
>>>>>>> almost no spills at all. The number of elements per partition is the
>>>>>>> same for both setups, but with very different data volume in/out. Almost
>>>>>>> as though compression was used in one case, and not in another, or as
>>>>>>> though shuffling is somehow less specific, and more nodes get data that
>>>>>>> they ultimately don't process at all. The same shuffling algorithm
>>>>>>> appears to be at work in each case, if the partitioning of the number of
>>>>>>> elements is anything to go by.
>>>>>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com>
>>>>>>> wrote:
>>>>>>>> I would compare Spark UI metrics for both cases and look for any
>>>>>>>> differences (number of partitions, number of spills etc.).
>>>>>>>> Why can't you make the REPL consistent with the Zeppelin Spark
>>>>>>>> version? Might be the rc has issues...
>>>>>>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>>> No, the setup is one driver with 32g of memory, and three
>>>>>>>>> executors each with 8g of memory in both cases. No core-number has
>>>>>>>>> been specified, thus it should default to single-core (though I've
>>>>>>>>> seen the yarn-owned jvms wrapping the executors take up to 3 cores in
>>>>>>>>> top). That is, unless, as I suggested, there are different defaults
>>>>>>>>> for the two means of job submission that come into play in a
>>>>>>>>> non-transparent fashion (i.e. not visible in SparkConf).
>>>>>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <
>>>>>>>>> igor.ber...@gmail.com> wrote:
>>>>>>>>>> any differences in number of cores, memory settings for executors?
>>>>>>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>>>>> Dear list,
>>>>>>>>>>> I am observing a very strange difference in behaviour between a
>>>>>>>>>>> Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark
>>>>>>>>>>> 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from
>>>>>>>>>>> Maven Central).
>>>>>>>>>>> The workflow loads data from Hive, applies a number of
>>>>>>>>>>> transformations (including quite a lot of shuffle operations) and
>>>>>>>>>>> then presents an enriched dataset. The code (and resulting DAGs) are
>>>>>>>>>>> identical in each case.
>>>>>>>>>>> The following particularities are noted:
>>>>>>>>>>> Importing the HiveRDD and caching it yields identical results on
>>>>>>>>>>> both platforms.
>>>>>>>>>>> Applying case classes leads to a 2-2.5MB increase in dataset
>>>>>>>>>>> size per partition (excepting empty partitions).
>>>>>>>>>>> Writing shuffles shows this much more significant result:
>>>>>>>>>>> Zeppelin:
>>>>>>>>>>> *Total Time Across All Tasks: * 2,6 min
>>>>>>>>>>> *Input Size / Records: * 2.4 GB / 7314771
>>>>>>>>>>> *Shuffle Write: * 673.5 MB / 7314771
>>>>>>>>>>> vs
>>>>>>>>>>> Spark-shell:
>>>>>>>>>>> *Total Time Across All Tasks: * 28 min
>>>>>>>>>>> *Input Size / Records: * 3.6 GB / 7314771
>>>>>>>>>>> *Shuffle Write: * 9.0 GB / 7314771
>>>>>>>>>>> This is one of the early stages, which reads from a cached
>>>>>>>>>>> partition and then feeds into a join stage. The later stages show
>>>>>>>>>>> similar behaviour in producing excessive shuffle spills.
>>>>>>>>>>> Quite often the excessive shuffle volume will lead to massive
>>>>>>>>>>> shuffle spills which ultimately kill not only performance, but the
>>>>>>>>>>> actual executors as well.
>>>>>>>>>>> I have examined the Environment tab in the Spark UI and
>>>>>>>>>>> identified no notable difference besides FAIR (Zeppelin) vs FIFO
>>>>>>>>>>> (spark-shell) scheduling mode. I fail to see how this would impact
>>>>>>>>>>> shuffle writes in such a drastic way, since it should act on the
>>>>>>>>>>> inter-job level, while this happens at the inter-stage level.
>>>>>>>>>>> I was somewhat suspicious of compression or serialization
>>>>>>>>>>> playing a role, but the SparkConf points to those being set to the
>>>>>>>>>>> default. Also, Zeppelin's interpreter adds no relevant additional
>>>>>>>>>>> default parameters.
>>>>>>>>>>> I performed a diff between rc4 (which was later released) and
>>>>>>>>>>> 1.4.0 and as expected there were no differences, besides a single
>>>>>>>>>>> class (remarkably, a shuffle-relevant class:
>>>>>>>>>>> /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
>>>>>>>>>>> differing in its binary representation due to being compiled with
>>>>>>>>>>> Java 7 instead of Java 6. The decompiled sources of those two are
>>>>>>>>>>> again identical.
>>>>>>>>>>> I may attempt as a next step to simply replace that file in the
>>>>>>>>>>> packaged jar, to ascertain that there is indeed no difference
>>>>>>>>>>> between the two versions, but would consider this to be a major bug
>>>>>>>>>>> if a simple compiler change led to this kind of issue.
>>>>>>>>>>> I am also open to any other ideas, in particular on how to verify
>>>>>>>>>>> that the same compression/serialization is indeed happening, and
>>>>>>>>>>> on ways to determine what exactly is written into these shuffles --
>>>>>>>>>>> currently I only know that the tuples are bigger (or smaller) than
>>>>>>>>>>> they ought to be. The Zeppelin-obtained results do appear to be
>>>>>>>>>>> consistent at least, thus the suspicion is that there is an issue
>>>>>>>>>>> with the process launched from spark-shell. I will also attempt to
>>>>>>>>>>> build a Spark job and spark-submit it using different Spark binaries
>>>>>>>>>>> to further explore the issue.
>>>>>>>>>>> Best Regards,
>>>>>>>>>>> Rick Moritz
>>>>>>>>>>> PS: I already tried to send this mail yesterday, but it never
>>>>>>>>>>> made it onto the list, as far as I can tell -- I apologize should
>>>>>>>>>>> anyone receive this as a second copy.
