Hi Marco,
Yes, it was on the same host when the problem was found.
Even when I tried starting from a different host, the problem was still there.
Any hints or suggestions will be appreciated.
 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistr...@gmail.com>
 To: Palash Gupta <spline_pal...@yahoo.com> 
Cc: ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
 Sent: Thursday, January 5, 2017 1:01 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed
to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
   
Hi,
If it only happens when you run 2 apps at the same time, could it be that
these 2 apps somehow run on the same host?
Kr
On 5 Jan 2017 9:00 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:

Hi Marco and respected members,
I have done everything the forum suggested, but I'm still having the same
issue:

1. I will migrate my applications to production environment where I will have
more resources
Palash>> I migrated my application to production, where I have more CPU cores,
more memory and a total of 7 hosts in the spark cluster.
2. Use Spark 2.0.0 function to load CSV rather than using the databricks api
Palash>> Earlier I was using the databricks csv api with Spark 2.0.0. As
suggested by one of the members, I'm now using the Spark 2.0.0 built-in csv
loader.
3. In production I will run multiple spark applications at a time and try to
reproduce this error for both file system and HDFS loading cases
Palash>> Yes, I reproduced it, and it only happens when two spark applications
run at the same time. Please see the logs:
17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:67)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
        ... 11 more

17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 1]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 2]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 6 hostname: 10.15.187.76.
17/01/05 01:50:16 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor 10.15.187.76: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 3]
17/01/05 01:50:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
17/01/05 01:50:16 INFO scheduler.DAGScheduler: ResultStage 0 (load at NativeMethodAccessorImpl.java:-2) failed in 2.110 s
17/01/05 01:50:16 INFO scheduler.DAGScheduler: Job 0 failed: load at NativeMethodAccessorImpl.java:-2, took 2.262950 s
Traceback (most recent call last):
  File "/home/hadoop/development/datareloadwithps.py", line 851, in <module>
    datareporcessing(expected_datetime,expected_directory_hdfs)
  File "/home/hadoop/development/datareloadwithps.py", line 204, in datareporcessing
    df_codingsc_raw = sqlContext.read.format("csv").option("header",'true').load(HDFS_BASE_URL + hdfs_dir + filename)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 147, in load
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.load.
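
The TaskSetManager lines in this log already show which executors the four
failed attempts ran on. A minimal sketch for summarizing that from a saved
driver log follows. The function name `failed_attempts_by_host` and the
regular expression are assumptions, written only for the two "Lost task"
formats that appear in this log; they are not part of Spark.

```python
import re
from collections import Counter

# Matches both "Lost task" formats seen in the log:
#   Lost task 0.0 in stage 0.0 (TID 0, 10.15.187.79): ...
#   Lost task 0.1 in stage 0.0 (TID 1) on executor 10.15.187.78: ...
# \s+ before "(TID" tolerates a line wrap at that position.
LOST_TASK = re.compile(
    r"Lost task \d+\.\d+ in stage \d+\.\d+\s+"
    r"\(TID \d+(?:, (?P<host_a>[\d.]+)\)|\) on executor (?P<host_b>[\d.]+))"
)


def failed_attempts_by_host(log_text):
    """Count 'Lost task' attempts per executor host in a driver log."""
    counts = Counter()
    for match in LOST_TASK.finditer(log_text):
        host = match.group("host_a") or match.group("host_b")
        counts[host] += 1
    return counts
```

Applied to the log above, this counts two failed attempts on 10.15.187.78 and
one each on 10.15.187.79 and 10.15.187.76, so the failures were not confined
to a single executor host.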

 Thanks & Best Regards,
Palash Gupta


      From: Palash Gupta <spline_pal...@yahoo.com>
 To: Marco Mistroni <mmistr...@gmail.com> 
Cc: ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
 Sent: Saturday, December 31, 2016 12:43 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed
to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
  
Hi Marco,
Thanks!
Please find my responses below:
so you have a pyspark application running on spark 2.0
Palash>> Yes
You have python scripts dropping files on HDFS
Palash>> Yes (it is not part of the spark process, just an independent python
script)

then you have two spark jobs
Palash>> Yes
- 1 load expected hour data (pls explain. How many files on average)
Palash>> At least 35,000 rows in each file, with 150 columns

Number of CSV file types: 7

Number of files for each type: 4

Total number of files: 28

- 1 load delayed data (pls explain. how many files on average)
Palash>> We may or may not get delayed data in each hour. But if, for example,
there is a network issue between the CSV generation system and the spark
system, then we will get many delayed hour files.

On average:
At least 35,000 rows in each file, with 150 columns

Number of CSV file types: 7

Number of files for each type: 2

Total number of files: 14

Do these scripts run continuously (they have a while loop) or do you kick them
off via a job scheduler on an hourly basis
Palash>> No, this script is run by a linux cron schedule (not in a while
loop).

Do these scripts run on a cluster?
Palash>> My pyspark application is running in standalone cluster mode where I
have only two VMs (one master, two workers).

So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of
them, does aggregation etc then populates mongo
Palash>> Yes


At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional.
Presumably these files are not deleted). So your job now loads 5 files, does
aggregation and stores data in mongodb? Or does your job at T+1 only load
deltas (the two new csv files which appeared at T+1)?
Palash>> No, it will only handle the newly arrived files for the new expected
hour. But in delayed data handling there is a possibility of reprocessing a
specific hour's data, re-calculating the KPIs and updating them in mongodb.

You said before that simply parsing csv files via spark in a standalone app
works fine.
Palash>> I said that since I stopped the delayed data loading spark script,
expected hour data loading has been running smoothly for the last three days.

Then what you can try is to do exactly the same processing you are doing but
instead of loading csv files from HDFS you can load from local directory and
see if the problem persists......(this just to exclude any issues with loading
HDFS data.)
Palash>> The issue is the same when loading from the file system. When I'm
running only a single script it is smooth. When I'm running both scripts at
the same time in two separate pyspark applications, it sometimes fails with
this error while loading a file from the file system.

Now I'm doing the things below as per the suggestions:
1. I will migrate my applications to the production environment, where I will
have more resources
2. Use the Spark 2.0.0 function to load CSV rather than using the databricks
api
3. In production I will run multiple spark applications at a time and try to
reproduce this error for both the file system and HDFS loading cases
When I'm done I will share the details with you.
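
One way to make step 3 repeatable is to start both applications from a single
launcher instead of waiting for the cron schedules to overlap. This is only a
sketch under assumptions: the helper names are made up for illustration, and
the script names, master URL and core counts in the usage note are
hypothetical placeholders. The `spark-submit` options used (`--master`,
`--name`, `--total-executor-cores`) are standard ones for a standalone
cluster.

```python
import subprocess


def spark_submit_command(script, master, app_name, total_cores):
    """Build the spark-submit argument list for one PySpark application."""
    return [
        "spark-submit",
        "--master", master,
        "--name", app_name,
        "--total-executor-cores", str(total_cores),
        script,
    ]


def launch_together(commands):
    """Start every command without waiting, then collect all exit codes."""
    processes = [subprocess.Popen(command) for command in commands]
    return [process.wait() for process in processes]
```

For example, `launch_together([spark_submit_command("expected_hour_load.py",
"spark://master-host:7077", "expected-hour", 4),
spark_submit_command("delayed_hour_load.py", "spark://master-host:7077",
"delayed-hour", 4)])` would start both applications at the same moment and
return their exit codes, so a non-zero code marks a run worth keeping logs
for.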

If you have any more suggestions from a debugging point of view, please add
them here.


 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistr...@gmail.com>
 To: "spline_pal...@yahoo.com" <spline_pal...@yahoo.com> 
Cc: ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
 Sent: Saturday, December 31, 2016 1:42 AM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed
to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
  
Hi Palash

so you have a pyspark application running on spark 2.0
You have python scripts dropping files on HDFS
then you have two spark jobs
- 1 load expected hour data (pls explain. How many files on average)
- 1 load delayed data (pls explain. how many files on average)

Do these scripts run continuously (they have a while loop) or do you kick them
off via a job scheduler on an hourly basis?
Do these scripts run on a cluster?


So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of
them, does aggregation etc then populates mongo
At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional.
Presumably these files are not deleted). So your job now loads 5 files, does
aggregation and stores data in mongodb? Or does your job at T+1 only load
deltas (the two new csv files which appeared at T+1)?

You said before that simply parsing csv files via spark in a standalone app
works fine. Then what you can try is to do exactly the same processing you are
doing but instead of loading csv files from HDFS you can load from a local
directory and see if the problem persists......(this just to exclude any
issues with loading HDFS data.)
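
A minimal sketch of that switch, assuming the load path is built from a base
URL, a directory and a file name as in the application's existing load call.
The helper name `csv_path`, both base URLs and the directory and file names
are hypothetical placeholders, not values from the real job.

```python
def csv_path(base_url, directory, filename):
    """Join a base URL, a directory and a file name into one load path."""
    return "/".join([base_url.rstrip("/"), directory.strip("/"), filename])


# Hypothetical locations: only the base URL changes between the two runs.
HDFS_BASE_URL = "hdfs://namenode-host:8020/data"
LOCAL_BASE_URL = "file:///tmp/csv_test"

hdfs_path = csv_path(HDFS_BASE_URL, "2017010501", "sample.csv")
local_path = csv_path(LOCAL_BASE_URL, "2017010501", "sample.csv")

# Either path can then be passed to the same reader call, for example:
#   sqlContext.read.format("csv").option("header", "true").load(local_path)
print(hdfs_path)
print(local_path)
```

Keeping everything except the base URL identical means a failure that shows
up with both paths is unlikely to be caused by HDFS itself. Note that on a
cluster with several workers, a `file://` path has to exist at the same
location on every worker.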

hth
   Marco

On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta <spline_pal...@yahoo.com> wrote:

Hi Marco & Ayan,
I now have a clearer idea of what Marco means by "reduce". I will do it to dig
deeper.
Let me answer your queries:
When you see the broadcast errors, does your job terminate?
Palash>> Yes, it terminated the app.
Or are you assuming that something is wrong just because you see the message
in the logs?
Palash>> No, it terminated at the very first step of Spark processing (in my
case loading csv from hdfs)
Plus...Wrt logic....Who writes the CSV? With what frequency?
Palash>> We parse xml files using python (outside spark scope), make csv files
and put them in hdfs
Does the app run all the time loading CSV from hadoop?
Palash>> Every hour two separate pyspark apps are running:
1. Loading current expected hour data, preparing KPIs, doing aggregation,
loading into mongodb
2. The same operations run for delayed hour data

Are you using spark streaming?
Palash>> No
Does the app run fine with an older version of spark (1.6)?
Palash>> I didn't test with Spark 1.6. My app has been running well for the
last two days, since I stopped the second app (delayed data loading). Even
before that, in most cases both ran well, except a few times...

Sent from Yahoo Mail on Android 
 
 On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni<mmistr...@gmail.com> wrote:  
Correct. I mean reduce the functionality.
Uhm I realised I didn't ask you a fundamental question. When you see the
broadcast errors, does your job terminate? Or are you assuming that something
is wrong just because you see the message in the logs?
Plus...Wrt logic....Who writes the CSV? With what frequency?
Does the app run all the time loading CSV from hadoop?
Are you using spark streaming?
Does the app run fine with an older version of spark (1.6)?
Hth
On 30 Dec 2016 12:44 pm, "ayan guha" <guha.a...@gmail.com> wrote:

@Palash: I think what Marco meant by "reduce functionality" is to reduce the
scope of your application's functionality so that you can isolate the issue
in certain part(s) of the app...I do not think he meant the "reduce"
operation :)
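
A small sketch of that idea, assuming the job can be split into named steps
that each take and return the data. The `run_stages` helper and the stand-in
stages on plain Python lists are hypothetical; in the real application each
stage would wrap a DataFrame step (load CSV, join, SQL for the KPIs,
aggregation, mongodb write).

```python
def run_stages(stages, data, upto):
    """Run only the first `upto` (name, function) stages and return the result."""
    for name, stage in stages[:upto]:
        print("running stage:", name)
        data = stage(data)
    return data


# Stand-in stages operating on a plain list instead of a DataFrame.
stages = [
    ("load", lambda rows: [r for r in rows if r is not None]),
    ("kpi", lambda rows: [r * 2 for r in rows]),
    ("aggregate", lambda rows: [sum(rows)]),
]

print(run_stages(stages, [1, None, 3], upto=1))  # load step only
print(run_stages(stages, [1, None, 3], upto=3))  # all three steps
```

Starting with `upto=1` and raising it one step per run shows which step is the
first one to trigger the error.
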
On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_pal...@yahoo.com. 
invalid> wrote:

Hi Marco,
All of your suggestions so far are highly appreciated. I will apply them in
my code and let you know.

Let me answer your query:
What does your program do?
Palash>> Each hour I am loading many CSV files and then computing some KPI(s)
from them. Finally I am doing some aggregation and inserting into mongodb
from spark.

 you say it runs for 2-3 hours, what is the logic? just processing a huge
amount of data? doing ML ?
Palash>> Yes, you are right, whatever I'm processing should not take much
time. Initially my processing took only 5 minutes, as I was using all cores
and running only one application. When I created more separate spark
applications for handling delayed data loading and implemented more use cases
running in parallel, I started facing the error randomly. And because
resources are now split among four spark applications running in parallel,
some tasks take longer than usual. But it still should not take 2-3 hours...

Currently all the applications are running in a development environment where
we have only a two-VM cluster, and I will migrate to the production platform
by next week. I will let you know if there is any improvement there.

 I'd say break down your application..  reduce functionality , run and see
outcome. then add more functionality, run and see again.
Palash>> Marco, I'm not very good at Spark, so it would be helpful if you
could provide an example of reducing functionality. I'm using Spark data
frames, joining data frames and using SQL statements to compute the KPI(s).
How could I apply reduce functionality here?


 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistr...@gmail.com>
 To: "spline_pal...@yahoo.com" <spline_pal...@yahoo.com> 
Cc: User <user@spark.apache.org>
 Sent: Thursday, December 29, 2016 11:28 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed
to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
   
Hello
 no sorry i dont have any further insight into that.... i have seen similar
errors but for completely different issues, and in most of the cases it had to
do with my data or my processing rather than Spark itself.
What does your program do? you say it runs for 2-3 hours, what is the logic?
just processing a huge amount of data?
doing ML ?
i'd say break down your application..  reduce functionality , run and see
outcome. then add more functionality, run and see again.
I found myself doing these kinds of things when i got errors in my spark apps.

To get concrete help you will have to trim down the code to a few lines that
can reproduce the error. That will be a great start
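
As a rough illustration of what such a trimmed-down script could look like,
here is a sketch that only loads one CSV file and counts its rows. The
function name `minimal_repro` and the application name are assumptions;
`SparkSession` and the `read.format("csv").option("header", "true").load(...)`
chain are the built-in Spark 2.0 APIs. The path is taken from the command
line rather than guessed.

```python
import sys


def minimal_repro(spark, path):
    """Load one CSV file with a header row and force a full read of it."""
    df = spark.read.format("csv").option("header", "true").load(path)
    return df.count()


def main(argv):
    # Imported here so the helper above can be exercised without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("broadcast-repro").getOrCreate()
    try:
        print(minimal_repro(spark, argv[1]))
    finally:
        spark.stop()


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv)
```

Submitting two copies of a script like this at the same time, and nothing
else, would show whether the error depends on the rest of the application at
all.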

Sorry for not being of much help

hth
 marco





On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_pal...@yahoo.com> wrote:

Hi Marco,
Thanks for your response.
Yes, I tested it before and I am able to load from the linux filesystem, but
it sometimes has a similar issue too.
However, in both cases (either from hadoop or the linux file system), this
error comes in some specific scenarios, as per my observations:
1. When two separate parallel spark applications are initiated from one
driver (not all the time, only sometimes)
2. If one spark job runs for longer than expected, let's say 2-3 hours, the
second application terminates with the error.
To debug the problem, it would help me if you could share some possible
reasons why the "failed to get broadcast" error may occur.
Or if you need more logs I can share them.
Thanks again Spark User Group.
Best Regards
Palash Gupta


Sent from Yahoo Mail on Android 
 
 On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni<mmistr...@gmail.com> wrote:  
Hi
Pls try to read a CSV from filesystem instead of hadoop. If you can read it
successfully then your hadoop file is the issue and you can start debugging
from there.
Hth
On 29 Dec 2016 6:26 am, "Palash Gupta" <spline_pal...@yahoo.com. invalid> wrote:

Hi Apache Spark User team,


Greetings!
I started developing an application using Apache Hadoop and Spark with Python.
My pyspark application randomly terminated saying "Failed to get
broadcast_1*", and I have been searching for suggestions and support on
Stack Overflow at "Failed to get broadcast_1_piece0 of broadcast_1 in pyspark
application"

  
Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
 I was building an application on Apache Spark 2.00 with Python 3.4 and trying
to load some CSV files from HDFS (...
 

Could you please advise whether I should register on the Apache user list, or
how else I can get suggestions or support to debug the problem I am facing?

Your response will be highly appreciated. 


 Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494

-- 
Best Regards,
Ayan Guha