Hi Vadim,
Thank you so much, this was a very useful command. This conversation is
going on here:
https://www.mail-archive.com/user@spark.apache.org/msg58656.html
or you can just google "why spark driver program is creating so many
threads? How can I limit this number?"
Any idea about this?
From: Kürşat Kurt [mailto:kur...@kursatkurt.com]
Sent: Sunday, October 30, 2016 7:59 AM
To: 'Jörn Franke'
Cc: 'user@spark.apache.org'
Subject: RE: Out Of Memory issue
Hi Jörn,
I am reading a 300,000-line CSV file. It is
Well, I suppose one can drop the tempTable as below:
scala> df.registerTempTable("tmp")
scala> spark.sql("select count(1) from tmp").show
+--------+
|count(1)|
+--------+
|  904180|
+--------+
scala> spark.sql("drop table if exists tmp")
res22: org.apache.spark.sql.DataFrame = []
Also, to your point: it seems like the best solution is to set
yarn.nodemanager.aux-services to mapred_shuffle,spark_shuffle.
If there are leaking threads, I think you should be able to see the
number of threads increasing. You can just dump the threads after 1-2 hours.
On Mon, Oct 31, 2016 at 12:59 PM, kant kodali wrote:
yes I can certainly use jstack but it requires 4 to 5 hours for me to
reproduce the error so I can get back as early as possible.
Thanks a lot!
On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
Then it should not be a Receiver issue. Could you use `jstack` to find out
the name of leaking threads?
On Mon, Oct 31, 2016 at 12:35 PM, kant kodali wrote:
Hi Ryan,
It happens on the driver side and I am running on a client mode (not the
cluster mode).
Thanks!
On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
Sorry, there is a typo in my previous email: this may **not** be the root
cause if the leak threads are in the driver side.
Does it happen in the driver or executors?
On Mon, Oct 31, 2016 at 12:20 PM, kant kodali wrote:
> Hi Ryan,
>
> Ahh My Receiver.onStop method is
This is more of a Java question. You don't 'clean up' threads; rather, you
rearchitect your app so that you don't create long-running threads that
never terminate. Consider also an Executor instead of manually creating
threads, as sketched below.
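For illustration, a minimal sketch of that suggestion (the pool size and
the task body are placeholders, not from the original thread):

  import java.util.concurrent.{Executors, TimeUnit}

  // Submit work to a bounded pool instead of constructing Threads by hand.
  val pool = Executors.newFixedThreadPool(4)

  pool.submit(new Runnable {
    override def run(): Unit = {
      // long-running work goes here
    }
  })

  // On shutdown, stop accepting new tasks and wait for in-flight work.
  pool.shutdown()
  pool.awaitTermination(30, TimeUnit.SECONDS)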
On Mon, Oct 31, 2016 at 7:20 PM kant kodali wrote:
Hi Ryan,
Ahh My Receiver.onStop method is currently empty.
1) I have a hard time seeing why the receiver would crash so many
times within a span of 4 to 5 hours, but anyway I understand I should
still clean up during onStop.
2) How do I clean up those threads? The documentation here
So in your code, each Receiver will start a new thread. Did you stop the
receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
after a receiver crashes and is restarted by Spark. However, this may be
the root cause since the leak threads are in the driver side. Could you use
I am also under the assumption that the *onStart* function of the Receiver is
called only once by Spark. Please correct me if I am wrong.
On Mon, Oct 31, 2016 at 11:35 AM, kant kodali wrote:
> My driver program runs a spark streaming job. And it spawns a thread by
> itself
Trying again. Hoping to find some help in figuring out the performance
bottleneck we are observing.
Thanks,
Bharath
On Sun, Oct 30, 2016 at 11:58 AM, Spark User wrote:
> Hi All,
>
> I have a UDAF that seems to perform poorly when its input is skewed. I
> have been
My driver program runs a Spark Streaming job, and it spawns a thread by
itself only in the *onStart()* function below. Other than that it doesn't
spawn any other threads. It only calls mapToPair, reduceByKey, foreachRDD,
and collect functions.
public class NSQReceiver extends Receiver {
private
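For reference, a hedged sketch (not the actual NSQReceiver code; names are
hypothetical) of keeping a handle to the thread started in onStart so that
onStop can stop it; otherwise a receiver that crashes and is restarted
leaks its old thread:

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class SketchReceiver(topic: String)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

    @volatile private var worker: Thread = _

    override def onStart(): Unit = {
      worker = new Thread("receiver-worker") {
        override def run(): Unit = {
          while (!isStopped()) {
            // read from the source and call store(...)
          }
        }
      }
      worker.start()
    }

    override def onStop(): Unit = {
      // Interrupt the worker so a restarted receiver does not leak it.
      if (worker != null) {
        worker.interrupt()
        worker = null
      }
    }
  }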
Can you check if this JIRA is relevant?
https://issues.apache.org/jira/browse/SPARK-2608
If not, can you make a new one?
On Thu, Oct 27, 2016 at 10:27 PM, Rodrick Brown
wrote:
> Try setting the values in $SPARK_HOME/conf/spark-defaults.conf
>
> i.e.
>
> $ egrep
> how do I tell my spark driver program to not create so many?
This may depend on your driver program. Do you spawn any threads in
it? Could you share some more information on the driver program, Spark
version, and your environment? It would greatly help others to help you.
On Mon, Oct 31, 2016
Thanks,
As I understand it, for the Kafka case the way to do it is to define my
kafka.Partitioner that is used when data is produced to Kafka, and just
reuse this partitioner as a spark.Partitioner in the mapWithState spec.
I think I'll stick with that.
Thanks,
Andrii
2016-10-31 16:55 GMT+02:00 Cody
Have you tried to get the number of threads in a running process using `cat
/proc/<pid>/status`?
On Sun, Oct 30, 2016 at 11:04 PM, kant kodali wrote:
> yes I did run ps -ef | grep "app_name" and it is root.
>
>
>
> On Sun, Oct 30, 2016 at 8:00 PM, Chan Chor Pang
You may know that those streams share the same keys, but Spark doesn't
unless you tell it.
mapWithState takes a StateSpec, which should allow you to specify a partitioner.
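For example, something along these lines (a sketch; the mapping function,
types, and partition count are made up for illustration):

  import org.apache.spark.HashPartitioner
  import org.apache.spark.streaming.{State, StateSpec}

  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (key, sum)
  }

  // Pass an explicit partitioner so mapWithState keeps the partitioning
  // of the upstream keyed stream (here a stand-in HashPartitioner).
  val spec = StateSpec
    .function(mappingFunc)
    .partitioner(new HashPartitioner(16))

  // keyedStream.mapWithState(spec)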
On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi wrote:
> Thanks for response,
>
> So as I understand
Can it handle state that is larger than what memory will hold?
Hi Mich,
Thank you again for your reply.
If you call a transformation on an RDD using the same partitioner as that
RDD, no shuffle will occur. KafkaRDD doesn't have a partitioner; there's
no consistent partitioning scheme that works for all Kafka uses. You can
wrap each KafkaRDD with an RDD that has a custom partitioner that you write.
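A rough sketch of such a partitioner (hypothetical; the hashing must mirror
whatever the producer-side kafka.Partitioner actually does):

  import org.apache.spark.Partitioner

  class KafkaAlignedPartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int =
      // must reproduce the producer-side partition assignment
      ((key.hashCode % numParts) + numParts) % numParts
  }

  // After one partitionBy, later transformations that reuse the same
  // partitioner avoid further shuffles:
  // val keyed = kafkaRdd.map(r => (r.key, r.value))
  //                     .partitionBy(new KafkaAlignedPartitioner(16))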
Hi Michael,
As I see you are caching the table already sorted
val keyValRDDSorted = keyValRDD.sortByKey().cache
and at the next stage you are creating multiple tempTables (different
ranges) that cache a subset of rows already cached in the RDD. The data stored
in a tempTable is in Hive columnar
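A rough sketch of that pattern in the shell (names and key ranges are
placeholders, not from the original thread):

  val keyValRDD = sc.parallelize(1 to 10000).map(i => (i, s"value$i"))
  val keyValRDDSorted = keyValRDD.sortByKey().cache()

  import sqlContext.implicits._
  val df = keyValRDDSorted.toDF("key", "value")

  // One tempTable per key range; cacheTable stores it in the
  // in-memory columnar format being discussed.
  df.filter($"key" >= 0 && $"key" < 1000).registerTempTable("tmp_0_1000")
  sqlContext.cacheTable("tmp_0_1000")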
Hello,
From my point of view, it would be more efficient and probably more
"readable" if you just extracted the required data using some JSON parsing
library (GSON, Jackson), constructed some global object (or pre-processed the
data), and then began with the Spark operations.
Jan
From:
Hi Mich,
Thank you for your quick reply!
> What type of table is the underlying table? Is it Hbase, Hive ORC or what?
It is a custom datasource, but ultimately backed by HBase.
> By Key you mean a UNIQUE ID or something similar and then you do multiple
> scans on the tempTable which stores
Hello All,
I am processing a nested, complex JSON, and below is the schema for it.
root
 |-- businessEntity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- payGroup: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |
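Assuming the goal is to flatten those nested arrays, one hedged sketch
(the file path and column aliases are placeholders) is to explode one
level at a time:

  import org.apache.spark.sql.functions.explode
  import spark.implicits._

  val df = spark.read.json("businessEntities.json")

  // Each explode unnests one array level of the schema shown above.
  val entities = df.select(explode($"businessEntity").as("entity"))
  val payGroups = entities.select(explode($"entity.payGroup").as("payGroup"))
  payGroups.printSchema()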
The source of my problem is actually that I am running into the following
error. This error seems to happen after running my driver program for 4
hours.
"Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
"dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
Hi Michael.
What type of table is the underlying table? Is it Hbase, Hive ORC or what?
By Key you mean a UNIQUE ID or something similar and then you do multiple
scans on the tempTable which stores data using in-memory columnar format.
That is the optimisation of tempTable storage as far as I
ps -L [pid] is what shows threads. I am not sure this is counting what you
think it does. My shell process has about a hundred threads, and I can't
imagine why one would have thousands unless your app spawned them.
On Mon, Oct 31, 2016 at 10:20 AM kant kodali wrote:
when I do
ps -elfT | grep "spark-driver-program.jar" | wc -l
the result is around 32K. Why does it create so many threads, and how can I
limit this?
Hello,
I've got a Spark SQL dataframe containing a "key" column. The queries I
want to run start by filtering on the key range. My question in outline: is
it possible to sort the dataset by key so as to do efficient key range
filters, before subsequently running a more complex SQL query?
I'm
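One way to try this, as a sketch (whether the range filter actually skips
data depends on the Spark version and the resulting plan; the data here is
synthetic):

  import spark.implicits._

  // Sort by key once, cache the sorted data, then run range filters on it.
  val df = spark.range(0, 1000000).select($"id".as("key"), ($"id" * 2).as("value"))
  val sorted = df.sort($"key").cache()

  val slice = sorted.filter($"key" >= 100 && $"key" < 200)
  slice.count()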
This is my personal email so I can't exactly discuss work-related topics.
But yes, many teams in my company use Spark 2.0 in production environment.
What are the challenges that prevent you from adopting it (besides
migration from Spark 1.x)?
---
Regards,
Andy
On Mon, Oct 31, 2016 at 8:16
Hi all,
I'm using Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply for any
source).
Trying to understand a way to control mapWithState's partitioning schema.
My transformations are simple:
1) create KafkaDStream
Hi guys,
Just out of personal interest: I wonder whether Spark 2.0 is a production-ready
version. Do any companies use it as their main version in daily work? THX
with the following simple code
val a = sc.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int)=>x._1})
val mappedGroups = grouped.mapGroups((k,x)=>{(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>{