K-means clustering resulting in skewed clusters

2017-03-24 Thread Reth RM
Hi,

  I am using Spark k-means to cluster records consisting of news documents; the
vectors are created by applying TF-IDF. The dataset I am using for testing right
now is the ground-truth classified 20 Newsgroups corpus:
http://qwone.com/~jason/20Newsgroups/

The issue is that almost all of the documents get assigned to the same cluster,
while the other clusters just contain the single vector (document) that was
picked as the cluster center (skewed clustering). What could be the possible
reasons for this issue? Any suggestions? Should I be re-tuning the epsilon?
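For context, here is a minimal Scala sketch of the TF-IDF plus k-means pipeline being described; the path, column names, k, and the feature dimensionality are placeholders rather than values from this post:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{HashingTF, IDF, Normalizer, Tokenizer}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("NewsgroupsKMeans").getOrCreate()
import spark.implicits._

// one row per document: wholeTextFiles yields (path, fullContent) pairs
val docs = spark.sparkContext
  .wholeTextFiles("/path/to/20newsgroups/*")
  .toDF("file", "text")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tf = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(1 << 18)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("tfidf")
// L2-normalizing the TF-IDF vectors often helps with this symptom: otherwise document
// length dominates the Euclidean distance and most documents collapse into one cluster.
val normalizer = new Normalizer().setInputCol("tfidf").setOutputCol("features").setP(2.0)
val kmeans = new KMeans().setK(20).setSeed(1L).setFeaturesCol("features")

val model = new Pipeline().setStages(Array(tokenizer, tf, idf, normalizer, kmeans)).fit(docs)

// inspect the cluster size distribution to see whether the skew persists
model.transform(docs).groupBy("prediction").count().orderBy("prediction").show(25)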


Re: unable to stream kafka messages

2017-03-24 Thread kaniska Mandal
Hi Michael,

Thanks much for the suggestion.

I was wondering: what's the best way to deserialize the 'value' field?


On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust 
wrote:

> Encoders can only map data into an object if those columns already exist.
> When we are reading from Kafka, we just get a binary blob and you'll need
> to help Spark parse that first.  Assuming your data is stored in JSON it
> should be pretty straight forward.
>
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
> lambda functions on the data using this class
>
> Here is some more info on working with JSON and other semi-structured formats.
>
> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
> wrote:
>
>> Hi,
>>
>> Currently , encountering the following exception while working with
>> below-mentioned code snippet :
>>
>> > Please suggest the correct approach for reading the stream into a sql
>> > schema.
>> > If I add 'tweetSchema' while reading stream, it errors out with message
>> -
>> > we can not change static schema for kafka.
>>
>> 
>> ---
>>
>> *exception*
>>
>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>> timestampType, partition]*;
>> at
>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>> 
>> 
>>
>> *structured streaming code snippet*
>>
>> String bootstrapServers = "localhost:9092";
>> String subscribeType = "subscribe";
>> String topics = "events";
>>
>> StructType tweetSchema = new StructType()
>> .add("tweetId", "string")
>> .add("tweetText", "string")
>> .add("location", "string")
>> .add("timestamp", "string");
>>
>>SparkSession spark = SparkSession
>>   .builder()
>>   .appName("StreamProcessor")
>>   .config("spark.master", "local")
>>   .getOrCreate();
>>
>>   Dataset streams = spark
>>   .readStream()
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers",
>> bootstrapServers)
>>   .option(subscribeType, topics)
>>   .load()
>>   .as(Encoders.bean(Tweet.class));
>>
>>  streams.createOrReplaceTempView("streamsData");
>>
>>String sql = "SELECT location,  COUNT(*) as count FROM
>> streamsData
>> GROUP BY location";
>>Dataset countsByLocation = spark.sql(sql);
>>
>> StreamingQuery query = countsByLocation.writeStream()
>>   .outputMode("complete")
>>   .format("console")
>>   .start();
>>
>> query.awaitTermination();
>> 
>> --
>>
>> *Tweet *
>>
>> Tweet.java - has public constructor and getter / setter methods
>>
>> public class Tweet implements Serializable{
>>
>> private String tweetId;
>> private String tweetText;
>> private String location;
>> private String timestamp;
>>
>> public Tweet(){
>>
>> }
>> .
>>
>> 
>> 
>>
>> *pom.xml *
>>
>>
>> 
>> org.apache.spark
>> spark-core_2.10
>> 2.1.0
>> 
>> 
>> org.apache.spark
>> spark-streaming_2.10
>> 2.1.0
>> 
>> 
>> org.apache.spark
>> spark-streaming-kafka-0-8_2.10
>> 2.1.0
>> 
>> 
>>

Re: how to read object field within json file

2017-03-24 Thread Yong Zhang
I missed the part to pass in a schema to force the "struct" to a Map, then use 
explode. Good option.


Yong



From: Michael Armbrust 
Sent: Friday, March 24, 2017 3:02 PM
To: Yong Zhang
Cc: Selvam Raman; user
Subject: Re: how to read object field within json file

I'm not sure you can parse this as an Array, but you can hint to the parser 
that you would like to treat source as a map instead of as a struct.  This is a 
good strategy when you have dynamic columns in your data.

Here is an example of the schema you can use to parse this JSON and also how to use explode to turn it into separate rows. This blog post has more on working with semi-structured data in Spark.

On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang 
> wrote:

That's why your "source" should be defined as an Array[Struct] type (which makes sense in this case, since it has an undetermined length), so you can explode it and get the description easily.

Now you would need to write your own UDF; maybe that can do what you want.

Yong


From: Selvam Raman >
Sent: Thursday, March 23, 2017 5:03 PM
To: user
Subject: how to read object field within json file

Hi,

{
"id": "test1",
"source": {
"F1": {
  "id": "4970",
  "eId": "F1",
  "description": "test1",
},
"F2": {
  "id": "5070",
  "eId": "F2",
  "description": "test2",
},
"F3": {
  "id": "5170",
  "eId": "F3",
  "description": "test3",
},
"F4":{}
  etc..
  "F999":{}
}

I have bzip2-compressed JSON files in the above format.
Some JSON rows contain two objects within source (like F1 and F2), some contain
five (F1, F2, F3, F4, F5), etc. So the final schema will contain the combination
of all objects for the source field.

Now, every row will contain n objects, but only some of them contain valid
records.
How can I retrieve the value of "description" in the "source" field?

source.F1.description returns the result, but how can I get all the description
results for every row (something like "source.*.description")?

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"



Re: EXT: Multiple cores/executors in Pyspark standalone mode

2017-03-24 Thread Li Jin
Thanks for the reply. Yeah, I found the same doc and am able to use multiple
cores in spark-shell; however, when I use pyspark, it appears to use only
one core. I am wondering if this is something I didn't configure correctly
or whether it is even supported in pyspark.

On Fri, Mar 24, 2017 at 3:52 PM, Kadam, Gangadhar (GE Aviation, Non-GE) <
gangadhar.ka...@ge.com> wrote:

> In Local Mode  all processes are executed inside a single JVM.
> Application is started in a local mode by setting master to local,
> local[*] or local[n].
> spark.executor.cores and spark.executor.cores are not applicable in the
> local mode because there is only one embedded executor.
>
>
> In Standalone mode, you need  standalone Spark cluster<
> https://spark.apache.org/docs/latest/spark-standalone.html>.
>
> It requires a master node (can be started using
> SPARK_HOME/sbin/start-master.sh script) and at least one worker node (can
> be started using SPARK_HOME/sbin/start-slave.sh script).SparkConf should
> use master node address to create (spark://host:port)
>
> Thanks!
>
> Gangadhar
> From: Li Jin >
> Date: Friday, March 24, 2017 at 3:43 PM
> To: "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: EXT: Multiple cores/executors in Pyspark standalone mode
>
> Hi,
>
> I am wondering does pyspark standalone (local) mode support multi
> cores/executors?
>
> Thanks,
> Li
>


Re: EXT: Multiple cores/executors in Pyspark standalone mode

2017-03-24 Thread Kadam, Gangadhar (GE Aviation, Non-GE)
In local mode, all processes are executed inside a single JVM.
The application is started in local mode by setting the master to local, local[*] or
local[n].
Settings such as spark.executor.cores are not applicable in local mode because
there is only one embedded executor.


In standalone mode, you need a standalone Spark cluster
(https://spark.apache.org/docs/latest/spark-standalone.html).

It requires a master node (can be started using the SPARK_HOME/sbin/start-master.sh
script) and at least one worker node (can be started using the
SPARK_HOME/sbin/start-slave.sh script). SparkConf should use the master node address
(spark://host:port) to create the context.
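To make the distinction concrete, here is a minimal sketch (shown in Scala; the same master URLs apply when a session is created from pyspark, and the host name and core counts are placeholders):

import org.apache.spark.sql.SparkSession

// local mode: everything runs in one JVM; local[4] asks for 4 cores, local[*] for all available cores
val localSpark = SparkSession.builder()
  .appName("local-mode-example")
  .master("local[4]")
  .getOrCreate()

// standalone mode: point the session at a running standalone master instead
// val clusterSpark = SparkSession.builder()
//   .appName("standalone-example")
//   .master("spark://master-host:7077")
//   .getOrCreate()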

Thanks!

Gangadhar
From: Li Jin >
Date: Friday, March 24, 2017 at 3:43 PM
To: "user@spark.apache.org" 
>
Subject: EXT: Multiple cores/executors in Pyspark standalone mode

Hi,

I am wondering does pyspark standalone (local) mode support multi 
cores/executors?

Thanks,
Li

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Multiple cores/executors in Pyspark standalone mode

2017-03-24 Thread Li Jin
Hi,

I am wondering: does pyspark standalone (local) mode support multiple
cores/executors?

Thanks,
Li


Re: unable to stream kafka messages

2017-03-24 Thread Michael Armbrust
Encoders can only map data into an object if those columns already exist.
When we are reading from Kafka, we just get a binary blob and you'll need
to help Spark parse that first. Assuming your data is stored in JSON, it
should be pretty straightforward.

streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("message", from_json(col("value").cast("string"),
tweetSchema)) // cast the binary value to a string and parse it as json
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class)) // only required if you want to use
lambda functions on the data using this class

Here is some more info on working with JSON and other semi-structured formats.

On Fri, Mar 24, 2017 at 10:49 AM, kaniska  wrote:

> Hi,
>
> Currently , encountering the following exception while working with
> below-mentioned code snippet :
>
> > Please suggest the correct approach for reading the stream into a sql
> > schema.
> > If I add 'tweetSchema' while reading stream, it errors out with message -
> > we can not change static schema for kafka.
>
> 
> ---
>
> *exception*
>
> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
> '`location`' given input columns: [topic, timestamp, key, offset, value,
> timestampType, partition]*;
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
> 
> 
>
> *structured streaming code snippet*
>
> String bootstrapServers = "localhost:9092";
> String subscribeType = "subscribe";
> String topics = "events";
>
> StructType tweetSchema = new StructType()
> .add("tweetId", "string")
> .add("tweetText", "string")
> .add("location", "string")
> .add("timestamp", "string");
>
>SparkSession spark = SparkSession
>   .builder()
>   .appName("StreamProcessor")
>   .config("spark.master", "local")
>   .getOrCreate();
>
>   Dataset streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
> bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .as(Encoders.bean(Tweet.class));
>
>  streams.createOrReplaceTempView("streamsData");
>
>String sql = "SELECT location,  COUNT(*) as count FROM
> streamsData
> GROUP BY location";
>Dataset countsByLocation = spark.sql(sql);
>
> StreamingQuery query = countsByLocation.writeStream()
>   .outputMode("complete")
>   .format("console")
>   .start();
>
> query.awaitTermination();
> 
> --
>
> *Tweet *
>
> Tweet.java - has public constructor and getter / setter methods
>
> public class Tweet implements Serializable{
>
> private String tweetId;
> private String tweetText;
> private String location;
> private String timestamp;
>
> public Tweet(){
>
> }
> .
>
> 
> 
>
> *pom.xml *
>
>
> 
> org.apache.spark
> spark-core_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-streaming_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-streaming-kafka-0-8_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-sql_2.10
> 2.1.0
> 
> 
> org.apache.spark
> spark-sql-kafka-0-10_2.10
> 2.1.0
> 
> 
> 
>
>

Re: how to read object field within json file

2017-03-24 Thread Michael Armbrust
I'm not sure you can parse this as an Array, but you can hint to the parser
that you would like to treat source as a map instead of as a struct.  This
is a good strategy when you have dynamic columns in your data.

Here is an example of the schema you can use to parse this JSON and also how to use explode to turn it into separate rows. This blog post has more on working with semi-structured data in Spark.
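For reference, a minimal self-contained Scala sketch of the map-plus-explode approach described above; the field names follow Selvam's sample JSON and the input path is a placeholder:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{MapType, StringType, StructType}

val spark = SparkSession.builder().appName("MapSchemaExample").getOrCreate()

// schema for the inner F1/F2/... objects
val fieldSchema = new StructType()
  .add("id", StringType)
  .add("eId", StringType)
  .add("description", StringType)

// hint that "source" is a map with dynamic keys (F1 ... F999) instead of a fixed struct
val schema = new StructType()
  .add("id", StringType)
  .add("source", MapType(StringType, fieldSchema))

val df = spark.read.schema(schema).json("/path/to/input/*.json.bz2")

// explode the map into one row per (key, value) entry, then pull out the description
df.select(col("id"), explode(col("source")))
  .select(col("id"), col("key"), col("value.description").as("description"))
  .show(truncate = false)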

On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang  wrote:

> That's why your "source" should be defined as an Array[Struct] type (which
> makes sense in this case, it has an undetermined length  , so you can
> explode it and get the description easily.
>
> Now you need write your own UDF, maybe can do what you want.
>
> Yong
>
> --
> *From:* Selvam Raman 
> *Sent:* Thursday, March 23, 2017 5:03 PM
> *To:* user
> *Subject:* how to read object field within json file
>
> Hi,
>
> {
> "id": "test1",
> "source": {
> "F1": {
>   "id": "4970",
>   "eId": "F1",
>   "description": "test1",
> },
> "F2": {
>   "id": "5070",
>   "eId": "F2",
>   "description": "test2",
> },
> "F3": {
>   "id": "5170",
>   "eId": "F3",
>   "description": "test3",
> },
> "F4":{}
>   etc..
>   "F999":{}
> }
>
> I am having bzip json files like above format.
> some json row contains two objects within source(like F1 and F2), sometime
> five(F1,F2,F3,F4,F5),etc. So the final schema will contains combination of
> all objects for the source field.
>
> Now, every row will contain n number of objects but only some contains
> valid records.
> how can i retreive the value of "description" in "source" field.
>
> source.F1.description - returns the result but how can i get all
> description result for every row..(something like this
> "source.*.description").
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


unable to stream kafka messages

2017-03-24 Thread kaniska
Hi,

Currently, I am encountering the following exception while working with the
below-mentioned code snippet:

> Please suggest the correct approach for reading the stream into a SQL
> schema.
> If I add 'tweetSchema' while reading the stream, it errors out with the
> message: we cannot change the static schema for Kafka.

---

*exception*

Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
'`location`' given input columns: [topic, timestamp, key, offset, value,
timestampType, partition]*;
at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)


*structured streaming code snippet*

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "events";

StructType tweetSchema = new StructType()
.add("tweetId", "string")
.add("tweetText", "string")
.add("location", "string")
.add("timestamp", "string");

   SparkSession spark = SparkSession
  .builder()
  .appName("StreamProcessor")
  .config("spark.master", "local")
  .getOrCreate();

  Dataset<Tweet> streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", 
bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .as(Encoders.bean(Tweet.class));

 streams.createOrReplaceTempView("streamsData");
   
   String sql = "SELECT location,  COUNT(*) as count FROM 
streamsData
GROUP BY location";
   Dataset<Row> countsByLocation = spark.sql(sql);

StreamingQuery query = countsByLocation.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
--

*Tweet *

Tweet.java - has public constructor and getter / setter methods

public class Tweet implements Serializable{

private String tweetId;
private String tweetText;
private String location;
private String timestamp;

public Tweet(){

}
.  



*pom.xml *



<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Application kill from UI do not propagate exception

2017-03-24 Thread Noorul Islam Kamal Malmiyoda
Hi all,

I am trying to trap the UI kill event of a Spark application from the driver.
Somehow the exception thrown is not propagated to the driver main program.
See, for example, the spark-shell session below.

Is there a way to get hold of this event and shut down the driver program?

Regards,
Noorul


spark@spark1:~/spark-2.1.0/sbin$ spark-shell --master spark://10.29.83.162:7077
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
17/03/23 15:16:47 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
17/03/23 15:16:53 WARN ObjectStore: Failed to get database
global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.29.83.162:4040
Spark context available as 'sc' (master = spark://10.29.83.162:7077,
app id = app-20170323151648-0002).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 17/03/23 15:17:28 ERROR StandaloneSchedulerBackend: Application
has been killed. Reason: Master removed our application: KILLED
17/03/23 15:17:28 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster
scheduler: Master removed our application: KILLED
at 
org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:459)
at 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:139)
at 
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
at 
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:168)
at 
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at 
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@25b8f9d2

scala>
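One avenue that may be worth experimenting with (not a confirmed answer from this thread) is registering a SparkListener on the driver and reacting to the application-end event; whether that event actually fires for a master-initiated kill would need to be verified. A rough Scala sketch, assuming the usual sc from spark-shell:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    // driver-side shutdown logic would go here
    println(s"Application ended at ${applicationEnd.time}; shutting down the driver")
    // sys.exit(1)   // uncomment if the driver JVM should actually terminate
  }
})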

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Yong Zhang
Not sure if anyone else here can help you. But if I were you, I would adjust
SPARK_DAEMON_MEMORY to 2g to bump the worker to 2 GB. Even though the worker's
responsibility is very limited, in today's world, who knows. Give 2g a try and
see if the problem goes away.


BTW, in our production environment I set the worker to 2g and have never experienced
any OOM from the workers. Our cluster has been live for more than a year, and we also
use Spark 1.6.2 in production.
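For reference, that corresponds to a one-line change in conf/spark-env.sh on the worker hosts; the 2g value is the suggestion above, not a verified fix:

# conf/spark-env.sh: memory for the standalone master/worker daemons themselves (default 1g)
export SPARK_DAEMON_MEMORY=2g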


Yong



From: Behroz Sikander 
Sent: Friday, March 24, 2017 9:29 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Yea we also didn't find anything related to this online.

Are you aware of any memory leaks in worker in 1.6.2 spark which might be 
causing this ?
Do you know of any documentation which explains all the tasks that a worker is 
performing ? Maybe we can get some clue from there.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:21 PM, Yong Zhang 
> wrote:

I never experienced worker OOM or very rarely see this online. So my guess that 
you have to generate the heap dump file to analyze it.


Yong



From: Behroz Sikander >
Sent: Friday, March 24, 2017 9:15 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Thank you for the response.

Yes, I am sure because the driver was working fine. Only 2 workers went down 
with OOM.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang 
> wrote:

I am not 100% sure, but normally "dispatcher-event-loop" OOM means the driver 
OOM. Are you sure your workers OOM?


Yong



From: bsikander >
Sent: Friday, March 24, 2017 5:48 AM
To: user@spark.apache.org
Subject: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception.
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in hanged state. We restarted them to
bring them back to normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91




Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d




Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Behroz Sikander
Yeah, we also didn't find anything related to this online.

Are you aware of any memory leaks in the worker in Spark 1.6.2 which might be
causing this?
Do you know of any documentation which explains all the tasks that a worker
performs? Maybe we can get some clue from there.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:21 PM, Yong Zhang  wrote:

> I never experienced worker OOM or very rarely see this online. So my guess
> that you have to generate the heap dump file to analyze it.
>
>
> Yong
>
>
> --
> *From:* Behroz Sikander 
> *Sent:* Friday, March 24, 2017 9:15 AM
> *To:* Yong Zhang
> *Cc:* user@spark.apache.org
> *Subject:* Re: [Worker Crashing] OutOfMemoryError: GC overhead limit
> execeeded
>
> Thank you for the response.
>
> Yes, I am sure because the driver was working fine. Only 2 workers went
> down with OOM.
>
> Regards,
> Behroz
>
> On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang  wrote:
>
>> I am not 100% sure, but normally "dispatcher-event-loop" OOM means the
>> driver OOM. Are you sure your workers OOM?
>>
>>
>> Yong
>>
>>
>> --
>> *From:* bsikander 
>> *Sent:* Friday, March 24, 2017 5:48 AM
>> *To:* user@spark.apache.org
>> *Subject:* [Worker Crashing] OutOfMemoryError: GC overhead limit
>> execeeded
>>
>> Spark version: 1.6.2
>> Hadoop: 2.6.0
>>
>> Cluster:
>> All VMS are deployed on AWS.
>> 1 Master (t2.large)
>> 1 Secondary Master (t2.large)
>> 5 Workers (m4.xlarge)
>> Zookeeper (t2.large)
>>
>> Recently, 2 of our workers went down with out of memory exception.
>> java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)
>>
>> Both of these worker processes were in hanged state. We restarted them to
>> bring them back to normal state.
>>
>> Here is the complete exception
>> https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91
>>
>>
>>
>> Master's spark-default.conf file:
>> https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d
>>
>>
>>
>> Master's spark-env.sh
>> https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0
>>
>> Slave's spark-default.conf file:
>> https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c
>>
>> So, what could be the reason of our workers crashing due to OutOfMemory ?
>> How can we avoid that in future.
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Yong Zhang
I have never experienced worker OOM, and I very rarely see it reported online. So my
guess is that you will have to generate a heap dump file and analyze it.


Yong



From: Behroz Sikander 
Sent: Friday, March 24, 2017 9:15 AM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Thank you for the response.

Yes, I am sure because the driver was working fine. Only 2 workers went down 
with OOM.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang 
> wrote:

I am not 100% sure, but normally "dispatcher-event-loop" OOM means the driver 
OOM. Are you sure your workers OOM?


Yong



From: bsikander >
Sent: Friday, March 24, 2017 5:48 AM
To: user@spark.apache.org
Subject: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception.
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in hanged state. We restarted them to
bring them back to normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91




Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d




Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




Re: spark-submit config via file

2017-03-24 Thread Yong Zhang
Of course it is possible.


You can always set any configuration in your application through the API, instead
of passing it in through the CLI.


val sparkConf = new SparkConf()
  .setAppName(properties.getProperty("appName"))
  .setMaster(properties.getProperty("master"))
  .set("xxx", properties.getProperty("xxx"))

Your error is an environment problem.

Yong

From: , Roy 
Sent: Friday, March 24, 2017 7:38 AM
To: user
Subject: spark-submit config via file

Hi,

I am trying to deploy spark job by using spark-submit which has bunch of 
parameters like

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode 
cluster --executor-memory 3072m --executor-cores 4 --files streaming.conf 
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

I was looking a way to put all these flags in the file to pass to spark-submit 
to make my spark-submitcommand simple like this

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode 
cluster --properties-file properties.conf --files streaming.conf 
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

properties.conf has following contents


spark.executor.memory 3072m

spark.executor.cores 4


But I am getting following error


17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive for 
HDP, 
hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for 
Delete operation as thread count 0 is <= 1

17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for Delete 
operation is: 1 ms with threads: 0

17/03/24 11:36:27 INFO Client: Deleted staging directory 
wasb://a...@abc.blob.core.windows.net/user/sshuser/.sparkStaging/application_1488402758319_0492

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: 
hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)

at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)

at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)

at 
org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:364)

at 
org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:480)

at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:552)

at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:881)

at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:170)

at org.apache.spark.deploy.yarn.Client.run(Client.scala:1218)

at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1277)

at org.apache.spark.deploy.yarn.Client.main(Client.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:745)

at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

17/03/24 11:36:27 INFO MetricsSystemImpl: Stopping azure-file-system metrics 
system...

Anyone know is this is even possible ?


Thanks...

Roy


Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Behroz Sikander
Thank you for the response.

Yes, I am sure because the driver was working fine. Only 2 workers went
down with OOM.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang  wrote:

> I am not 100% sure, but normally "dispatcher-event-loop" OOM means the
> driver OOM. Are you sure your workers OOM?
>
>
> Yong
>
>
> --
> *From:* bsikander 
> *Sent:* Friday, March 24, 2017 5:48 AM
> *To:* user@spark.apache.org
> *Subject:* [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded
>
> Spark version: 1.6.2
> Hadoop: 2.6.0
>
> Cluster:
> All VMS are deployed on AWS.
> 1 Master (t2.large)
> 1 Secondary Master (t2.large)
> 5 Workers (m4.xlarge)
> Zookeeper (t2.large)
>
> Recently, 2 of our workers went down with out of memory exception.
> java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)
>
> Both of these worker processes were in hanged state. We restarted them to
> bring them back to normal state.
>
> Here is the complete exception
> https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91
>
>
>
> Master's spark-default.conf file:
> https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d
>
>
>
> Master's spark-env.sh
> https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0
>
> Slave's spark-default.conf file:
> https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c
>
> So, what could be the reason of our workers crashing due to OutOfMemory ?
> How can we avoid that in future.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread Yong Zhang
I am not 100% sure, but normally "dispatcher-event-loop" OOM means the driver 
OOM. Are you sure your workers OOM?


Yong



From: bsikander 
Sent: Friday, March 24, 2017 5:48 AM
To: user@spark.apache.org
Subject: [Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception.
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in hanged state. We restarted them to
bring them back to normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91




Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d




Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark-submit config via file

2017-03-24 Thread , Roy
Hi,

I am trying to deploy a Spark job using spark-submit, which takes a bunch of
parameters, like:

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode
cluster --executor-memory 3072m --executor-cores 4 --files streaming.conf
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

I was looking for a way to put all these flags in a file to pass to
spark-submit, to make my spark-submit command simpler, like this:

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode
cluster --properties-file properties.conf --files streaming.conf
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

properties.conf has the following contents:


spark.executor.memory 3072m

spark.executor.cores 4


But I am getting the following error:


17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive
for HDP,
hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling threads
for Delete operation as thread count 0 is <= 1

17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for
Delete operation is: 1 ms with threads: 0

17/03/24 11:36:27 INFO Client: Deleted staging directory wasb://
a...@abc.blob.core.windows.net/user/sshuser/.sparkStaging/application_1488402758319_0492

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no
host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)

at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)

at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)

at
org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:364)

        at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:480)

at
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:552)

at
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:881)

at
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:170)

at org.apache.spark.deploy.yarn.Client.run(Client.scala:1218)

at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1277)

at org.apache.spark.deploy.yarn.Client.main(Client.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:745)

at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

17/03/24 11:36:27 INFO MetricsSystemImpl: Stopping azure-file-system
metrics system...

Does anyone know if this is even possible?


Thanks...

Roy


Re: Spark 2.0.2 : Hang at "org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)"

2017-03-24 Thread Ravindra
I also noticed that there are 8 "dispatcher-event-loop-0" through "dispatcher-event-loop-7"
threads and 8 "map-output-dispatcher-0" through "map-output-dispatcher-7" threads,
all waiting at the same location in the code, which is:
*sun.misc.Unsafe.park(Native Method)*
*java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)*
*java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)*
*java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)*
org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

So clearly there is a race condition. Maybe the only option is to avoid it...
but how?


On Fri, Mar 24, 2017 at 5:40 PM Ravindra  wrote:

> Hi All,
>
> My Spark job hangs here... Looking into the thread dump I noticed that it
> hangs here (stack trace given below) on the count action on dataframe
> (given below). Data is very small. Its actually not more than even 10 rows.
>
> I noticed some JIRAs about this issue but all are resolved-closed in
> previous versions.
>
> Its running with 1 executor. Also noticed that the Storage tab is empty so
> no dataframe is cached.
>
> Looking into the DAGScheduler, I notice its stuck at runJob, probably its
> trying to run tasks concurrently and waiting here
>
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
> *org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)*
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> org.apache.spark.rdd.RDD.collect(RDD.scala:911)
>
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
>
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
> org.apache.spark.sql.Dataset.org
> $apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
> org.apache.spark.sql.Dataset.org
> $apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
> org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
> org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>
>
> Spark Properties
> NameValue
> spark.app.id local-1490350724879
> spark.app.name TestJob
> spark.default.parallelism 1
> spark.driver.allowMultipleContexts true
> spark.driver.host localhost
> spark.driver.memory 4g
> spark.driver.port 63405
> spark.executor.id driver
> spark.executor.memory 4g
> spark.hadoop.validateOutputSpecs false
> spark.master local[2]
> spark.scheduler.mode FIFO
> spark.sql.catalogImplementation hive
> spark.sql.crossJoin.enabled true
> spark.sql.shuffle.partitions 1
> spark.sql.warehouse.dir /tmp/hive/spark-warehouse
> spark.ui.enabled true
> spark.yarn.executor.memoryOverhead 2048
>
> Its count action on the given below dataframe -
> == Parsed Logical Plan ==
> Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439,
> created_time#27448]
> +- Filter (rn#27493 = 1)
>+- Project [hash_composite_keys#27440L, id#27441, name#27442,
> master#27439, created_time#27448, rn#27493]
>   +- Project [hash_composite_keys#27440L, id#27441, name#27442,
> master#27439, created_time#27448, rn#27493, rn#27493]
>  +- Window [rownumber()
> windowspecdefinition(hash_composite_keys#27440L, created_time#27448 ASC,
> ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#27493],
> [hash_composite_keys#27440L], [created_time#27448 ASC]
> +- Project [hash_composite_keys#27440L, id#27441, 

Spark 2.0.2 : Hang at "org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)"

2017-03-24 Thread Ravindra
Hi All,

My Spark job hangs. Looking into the thread dump, I noticed that it hangs
(stack trace given below) on the count action on the dataframe given below.
The data is very small; it's actually not more than 10 rows.

I noticed some JIRAs about this issue, but all of them are resolved/closed in
previous versions.

It's running with 1 executor. I also noticed that the Storage tab is empty, so
no dataframe is cached.

Looking into the DAGScheduler, I notice it's stuck at runJob, probably trying
to run tasks concurrently and waiting here:

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
*org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)*
org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
org.apache.spark.rdd.RDD.collect(RDD.scala:911)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
org.apache.spark.sql.Dataset.count(Dataset.scala:2226)


Spark Properties
Name   Value
spark.app.id local-1490350724879
spark.app.name TestJob
spark.default.parallelism 1
spark.driver.allowMultipleContexts true
spark.driver.host localhost
spark.driver.memory 4g
spark.driver.port 63405
spark.executor.id driver
spark.executor.memory 4g
spark.hadoop.validateOutputSpecs false
spark.master local[2]
spark.scheduler.mode FIFO
spark.sql.catalogImplementation hive
spark.sql.crossJoin.enabled true
spark.sql.shuffle.partitions 1
spark.sql.warehouse.dir /tmp/hive/spark-warehouse
spark.ui.enabled true
spark.yarn.executor.memoryOverhead 2048

Its count action on the given below dataframe -
== Parsed Logical Plan ==
Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439,
created_time#27448]
+- Filter (rn#27493 = 1)
   +- Project [hash_composite_keys#27440L, id#27441, name#27442,
master#27439, created_time#27448, rn#27493]
  +- Project [hash_composite_keys#27440L, id#27441, name#27442,
master#27439, created_time#27448, rn#27493, rn#27493]
 +- Window [rownumber()
windowspecdefinition(hash_composite_keys#27440L, created_time#27448 ASC,
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#27493],
[hash_composite_keys#27440L], [created_time#27448 ASC]
+- Project [hash_composite_keys#27440L, id#27441, name#27442,
master#27439, created_time#27448]
   +- Union
  :- Project [hash_composite_keys#27440L, id#27441,
name#27442, master#27439, cast(0 as timestamp) AS created_time#27448]
  :  +- Project [hash_composite_keys#27440L, id#27441,
name#27442, master#27439]
  : +- SubqueryAlias table1
  :+-
Relation[hash_composite_keys#27440L,id#27441,name#27442,master#27439]
parquet
  +- Project [hash_composite_keys#27391L, id#27397,
name#27399, master#27401, created_time#27414]
 +- Project [id#27397, name#27399,
hash_composite_keys#27391L, master#27401, cast(if
(isnull(created_time#27407L)) null else UDF(created_time#27407L) as
timestamp) AS created_time#27414]
+- Project [id#27397, name#27399,
hash_composite_keys#27391L, master#27401, 1490350732895 AS
created_time#27407L]
   +- Project [id#27397, name#27399,
hash_composite_keys#27391L, master AS master#27401]
  +- Aggregate 

[Worker Crashing] OutOfMemoryError: GC overhead limit execeeded

2017-03-24 Thread bsikander
Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMS are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with out of memory exception. 
java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)

Both of these worker processes were in hanged state. We restarted them to
bring them back to normal state.

Here is the complete exception 
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91

Master's spark-default.conf file: 
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d

Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason of our workers crashing due to OutOfMemory ?
How can we avoid that in future.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-overhead-limit-execeeded-tp28535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



skipping header in multiple files

2017-03-24 Thread nayan sharma
Hi,
I want to skip the headers of all the CSVs present in a directory.
After searching on Google, I learned that it can be done using sc.wholeTextFiles.

Can anyone suggest how to do that in Scala?

Thanks & Regards,
Nayan Sharma
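A minimal Scala sketch of the wholeTextFiles route; the path is a placeholder, and it assumes each CSV comfortably fits in memory as a single record (for plain header handling, spark.read.option("header", "true").csv(dir) is the simpler alternative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SkipCsvHeaders").getOrCreate()
val sc = spark.sparkContext

// wholeTextFiles yields (fileName, fileContent) pairs, one per file,
// so the first (header) line of each file can be dropped independently
val withoutHeaders = sc
  .wholeTextFiles("/path/to/csv/dir/*.csv")
  .flatMap { case (_, content) => content.split("\n").drop(1) }

withoutHeaders.take(5).foreach(println)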
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

2017-03-24 Thread Georg Heiler
Maybe a UDF to flatten is an interesting option as well. See
http://stackoverflow.com/q/42888711/2587904. Would a UDAF be much more
performant?
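For reference, a rough Scala sketch of the non-UDAF route hinted at above: explode the goals array and aggregate with built-in functions. Here data is the (teamid, goals) DataFrame from the quoted code below, and whether this beats the UDAF is exactly the open performance question:

import org.apache.spark.sql.functions.{col, collect_list, explode, struct}

// one row per (teamid, goal), count per goal, then gather the counts back per team
val goalCounts = data
  .withColumn("goal", explode(col("goals")))
  .groupBy("teamid", "goal")
  .count()                                   // adds a "count" column
  .groupBy("teamid")
  .agg(collect_list(struct(col("goal"), col("count"))).as("goalCounts"))

// note: this yields an array of (goal, count) structs per team rather than a MapType column
goalCounts.show(truncate = false)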
shyla deshpande wrote on Fri, 24 Mar 2017 at 04:04:

> Thanks a million Yong. Great help!!! It solved my problem.
>
> On Thu, Mar 23, 2017 at 6:00 PM, Yong Zhang  wrote:
>
> Change:
>
> val arrayinput = input.getAs[Array[String]](0)
>
> to:
>
> val arrayinput = input.getAs[*Seq*[String]](0)
>
>
> Yong
>
>
> --
> *From:* shyla deshpande 
> *Sent:* Thursday, March 23, 2017 8:18 PM
> *To:* user
> *Subject:* Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!
>
> This is my input data. The UDAF needs to aggregate the goals for a team
> and return a map that  gives the count for every goal in the team.
> I am getting the following error
>
> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
> cannot be cast to [Ljava.lang.String;
> at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)
>
> +--+--+
> |teamid|goals |
> +--+--+
> |t1|[Goal1, Goal2]|
> |t1|[Goal1, Goal3]|
> |t2|[Goal1, Goal2]|
> |t3|[Goal2, Goal3]|
> +--+--+
>
> root
>  |-- teamid: string (nullable = true)
>  |-- goals: array (nullable = true)
>  ||-- element: string (containsNull = true)
>
> /Calling the UDAF//
>
> object TestUDAF {
>   def main(args: Array[String]): Unit = {
>
> val spark = SparkSession
>   .builder
>   .getOrCreate()
>
> val sc: SparkContext = spark.sparkContext
> val sqlContext = spark.sqlContext
>
> import sqlContext.implicits._
>
> val data = Seq(
>   ("t1", Seq("Goal1", "Goal2")),
>   ("t1", Seq("Goal1", "Goal3")),
>   ("t2", Seq("Goal1", "Goal2")),
>   ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")
>
> data.show(truncate = false)
> data.printSchema()
>
> import spark.implicits._
>
> val sumgoals = new GoalAggregator
> val result = data.groupBy("teamid").agg(sumgoals(col("goals")))
>
> result.show(truncate = false)
>
>   }
> }
>
> ///UDAF/
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
>
> class GoalAggregator extends UserDefinedAggregateFunction{
>
>   override def inputSchema: org.apache.spark.sql.types.StructType =
>   StructType(StructField("value", ArrayType(StringType)) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>   StructField("combined", MapType(StringType,IntegerType)) :: Nil
>   )
>
>   override def dataType: DataType = MapType(StringType,IntegerType)
>
>   override def deterministic: Boolean = true
>
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer.update(0, Map[String, Integer]())
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> val mapbuf = buffer.getAs[Map[String, Int]](0)
> val arrayinput = input.getAs[Array[String]](0)
> val result = mapbuf ++ arrayinput.map(goal => {
>   val cnt  = mapbuf.get(goal).getOrElse(0) + 1
>   goal -> cnt
> })
> buffer.update(0, result)
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 
> {
> val map1 = buffer1.getAs[Map[String, Int]](0)
> val map2 = buffer2.getAs[Map[String, Int]](0)
> val result = map1 ++ map2.map { case (k,v) =>
>   val cnt = map1.get(k).getOrElse(0) + 1
>   k -> cnt
> }
> buffer1.update(0, result)
>   }
>
>   override def evaluate(buffer: Row): Any = {
> buffer.getAs[Map[String, Int]](0)
>   }
> }
>
>
>
>
>