I specified JavaSerializer in both cases (and attempted to use Kryo in the shell, but failed due to SPARK-6520), and still get the vastly differing performance. Somehow the shell compiler must impact either serialization or shuffling, but at a level other than the standard REPL API, since Zeppelin uses that same API to create jobs that perform so close to a spark-submit job that I couldn't tell the difference (as one would hope).
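For reference, this is roughly how I pinned the serializer; a minimal sketch, with "shuffle-comparison" as a placeholder app name, and nothing set besides spark.serializer itself:

    // Pin the serializer on the SparkConf before the context is created.
    // For spark-shell, the equivalent is:
    //   bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("shuffle-comparison")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    // Kryo would be org.apache.spark.serializer.KryoSerializer, but classes
    // defined in the shell trip SPARK-6520.
    val sc = new SparkContext(conf)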
I use Hive, left-accumulating lists and leftOuterJoins amid normal .map/.reduceByKey in my code, in case any of those are somehow handled differently in the spark-shell.

On Mon, Sep 28, 2015 at 8:47 PM, Kartik Mathur <kar...@bluedata.com> wrote:

> Ok, that might be possible. To confirm, you can explicitly specify the serializer in both cases (by setting spark.serializer, I guess). Then you can be sure that the same serializers are used, and maybe then do an analysis.
>
> Best,
> Kartik
>
> On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz <rah...@gmail.com> wrote:
>
>> Hi Kartik,
>>
>> Thanks for the input!
>>
>> Sadly, that's not it - I'm using YARN, the configuration looks identical, and the nodes/memory/cores are deployed identically and exactly as specified.
>>
>> My current hunch is that for some reason different serializers are used in each case, but I can find no documentation on why that could be, and the configuration isn't indicative of it either. Nonetheless, the symptom of different shuffle volume for the same number of shuffled tuples could well point to that as the source of my issue. In fact, a colleague pointed out that HIS (Cloudera) installation was defaulting to Kryo for the spark-shell, which had an impact on some jobs. I couldn't find the document he was referring to as the source of this information, but the behaviour sounds plausible at least.
>>
>> Best,
>>
>> Rick
>>
>> On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur <kar...@bluedata.com> wrote:
>>
>>> Hey Rick,
>>> Not sure on this, but a similar situation happened to me: when starting spark-shell, it was starting a new cluster instead of using the existing one, and this new cluster was a single-node cluster. That's why jobs were taking forever to complete from spark-shell and were running much faster using submit (which reads the conf correctly) or Zeppelin, for that matter.
>>>
>>> Thanks,
>>> Kartik
>>>
>>> On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> I've finally been able to pick this up again, after upgrading to Spark 1.4.1, because my code uses the HiveContext, which runs fine in the REPL (be it via Zeppelin or the shell) but won't work with spark-submit.
>>>> With 1.4.1, I have actually managed to get a result with the Spark shell, but only after 3847.802237 seconds, and in particular the last stage took 1320.672 seconds. This was after I used coalesce to balance the workload initially, since a Hive filter I apply would normally make for a skewed distribution of the data onto the nodes.
>>>> Nonetheless, the same code (even without the coalesce) would run much faster in Zeppelin (around 1200 seconds with 1.4.0), and as a spark-submit job the run time was just a tenth of that: 446.657534 seconds for the entire job, and notably 38.961 seconds for the final stage.
>>>>
>>>> Again, there is a huge difference in the amount of data that gets shuffled/spilled (which leads to much earlier OOM conditions) when using spark-shell. What could be the reason for this different behaviour, given very similar configurations and identical data, machines and code (identical DAGs and sources) and identical Spark binaries? Why would code launched from spark-shell generate more shuffled data for the same number of shuffled tuples?
>>>>
>>>> An analysis would be much appreciated.
>>>>
>>>> Best,
>>>>
>>>> Rick
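For context, the coalesce mentioned above is nothing more elaborate than the following sketch (table name, filter and partition count are placeholders, not my actual job):

    // Rebalance the skewed post-filter distribution before the
    // shuffle-heavy stages; shuffle = true forces an even repartition.
    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    val balanced = hiveContext
      .sql("SELECT * FROM events WHERE event_date = '2015-09-01'")
      .rdd
      .coalesce(96, shuffle = true)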
>>>> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:
>>>>
>>>>> Oops, forgot to reply-all on this thread.
>>>>>
>>>>> ---------- Forwarded message ----------
>>>>> From: Rick Moritz <rah...@gmail.com>
>>>>> Date: Wed, Aug 19, 2015 at 2:46 PM
>>>>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
>>>>> To: Igor Berman <igor.ber...@gmail.com>
>>>>>
>>>>> Those values are not explicitly set, and attempting to read their values results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'. What I mean by the volume per element being larger is illustrated in my original post: in each case the number of elements is identical, but the volume of data required to obtain/manage those elements is many times greater.
>>>>>
>>>>> The only difference used to be that Zeppelin had FAIR scheduling versus FIFO scheduling for spark-shell; I just verified that spark-shell with FAIR scheduling makes no difference. The only other difference in the environment lies in some classpath variables, which should only affect method availability, not actual usage.
>>>>>
>>>>> Another fact to note: the Spark assembly (1.4.0-rc4) was built with provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean package) against 2.6.0 from Hortonworks, while Zeppelin was built with dependencies against 2.6.0 from Maven Central.
>>>>>
>>>>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>>
>>>>>> So what is your reason for the version differences? What do you mean by "in spark-shell the volume per element is much larger"? Can you verify that the configuration in the Spark UI (under the Environment tab) is the same?
>>>>>> If you suspect compression, then check the following properties:
>>>>>> spark.shuffle.compress
>>>>>> spark.shuffle.spill.compress
>>>>>> spark.io.compression.codec
>>>>>> spark.rdd.compress
>>>>>>
>>>>>> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>
>>>>>>> Number of partitions and even sizes look relatively similar - except that in spark-shell the volume per element is much larger, especially in later stages. That's when shuffles start to spill. Zeppelin creates almost no spills at all. The number of elements per partition is the same for both setups, but with very different data volume in/out. Almost as though compression was used in one case and not in the other, or as though shuffling is somehow less specific and more nodes get data that they ultimately don't process at all. The same shuffling algorithm appears to be at work in each case, if the partitioning of the number of elements is anything to go by.
>>>>>>>
>>>>>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I would compare the Spark UI metrics for both cases and look for any differences (number of partitions, number of spills, etc.).
>>>>>>>> Why can't you make the REPL consistent with the Zeppelin Spark version? The RC might have issues...
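A note for anyone repeating the check above: sc.getConf.get throws the NoSuchElementException I mentioned for any key left at its default, because defaults are not recorded in the conf. A minimal sketch that sidesteps this:

    // Print the effective value of each suspect key, or mark it as a
    // Spark default; getOption returns None for keys never set explicitly.
    val suspects = Seq(
      "spark.serializer",
      "spark.shuffle.compress",
      "spark.shuffle.spill.compress",
      "spark.io.compression.codec",
      "spark.rdd.compress")
    suspects.foreach { k =>
      println(s"$k -> ${sc.getConf.getOption(k).getOrElse("<spark default>")}")
    }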
>>>>>>>>
>>>>>>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No, the setup is one driver with 32g of memory and three executors, each with 8g of memory, in both cases. No core count has been specified, so it should default to a single core (though I've seen the YARN-owned JVMs wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in the SparkConf).
>>>>>>>>>
>>>>>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Any differences in the number of cores or the memory settings for the executors?
>>>>>>>>>>
>>>>>>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Dear list,
>>>>>>>>>>>
>>>>>>>>>>> I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven Central).
>>>>>>>>>>>
>>>>>>>>>>> The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and the resulting DAGs) is identical in each case.
>>>>>>>>>>>
>>>>>>>>>>> The following particularities are noted:
>>>>>>>>>>> Importing the HiveRDD and caching it yields identical results on both platforms.
>>>>>>>>>>> Applying case classes leads to a 2-2.5 MB increase in dataset size per partition (excepting empty partitions).
>>>>>>>>>>>
>>>>>>>>>>> Writing shuffles shows this much more significant result:
>>>>>>>>>>>
>>>>>>>>>>> Zeppelin:
>>>>>>>>>>> *Total Time Across All Tasks:* 2.6 min
>>>>>>>>>>> *Input Size / Records:* 2.4 GB / 7314771
>>>>>>>>>>> *Shuffle Write:* 673.5 MB / 7314771
>>>>>>>>>>>
>>>>>>>>>>> vs
>>>>>>>>>>>
>>>>>>>>>>> Spark-shell:
>>>>>>>>>>> *Total Time Across All Tasks:* 28 min
>>>>>>>>>>> *Input Size / Records:* 3.6 GB / 7314771
>>>>>>>>>>> *Shuffle Write:* 9.0 GB / 7314771
>>>>>>>>>>>
>>>>>>>>>>> This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills.
>>>>>>>>>>>
>>>>>>>>>>> Quite often the excessive shuffle volume will lead to massive shuffle spills, which ultimately kill not only performance but the actual executors as well.
>>>>>>>>>>>
>>>>>>>>>>> I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this could impact shuffle writes in such a drastic way, since scheduling mode operates at the inter-job level, while this happens at the inter-stage level.
>>>>>>>>>>>
>>>>>>>>>>> I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the defaults. Also, Zeppelin's interpreter adds no relevant additional default parameters.
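Replying inline to my own original mail: for anyone reproducing this comparison, a minimal sketch for capturing the explicitly-set configuration from each REPL, ready to diff (note that defaults are invisible here too, as discussed above):

    // Dump every explicitly-set configuration entry, sorted for diffing
    // against the other REPL's output.
    sc.getConf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }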
>>>>>>>>>>> I performed a diff between rc4 (which was later released) and 1.4.0, and as expected there were no differences, besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of those two are again identical.
>>>>>>>>>>>
>>>>>>>>>>> I may attempt, as a next step, to simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider it a major bug if a simple compiler change led to this kind of issue.
>>>>>>>>>>>
>>>>>>>>>>> I am also open to any other ideas, in particular for verifying that the same compression/serialization is indeed happening, and regarding ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, thus the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue.
>>>>>>>>>>>
>>>>>>>>>>> Best Regards,
>>>>>>>>>>>
>>>>>>>>>>> Rick Moritz
>>>>>>>>>>>
>>>>>>>>>>> PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.
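Following up on the question above about what exactly goes into the shuffles, here is a rough probe I intend to run identically in both REPLs; `data` is a placeholder for the RDD feeding the suspicious stage, and this measures only the configured serializer, not any shuffle-side compression:

    // Sample the dataset and report the average serialized size per tuple,
    // using whatever serializer the running context is configured with.
    import org.apache.spark.SparkEnv

    val ser = SparkEnv.get.serializer.newInstance()
    val sizes = data.take(1000).map(row => ser.serialize(row).limit())
    println(s"avg serialized bytes/tuple: ${sizes.sum.toDouble / sizes.length}")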