AM restart on another node puts SparkSQL job into a state of feigned death

2017-12-20 Thread Bang Xiao
I run "spark-sql  --master yarn --deploy-mode client -f 'SQLs' " in shell, 
The application  is stuck when the AM is down and restart in other nodes. It
seems the driver wait for the next sql. Is this a bug?In my opinion,Either
the application execute the failed sql or exit with a failure when the AM
restart。
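
If the desired behavior is "exit with a failure", one knob worth checking (a
suggestion, not something confirmed in the thread) is spark.yarn.maxAppAttempts,
which caps how many times YARN restarts the AM; with a single attempt the
application fails instead of hanging through an AM restart:

    spark-sql --master yarn --deploy-mode client \
      --conf spark.yarn.maxAppAttempts=1 \
      -f 'SQLs'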




Exception in Shutdown-thread, bad file descriptor

2017-12-20 Thread Noorul Islam Kamal Malmiyoda
Hi all,

We are getting the following exception and this somehow blocks the parent
thread from proceeding further.

17/11/14 16:50:09 SPARK_APP WARN NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
17/11/14 16:50:17 SPARK_APP WARN SparkContext: Use an existing
SparkContext, some configuration may not take effect.
[Stage 4:> (13 + 1) / 200][Stage 5:> (1 + 0) / 8][Stage 8:> (0 + 0) / 8]
Exception in thread "Shutdown-checker" java.lang.RuntimeException:
eventfd_write() failed: Bad file descriptor
at io.netty.channel.epoll.Native.eventFdWrite(Native Method)
at io.netty.channel.epoll.EpollEventLoop.wakeup(EpollEventLoop.java:106)
at io.netty.util.concurrent.SingleThreadEventExecutor.shutdownGracefully(SingleThreadEventExecutor.java:538)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.shutdownGracefully(MultithreadEventExecutorGroup.java:146)
at io.netty.util.concurrent.AbstractEventExecutorGroup.shutdownGracefully(AbstractEventExecutorGroup.java:69)
at com.datastax.driver.core.NettyOptions.onClusterClose(NettyOptions.java:193)
at com.datastax.driver.core.Connection$Factory.shutdown(Connection.java:902)
at com.datastax.driver.core.Cluster$Manager$ClusterCloseFuture$1.run(Cluster.java:2539)

This is very hard to replicate, so I have not been able to come up with a
reproducible recipe. Has anyone faced a similar issue? Any help is
appreciated.

Spark Version: 2.0.1
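
The trace appears to show the Cassandra driver's Cluster being closed from a
shutdown hook after Netty's event loop file descriptors are already gone. One
hedged workaround, assuming the application owns the Cluster, is to close it
explicitly before shutdown; a minimal Java sketch (the contact point and
variable names are illustrative, not from the post):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;

    Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")   // assumed contact point
        .build();
    Session session = cluster.connect();
    try {
        // ... application work ...
    } finally {
        session.close();
        cluster.close();  // tears down the driver's Netty resources in order
    }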


Thanks and Regards
Noorul


Re: Help Required on Spark - Convert DataFrame to List without using collect

2017-12-20 Thread Sunitha Chennareddy
Hi,

Thank You All..

Here is my requirement: I have a DataFrame containing rows retrieved from an
Oracle table. I need to iterate over the DataFrame, fetch each record, and
call a common function with a few parameters.

The issue I am facing is that I am not able to call the common function:

JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new
Function<Row, Person>() {
  @Override
  public Person call(Row row) throws Exception {
  Person person = new Person();
  person.setId(row.getDecimal(0).longValue());
  person.setName(row.getString(1));

personLst.add(person);  // note: updating an outer list here never reaches the driver
return person;
  }
});

personRDD.foreach(new VoidFunction<Person>() {
private static final long serialVersionUID = 1123456L;

@Override
public void call(Person person) throws Exception
{
  System.out.println(person.getId());
  // here I tried to call the common function
}
   });

I am able to print the data in the foreach loop; however, when I try to call
the common function it gives me the error below.
Error message: org.apache.spark.SparkException: Task not serializable

I kindly request you to share some ideas (sample code / a link to refer to)
on how to call a common function / interface method, passing values from
each record of the DataFrame.

Regards,
Sunitha
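
A frequent cause of "Task not serializable" (an assumption here; the thread
does not show the common function) is that the helper lives in a
non-serializable enclosing class, and an anonymous inner class silently
captures that enclosing instance. A minimal sketch that avoids this;
PersonHandler and process() are hypothetical names:

    import java.io.Serializable;

    // Hypothetical helper: Serializable, so Spark can ship it to executors.
    public class PersonHandler implements Serializable {
        private static final long serialVersionUID = 1L;

        public void process(long id, String name) {
            // common logic goes here
            System.out.println(id + " -> " + name);
        }
    }

    // Driver side: a lambda captures only the serializable helper,
    // not the enclosing class (unlike an anonymous inner class).
    final PersonHandler handler = new PersonHandler();
    personRDD.foreach(person ->
        handler.process(person.getId(), person.getName()));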


On Tue, Dec 19, 2017 at 1:20 PM, Weichen Xu 
wrote:

> Hi Sunitha,
>
> In the mapper function you cannot update outer variables such as
> `personLst`: the closure runs on executors, so `personLst.add(person)`
> never touches the driver-side list, which is why you got an empty list.
>
> You can use `rdd.collect()` to get a local list of `Person` objects
> first, then you can safely iterate on the local list and do any update you
> want.
>
> Thanks.
>
> On Tue, Dec 19, 2017 at 2:16 PM, Sunitha Chennareddy <
> chennareddysuni...@gmail.com> wrote:
>
>> Hi Deepak,
>>
>> I am able to map a Row to the Person class; the issue is that I want to
>> call another method.
>> I tried converting to a list, and it does not work without using collect.
>>
>> Regards
>> Sunitha
>> On Tuesday, December 19, 2017, Deepak Sharma 
>> wrote:
>>
>>> I am not sure about Java, but in Scala it would be something like
>>> df.rdd.map{ x => MyClass(x.getString(0), ...) }
>>>
>>> HTH
>>>
>>> --Deepak
>>>
>>> On Dec 19, 2017 09:25, "Sunitha Chennareddy" wrote:
>>>
>>> Hi All,
>>>
>>> I am new to Spark. I want to convert a DataFrame to a List<Person>
>>> without using collect().
>>>
>>> The main requirement is that I need to iterate through the rows of the
>>> DataFrame and call another function, passing a column value from each
>>> row (person.getId()).
>>>
>>> Here is the snippet I have tried. Kindly help me resolve the issue;
>>> personLst is returning 0:
>>>
>>> List<Person> personLst = new ArrayList<>();
>>> JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new
>>> Function<Row, Person>() {
>>>   public Person call(Row row)  throws Exception{
>>>   Person person = new Person();
>>>   person.setId(row.getDecimal(0).longValue());
>>>   person.setName(row.getString(1));
>>>
>>> personLst.add(person);
>>> // here I tried to call another function but control never passed
>>> return person;
>>>   }
>>> });
>>> logger.info("personLst size =="+personLst.size());
>>> logger.info("personRDD count ==="+personRDD.count());
>>>
>>> //output is
>>> personLst size == 0
>>> personRDD count === 3
>>>
>>>
>>>
>
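
For completeness, the collect-based approach from the reply above looks
roughly like this (a sketch; fine for small results only, since collect()
pulls everything to the driver, and handler is the hypothetical helper from
the earlier sketch):

    import java.util.List;

    // Bring Person objects to the driver, where plain iteration is safe.
    List<Person> localPersons = personRDD.collect();
    for (Person p : localPersons) {
        handler.process(p.getId(), p.getName());
    }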


Re: Can spark shuffle leverage Alluxio to obtain higher stability?

2017-12-20 Thread chopinxb
In my experience with Spark applications (almost all Spark SQL), when there
is a complete node failure in the cluster, jobs that have shuffle blocks on
that node fail outright after 4 task retries. It seems that data lineage
didn't help. What's more, our applications run multiple SQL statements for
data analysis, and having the entire application fail because of one job
failure after a lengthy computation is unacceptable. So to some extent we
care more about stability than speed.






keep SparkContext alive and wait for the next job, just like spark-shell

2017-12-20 Thread CondyZhou
Hi, all. I am confused about how to keep a SparkContext alive. Our
situation: users write a SQL query on a web page, and the backend
initializes a SparkContext and submits the Spark job. The problem is that
every time we run a query string, Spark requests resources from YARN again,
and wasting so much time initializing the SparkContext is painful. So I am
looking for a way to run Spark jobs such that when a query finishes, the
context does not end and the YARN resources are not released, just like
spark-shell (I find that spark-shell keeps its resources once started). If
you have any ideas, please give me a pointer. Thanks to all!
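
A common pattern (a sketch, not from the thread; the class and names are
illustrative) is to build the SparkSession once at backend startup and reuse
it for every query, so the YARN resources stay allocated like spark-shell's:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical long-lived holder: the session lives as long as the
    // web backend process, so repeated queries skip context start-up.
    public final class SparkHolder {
        private static final SparkSession SESSION = SparkSession.builder()
                .appName("web-sql-backend")  // illustrative app name
                .enableHiveSupport()
                .getOrCreate();

        public static Dataset<Row> runQuery(String sql) {
            return SESSION.sql(sql);  // reuses the same context every time
        }
    }

Dedicated job servers such as Apache Livy or spark-jobserver exist for
exactly this keep-alive use case as well.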




Re: /tmp fills up to 100GB when using a window function

2017-12-20 Thread Vadim Semenov
Ah, yes, I missed that part

it's `spark.local.dir`

spark.local.dir (default: /tmp): Directory to use for "scratch" space in
Spark, including map output files and RDDs that get stored on disk. This
should be on a fast, local disk in your system. It can also be a
comma-separated list of multiple directories on different disks. NOTE: In
Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone,
Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
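
A minimal way to point the scratch space at a bigger disk (the path is
illustrative):

    spark-submit --conf spark.local.dir=/mnt1/spark-scratch ...

Under YARN, configure the NodeManager's local dirs instead, since LOCAL_DIRS
overrides this property as noted above.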

On Wed, Dec 20, 2017 at 2:58 PM, Gourav Sengupta 
wrote:

> I do think that there is an option to set the temporary shuffle location
> to a particular directory. While working with EMR I set it to /mnt1/. Let
> me know in case you are not able to find it.
>
> On Mon, Dec 18, 2017 at 8:10 PM, Mihai Iacob  wrote:
>
>> This code generates files under /tmp...blockmgr... which do not get
>> cleaned up after the job finishes.
>>
>> Anything wrong with the code below? or are there any known issues with
>> spark not cleaning up /tmp files?
>>
>>
>> window = Window.\
>>   partitionBy('***', 'date_str').\
>>   orderBy(sqlDf['***'])
>>
>> sqlDf = sqlDf.withColumn("***",rank().over(window))
>> df_w_least = sqlDf.filter("***=1")
>>
>>
>>
>>
>>
>> Regards,
>>
>> *Mihai Iacob*
>> DSX Local  - Security, IBM Analytics
>>


Re: /tmp fills up to 100GB when using a window function

2017-12-20 Thread Gourav Sengupta
I do think that there is an option to set the temporary shuffle location to
a particular directory. While working with EMR I set it to /mnt1/. Let me
know in case you are not able to find it.

On Mon, Dec 18, 2017 at 8:10 PM, Mihai Iacob  wrote:

> This code generates files under /tmp...blockmgr... which do not get
> cleaned up after the job finishes.
>
> Anything wrong with the code below? or are there any known issues with
> spark not cleaning up /tmp files?
>
>
> window = Window.\
>   partitionBy('***', 'date_str').\
>   orderBy(sqlDf['***'])
>
> sqlDf = sqlDf.withColumn("***",rank().over(window))
> df_w_least = sqlDf.filter("***=1")
>
>
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local  - Security, IBM Analytics
>


Fwd: ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

2017-12-20 Thread Vishal Verma
Hi All,

Please help me with this error:

17/12/20 11:07:16 INFO executor.CoarseGrainedExecutorBackend: Started
daemon with process name: 19581@ddh-dev-dataproc-sw-hdgx
17/12/20 11:07:16 INFO util.SignalUtils: Registered signal handler for TERM
17/12/20 11:07:16 INFO util.SignalUtils: Registered signal handler for HUP
17/12/20 11:07:16 INFO util.SignalUtils: Registered signal handler for INT
17/12/20 11:07:16 INFO spark.SecurityManager: Changing view acls to:
yarn,tkmafag
17/12/20 11:07:16 INFO spark.SecurityManager: Changing modify acls to:
yarn,tkmafag
17/12/20 11:07:16 INFO spark.SecurityManager: Changing view acls groups to:
17/12/20 11:07:16 INFO spark.SecurityManager: Changing modify acls groups to:
17/12/20 11:07:16 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users  with view
permissions: Set(yarn, tkmafag); groups with view permissions: Set();
users  with modify permissions: Set(yarn, tkmafag); groups with modify
permissions: Set()
17/12/20 11:07:16 INFO client.TransportClientFactory: Successfully
created connection to /10.206.52.20:35617 after 48 ms (0 ms spent in
bootstraps)
17/12/20 11:07:17 INFO spark.SecurityManager: Changing view acls to:
yarn,tkmafag
17/12/20 11:07:17 INFO spark.SecurityManager: Changing modify acls to:
yarn,tkmafag
17/12/20 11:07:17 INFO spark.SecurityManager: Changing view acls groups to:
17/12/20 11:07:17 INFO spark.SecurityManager: Changing modify acls groups to:
17/12/20 11:07:17 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users  with view
permissions: Set(yarn, tkmafag); groups with view permissions: Set();
users  with modify permissions: Set(yarn, tkmafag); groups with modify
permissions: Set()
17/12/20 11:07:17 INFO client.TransportClientFactory: Successfully
created connection to /10.206.52.20:35617 after 1 ms (0 ms spent in
bootstraps)
17/12/20 11:07:17 INFO storage.DiskBlockManager: Created local
directory at 
/hadoop/yarn/nm-local-dir/usercache/tkmafag/appcache/application_1512677738429_16167/blockmgr-d585ecec-829a-432b-a8f1-89503359510e
17/12/20 11:07:17 INFO memory.MemoryStore: MemoryStore started with
capacity 7.8 GB
17/12/20 11:07:17 INFO executor.CoarseGrainedExecutorBackend:
Connecting to driver:
spark://CoarseGrainedScheduler@10.206.52.20:35617
17/12/20 11:07:17 INFO executor.CoarseGrainedExecutorBackend:
Successfully registered with driver
17/12/20 11:07:17 INFO executor.Executor: Starting executor ID 1 on
host ddh-dev-dataproc-sw-hdgx.c.kohls-ddh-lle.internal
17/12/20 11:07:17 INFO util.Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port
60054.
17/12/20 11:07:17 INFO netty.NettyBlockTransferService: Server created
on ddh-dev-dataproc-sw-hdgx.c.kohls-ddh-lle.internal:60054
17/12/20 11:07:17 INFO storage.BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block
replication policy
17/12/20 11:07:17 INFO storage.BlockManagerMaster: Registering
BlockManager BlockManagerId(1,
ddh-dev-dataproc-sw-hdgx.c.kohls-ddh-lle.internal, 60054, None)
17/12/20 11:07:17 INFO storage.BlockManagerMaster: Registered
BlockManager BlockManagerId(1,
ddh-dev-dataproc-sw-hdgx.c.kohls-ddh-lle.internal, 60054, None)
17/12/20 11:07:17 INFO storage.BlockManager: external shuffle service
port = 7337
17/12/20 11:07:17 INFO storage.BlockManager: Registering executor with
local external shuffle service.
17/12/20 11:07:17 INFO client.TransportClientFactory: Successfully
created connection to
ddh-dev-dataproc-sw-hdgx.c.kohls-ddh-lle.internal/10.206.53.214:7337
after 1 ms (0 ms spent in bootstraps)
17/12/20 11:07:17 INFO storage.BlockManager: Initialized BlockManager:
BlockManagerId(1, ddh-dev-dataproc-sw-hdgx.c.kohls-ddh-lle.internal,
60054, None)
17/12/20 11:08:21 ERROR executor.CoarseGrainedExecutorBackend:
RECEIVED SIGNAL TERM
17/12/20 11:08:21 INFO storage.DiskBlockManager: Shutdown hook called
17/12/20 11:08:21 INFO util.ShutdownHookManager: Shutdown hook called

I am using the following Spark config:
 maxCores = 5
 driverMemory=2g
 executorMemory=17g
 executorInstances=100

Out of 100 executors, my job ends up with only 10 active executors, even
though enough memory is available. Even when I set the executor count to
250, only 10 remain active.

All I am trying to do is load a multi-partition Hive table and run
df.count() over it.

Please help me understand what is causing the executors to be killed.
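
One possibility (an assumption; the thread does not confirm the cluster
settings) is that dynamic allocation is enabled, in which case idle
executors are released after spark.dynamicAllocation.executorIdleTimeout,
and a plain df.count() keeps only a handful busy. A sketch of pinning the
executor count instead:

    spark-submit \
      --conf spark.dynamicAllocation.enabled=false \
      --num-executors 100 \
      --executor-memory 17g \
      --executor-cores 5 \
      ...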

Thanks & Regards,
*Vishal Verma*


Re: Can spark shuffle leverage Alluxio to obtain higher stability?

2017-12-20 Thread vincent gromakowski
The probability of a complete node failure is low. I would rely on data
lineage and accept the reprocessing overhead. Another option would be to
write shuffle data to a distributed FS, but that would drastically reduce
the speed of all your jobs.

On 20 Dec 2017 11:23, "chopinxb"  wrote:

> Yes, the shuffle service was already started in each NodeManager. What I
> mean by a node failure is that the machine is down, so every service on
> it, including the NodeManager process, is down. In this case the shuffle
> service is no longer helpful.


Re: Can spark shuffle leverage Alluxio to obtain higher stability?

2017-12-20 Thread chopinxb
Yes, the shuffle service was already started in each NodeManager. What I
mean by a node failure is that the machine is down, so every service on it,
including the NodeManager process, is down. In this case the shuffle
service is no longer helpful.






Re: Can spark shuffle leverage Alluxio to obtain higher stability?

2017-12-20 Thread vincent gromakowski
In your case you need to externalize the shuffle files to a component
outside of your Spark cluster, so that they persist after Spark workers die:
https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
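
For reference, wiring up the YARN external shuffle service looks roughly
like this (a sketch following the linked docs):

    # yarn-site.xml on each NodeManager
    yarn.nodemanager.aux-services = mapreduce_shuffle,spark_shuffle
    yarn.nodemanager.aux-services.spark_shuffle.class =
        org.apache.spark.network.yarn.YarnShuffleService

    # Spark application side
    spark.shuffle.service.enabled = true

Note that this keeps shuffle files available across executor deaths, but not
across the loss of the whole machine, which is the case discussed here.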


2017-12-20 10:46 GMT+01:00 chopinxb :

> In my use case, I run Spark in yarn-client mode with dynamicAllocation
> enabled. When a node shuts down abnormally, my Spark application fails
> because tasks fail to fetch shuffle blocks from that node 4 times. Why
> doesn't Spark leverage Alluxio (a distributed in-memory filesystem) to
> write shuffle blocks with replicas? In that situation, when a node shuts
> down, tasks could fetch shuffle blocks from other replicas, and we could
> obtain higher stability.
>


Can spark shuffle leverage Alluxio to obtain higher stability?

2017-12-20 Thread chopinxb
In my use case, I run Spark in yarn-client mode with dynamicAllocation
enabled. When a node shuts down abnormally, my Spark application fails
because tasks fail to fetch shuffle blocks from that node 4 times. Why
doesn't Spark leverage Alluxio (a distributed in-memory filesystem) to write
shuffle blocks with replicas? In that situation, when a node shuts down,
tasks could fetch shuffle blocks from other replicas, and we could obtain
higher stability.


