Hi
 If it only happens when u run 2 app at same time could it be that these 2
apps somehow run on same host?
Kr

On 5 Jan 2017 9:00 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:

> Hi Marco and respected member,
>
> I have done all the possible things suggested by Forum but still I'm
> having same issue:
>
> 1. I will migrate my applications to production environment where I will
> have more resources
> Palash>> I migrated my application in production where I have more CPU
> Cores, Memory & total 7 host in spark cluster.
> 2. Use Spark 2.0.0 function to load CSV rather using databrics api
> Palash>> Earlier I'm using databricks csv api with Spark 2.0.0. As
> suggested by one of the mate, Now I'm using spark 2.0.0 built in csv loader.
> 3. In production I will run multiple spark application at a time and try
> to reproduce this error for both file system and HDFS loading cas
> Palash>> yes I reproduced and it only happen when two spark application
> run at a time. Please see the logs:
>
> 17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spa
> rk.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
>         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(
> TorrentBroadcast.scala:174)
>         at org.apache.spark.broadcast.TorrentBroadcast._value$
> lzycompute(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast._value(
> TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast.getValue(
> TorrentBroadcast.scala:89)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:67)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_1_piece0 of broadcast_1
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$
> apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$s
> p(TorrentBroadcast.scala:146)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$
> apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(Torren
> tBroadcast.scala:125)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$
> apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(Torren
> tBroadcast.scala:125)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at org.apache.spark.broadcast.TorrentBroadcast.org$apache$
> spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:
> 125)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$
> readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
>         ... 11 more
>
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in
> stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Launching task 1 on executor id: 1 hostname: 10.15.187.78
> .
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage
> 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org
> .apache.spark.SparkException: Failed to get broadcast_1_piece0 of
> broadcast_1) [duplicate 1]
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in
> stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Launching task 2 on executor id: 1 hostname: 10.15.187.78
> .
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage
> 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org
> .apache.spark.SparkException: Failed to get broadcast_1_piece0 of
> broadcast_1) [duplicate 2]
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in
> stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Launching task 3 on executor id: 6 hostname: 10.15.187.76
> .
> 17/01/05 01:50:16 INFO scheduler.TaskSetManager: Lost task 0.3 in stage
> 0.0 (TID 3) on executor 10.15.187.76: java.io.IOException (org
> .apache.spark.SparkException: Failed to get broadcast_1_piece0 of
> broadcast_1) [duplicate 3]
> 17/01/05 01:50:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0
> failed 4 times; aborting job
> 17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
> whose tasks have all completed, from pool
> 17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
> 17/01/05 01:50:16 INFO scheduler.DAGScheduler: ResultStage 0 (load at
> NativeMethodAccessorImpl.java:-2) failed in 2.110 s
> 17/01/05 01:50:16 INFO scheduler.DAGScheduler: Job 0 failed: load at
> NativeMethodAccessorImpl.java:-2, took 2.262950 s
> Traceback (most recent call last):
>   File "/home/hadoop/development/datareloadwithps.py", line 851, in
> <module>
>     datareporcessing(expected_datetime,expected_directory_hdfs)
>   File "/home/hadoop/development/datareloadwithps.py", line 204, in
> datareporcessing
>     df_codingsc_raw = sqlContext.read.format("csv").
> option("header",'true').load(HDFS_BASE_URL + hdfs_dir + filename)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
> line 147, in load
>   File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
> line 933, in __call__
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco
>   File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
> line 312, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o58.load.
>
>
>
>
>
>
>
> Thanks & Best Regards,
> Palash Gupta
>
>
> ------------------------------
> *From:* Palash Gupta <spline_pal...@yahoo.com>
> *To:* Marco Mistroni <mmistr...@gmail.com>
> *Cc:* ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
> *Sent:* Saturday, December 31, 2016 12:43 PM
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying
> "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
>
> Hi Marco,
>
> Thanks!
>
> Please have my response:
>
> so you have a pyspark application running on spark 2.0
> Palash>> Yes
>
> You have python scripts dropping files on HDFS
> Palash>> Yes (it is not part of spark process, just independent python
> script)
>
> then you have two spark job
> Palash>> Yes
>
> - 1 load expected hour data (pls explain. HOw many files on average)
> Palash>>
>
> 35,000 rows in each file at least with 150 columns
>
> Number of CSV file types: 7
>
> Number of file for each type: 4
>
> total number of file: 28
>
> - 1 load delayed data(pls explain. how many files on average)
> Palash>> We may or may not get delayed data in each hour. But for example
> disconnection between CSV generation system and spark system has a network
> issue then we will get many delayed hour files.
>
> On average:
>
> 35,000 rows in each file at least with 150 columns
>
> Number of CSV file types: 7
>
> Number of file for each type: 2
>
> total number of file: 14
>
> Do these scripts run continuously (they have a while loop) or you kick
> them off  via a job scheduler on an hourly basis
> Palash>> No this script is running in linux cron schedule (not in while
> loop).
>
> Do these scripts run on a cluster?
> Palash>> My pyspark application is running in a standalone cluster mode
> where I have only two VM (One master, two workers).
>
> So, at T1 in HDFS there are 3 csv files. Your job starts up and load all 3
> of them, does aggregation etc then populate mongo
>
> Palash>> Yes
>
>
> At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2
> additonal. Presumably these files are not deleted). So your job now loads 5
> files, does aggregation and store data in mongodb? Or does your job at T+1
> only loads deltas (the two new csv files which appeared at T+1)?
>
> Palash>> No it will only handle with newly arrived file for new expected
> hour. But in delayed data handling there is a possibility to reprocess an
> specific hour data and re-calculate KPI and update in mongodb.
>
> You said before that simply parsing csv files via spark in a standalone
> app works fine.
> Palash>> I said that when I stopped delayed data loading spark script now
> expected hour data loading is smooth and running good since last three days.
>
> Then what you can try is to do exactly the same processig you are doing
> but instead of loading csv files from HDFS you can load from local
> directory and see if the problem persists......(this just to exclude any
> issues with loading HDFS data.)
> Palash>> The issue is same loading from file system. When I'm running only
> single script it is smooth. When I'm running both script at a time in two
> separate pyspark applications, sometimes it is failing showing this error
> while loading file from file system.
>
> Now I'm doing below things as per suggestion:
>
> 1. I will migrate my applications to production environment where I will
> have more resources
> 2. Use Spark 2.0.0 function to load CSV rather using databrics api
> 3. In production I will run multiple spark application at a time and try
> to reproduce this error for both file system and HDFS loading case
>
> When I'm done I will share details with you.
>
> If you have any suggestion more for debug point of view, you can add here
> for me
>
>
>
> Thanks & Best Regards,
> Palash Gupta
>
>
> ------------------------------
> *From:* Marco Mistroni <mmistr...@gmail.com>
> *To:* "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
> *Cc:* ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
> *Sent:* Saturday, December 31, 2016 1:42 AM
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying
> "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
>
> Hi Palash
>
> so you have a pyspark application running on spark 2.0
> You have python scripts dropping files on HDFS
> then you have two spark job
> - 1 load expected hour data (pls explain. HOw many files on average)
> - 1 load delayed data(pls explain. how many files on average)
>
> Do these scripts run continuously (they have a while loop) or you kick
> them off  via a job scheduler on an hourly basis
> Do these scripts run on a cluster?
>
>
> So, at T1 in HDFS there are 3 csv files. Your job starts up and load all 3
> of them, does aggregation etc then populate mongo
> At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2
> additonal. Presumably these files are not deleted). So your job now loads 5
> files, does aggregation and store data in mongodb? Or does your job at T+1
> only loads deltas (the two new csv files which appeared at T+1)?
>
> You said before that simply parsing csv files via spark in a standalone
> app works fine. Then what you can try is to do exactly the same processig
> you are doing but instead of loading csv files from HDFS you can load from
> local directory and see if the problem persists......(this just to exclude
> any issues with loading HDFS data.)
>
> hth
>    Marco
>
>
>
>
>
>
>
>
>
>
>
>
> On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta <spline_pal...@yahoo.com>
> wrote:
>
> Hi Marco & Ayan,
>
> I have now clearer idea about what Marco means by Reduce. I will do it to
> dig down.
>
> Let me answer to your queries:
>
> hen you see the broadcast errors, does your job terminate?
> Palash>> Yes it terminated the app.
>
> Or are you assuming that something is wrong just because you see the
> message in the logs?
>
> Palash>> No it terminated for the very first step of Spark processing (in
> my case loading csv from hdfs)
>
> Plus...Wrt logic....Who writes the CSV? With what frequency?
> Palash>> We parsed xml files using python (not in spark scope) & make csv
> and put in hdfs
>
> Does it app run all the time loading CSV from hadoop?
>
> Palash>> Every hour two separate pyspark app are running
> 1. Loading current expected hour data, prepare kpi, do aggregation, load
> in mongodb
> 2. Same operation will run for delayed hour data
>
>
> Are you using spark streaming?
> Palash>> No
>
> Does it app run fine with an older version of spark (1.6 )
> Palash>> I didn't test with Spark 1.6. My app is running now good as I
> stopped second app (delayed data loading) since last two days. Even most of
> the case both are running well except few times...
>
>
> Sent from Yahoo Mail on Android
> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>
> On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni
> <mmistr...@gmail.com> wrote:
> Correct. I mean reduce the functionality.
> Uhm I realised I didn't ask u a fundamental question. When you see the
> broadcast errors, does your job terminate? Or are you assuming that
> something is wrong just because you see the message in the logs?
> Plus...Wrt logic....Who writes the CSV? With what frequency?
> Does it app run all the time loading CSV from hadoop?
> Are you using spark streaming?
> Does it app run fine with an older version of spark (1.6 )
> Hth
>
> On 30 Dec 2016 12:44 pm, "ayan guha" <guha.a...@gmail.com> wrote:
>
> @Palash: I think what Macro meant by "reduce functionality" is to reduce
> scope of your application's functionality so that you can isolate the issue
> in certain part(s) of the app...I do not think he meant "reduce" operation
> :)
>
> On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_pal...@yahoo.com.
> invalid> wrote:
>
> Hi Marco,
>
> All of your suggestions are highly appreciated, whatever you said so far.
> I would apply to implement in my code and let you know.
>
> Let me answer your query:
>
> What does your program do?
> Palash>> In each hour I am loading many CSV files and then I'm making some
> KPI(s) out of them. Finally I am doing some aggregation and inserting into
> mongodb from spark.
>
> you say it runs for 2-3 hours, what is the logic? just processing a huge
> amount of data? doing ML ?
> Palash>> Yes you are right whatever I'm processing it should not take much
> time. Initially my processing was taking only 5 minutes as I was using all
> cores running only one application. When I created more separate spark
> applications for handling delayed data loading and implementing more use
> cases with parallel run, I started facing the error randomly. And due to
> separate resource distribution among four parallel spark application to run
> in parallel now some task is taking longer time than usual. But still it
> should not take 2-3 hours time...
>
> Currently whole applications are running in a development environment
> where we have only two VM cluster and I will migrate to production platform
> by next week. I will let you know if there is any improvement over there.
>
> I'd say break down your application..  reduce functionality , run and see
> outcome. then add more functionality, run and see again.
>
> Palash>> Macro as I'm not very good in Spark. It would be helpful for me
> if you provide some example of reduce functionality. Cause I'm using Spark
> data frame, join data frames, use SQL statement to manipulate KPI(s). Here
> How could I apply reduce functionality?
>
>
>
> Thanks & Best Regards,
> Palash Gupta
>
>
> ------------------------------
> *From:* Marco Mistroni <mmistr...@gmail.com>
> *To:* "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
> *Cc:* User <user@spark.apache.org>
> *Sent:* Thursday, December 29, 2016 11:28 PM
>
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying
> "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
>
> Hello
>  no sorry i dont have any further insight into that.... i have seen
> similar errors but for completely different issues, and in most of hte
> cases it had to do with my data or my processing rather than Spark itself.
> What does your program do? you say it runs for 2-3 hours, what is the
> logic? just processing a huge amount of data?
> doing ML ?
> i'd say break down your application..  reduce functionality , run and see
> outcome. then add more functionality, run and see again.
> I found myself doing htese kinds of things when i got errors in my spark
> apps.
>
> To get a concrete help you will have to trim down the code to a few lines
> that can reproduces the error  That will be a great start
>
> Sorry for not being of much help
>
> hth
>  marco
>
>
>
>
>
> On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_pal...@yahoo.com>
> wrote:
>
> Hi Marco,
>
> Thanks for your response.
>
> Yes I tested it before & am able to load from linux filesystem and it also
> sometimes have similar issue.
>
> However in both cases (either from hadoop or linux file system), this
> error comes in some specific scenario as per my observations:
>
> 1. When two parallel spark separate application is initiated from one
> driver (not all the time, sometime)
> 2. If one spark jobs are running for more than expected hour let say 2-3
> hours, the second application terminated giving the error.
>
> To debug the problem for me it will be good if you can share some possible
> reasons why failed to broadcast error may come.
>
> Or if you need more logs I can share.
>
> Thanks again Spark User Group.
>
> Best Regards
> Palash Gupta
>
>
>
> Sent from Yahoo Mail on Android
> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>
> On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni
> <mmistr...@gmail.com> wrote:
> Hi
>  Pls try to read a CSV from filesystem instead of hadoop. If you can read
> it successfully then your hadoop file is the issue and you can start
> debugging from there.
> Hth
>
> On 29 Dec 2016 6:26 am, "Palash Gupta" <spline_pal...@yahoo.com. invalid>
> wrote:
>
> Hi Apache Spark User team,
>
>
>
> Greetings!
>
> I started developing an application using Apache Hadoop and Spark using
> python. My pyspark application randomly terminated saying "Failed to get
> broadcast_1*" and I have been searching for suggestion and support in
> Stakeoverflow at Failed to get broadcast_1_piece0 of broadcast_1 in
> pyspark application
> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
>
>
> Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
> I was building an application on Apache Spark 2.00 with Python 3.4 and
> trying to load some CSV files from HDFS (...
>
> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
>
>
> Could you please provide suggestion registering myself in Apache User list
> or how can I get suggestion or support to debug the problem I am facing?
>
> Your response will be highly appreciated.
>
>
>
> Thanks & Best Regards,
> Engr. Palash Gupta
> WhatsApp/Viber: +8801817181502
> Skype: palash2494
>
>
>
>
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>
>
>
>

Reply via email to