RE: Could Spark batch processing live within Spark Streaming?

2015-06-12 Thread prajod.vettiyattil
Hi Raj,

What you need seems to be an event-based initiation of a DStream. I have not seen
one yet. There are many types of DStreams that Spark implements, and you can also
implement your own. InputDStream is a close match for your requirement.

See this for the available options with InputDStream:
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html
You could use a QueueInputDStream. It only processes RDDs as they are pushed into a
configured queue, so nothing happens until your batch side enqueues something.
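
For illustration, a minimal Scala sketch of the queue-based approach (ssc and sc are
assumed to be an existing StreamingContext and SparkContext; the event payload is a
placeholder):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Backed by QueueInputDStream: batches stay empty until an RDD is enqueued.
val rddQueue = new mutable.Queue[RDD[String]]()
val events = ssc.queueStream(rddQueue)
events.foreachRDD(rdd => println(s"received ${rdd.count()} event records"))
ssc.start()

// Later, e.g. when the long-running batch computation completes:
rddQueue += sc.makeRDD(Seq("batch-job-finished"))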

Another requirement could be to send an event from one SparkContext (your batch
context) to another (your streaming context). I have not seen a built-in API
for this. Broadcast
variables (https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)
are tied to the SparkContext that created them. There are a couple of ad hoc approaches I
have seen:
http://mail-archives.apache.org/mod_mbox/spark-dev/201410.mbox/%3ccadtdqqk-wv9xdeeb-qh3biltjf0a46a5kuqmt2njzwvwuhg...@mail.gmail.com%3E

Prajod




From: diplomatic Guru [mailto:diplomaticg...@gmail.com]
Sent: 11 June 2015 18:55
To: user@spark.apache.org
Subject: Could Spark batch processing live within Spark Streaming?

Hello all,

I was wondering if it is possible to have a high-latency batch processing job
coexist within a Spark Streaming job? If it's possible, then could we share the
state of the batch job with the Spark Streaming job?

For example, when the long-running batch computation is complete, could we
inform Spark Streaming that the batch job is complete?

Kind regards,

Raj


Re: Reading Really Big File Stream from HDFS

2015-06-12 Thread Saisai Shao
Using sc.textFile will also read the file from HDFS line by line through an
iterator; there is no need to fit it all into memory. Even if you have a small
amount of memory, it still works.
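
For illustration, a minimal sketch of the non-streaming approach (the path and
partition count are placeholders):

// textFile is lazy and each partition is streamed through an iterator,
// so per-record transformations never need the whole 50 GB in memory.
val lines = sc.textFile("hdfs:///data/big-file", minPartitions = 200)
val matches = lines.filter(_.contains("ERROR")).count()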

2015-06-12 13:19 GMT+08:00 SLiZn Liu sliznmail...@gmail.com:

 Hmm, you have a good point. So should I load the file by `sc.textFile()`
 and specify a high number of partitions, and the file is then split into
 partitions in memory across the cluster?

 On Thu, Jun 11, 2015 at 9:27 PM ayan guha guha.a...@gmail.com wrote:

 Why do you need to use a stream in this use case? 50 GB need not be in
 memory. Give it a try with a high number of partitions.
 On 11 Jun 2015 23:09, SLiZn Liu sliznmail...@gmail.com wrote:

 Hi Spark Users,

 I'm trying to load a literally big file (50 GB when compressed as a gzip
 file, stored in HDFS) by receiving a DStream using `ssc.textFileStream`, as
 this file cannot fit in my memory. However, it looks like no RDD will
 be received until I copy this big file to a prior-specified location on
 HDFS. Ideally, I'd like to read this file a small number of lines at a
 time, but receiving a file stream requires additional writing to HDFS. Any
 idea how to achieve this?

 BEST REGARDS,
 Todd Leo




Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian
Thanks for the extra details and explanations, Chaocai. I will try to
reproduce this when I get a chance.


Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:
I said the OOM occurred on the slave node because I monitored memory
utilization during the query task; on the driver, very little memory was
occupied. And I remember I have seen the OOM stderr log on the slave
node before.


But recently there seems to be no OOM log on the slave node.
With the command, data, environment and code I gave you, the OOM reproduces
100% in cluster mode.


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian l...@databricks.com
*To:* 姜超才 jiangchao...@haiyisoft.com, Hester wang 
hester9...@gmail.com, user@spark.apache.org
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your
last comment saying "The OOM or lost heartbeat occurred on the slave
node", because from the log files you attached at first, those OOMs
actually happen on the driver side (the Thrift server log only contains log
lines from the driver side). Did you see OOM in the executor stderr output?
I ask this because a large portion of users are still using
1.3, and we may want to deliver a fix if there do exist bugs that
cause unexpected OOM.


Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* 姜超才
*To:* Cheng Lian , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/11 08:56:28 (Thu)

No problem in local mode. I can get all rows.

Select * from foo;

The OOM or lost heartbeat occurred on the slave node.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't 
have access to a cluster for now) but couldn't reproduce this issue. 
Your program just executed smoothly... :-/


Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single-node local mode
works for you? I will investigate this with a cluster when I get a chance.


Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:
When I set spark.sql.thriftServer.incrementalCollect and set driver
memory to 7G, things seem stable and simple:
it can quickly run through the query, but when traversing the
result set (while rs.hasNext), it quickly hits OOM: Java
heap space. See attachment.


/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true 
--conf spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path 
/usr/local/hive/lib/classes12.jar --conf 
spark.sql.thriftServer.incrementalCollect=true


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind sending me a
compressed copy (don't cc user@spark.apache.org)?


Cheng

On 6/10/15 4:23 PM, 姜超才 wrote:

Hi Lian,

Thanks for your quick response.

I forgot to mention that I have tuned driver memory from 2G to 4G,
which seems to have given a minor improvement. The failure mode when fetching 1,400,000
rows changed from OOM: GC overhead limit exceeded to lost
worker heartbeat after 120s.


I will try to set spark.sql.thriftServer.incrementalCollect and
continue increasing driver memory to 7G, and will send the result to
you.


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:15:47 (Wed)

Hi Xiaohan,

Would you please try setting
spark.sql.thriftServer.incrementalCollect to true and
increasing driver memory size? In this way, HiveThriftServer2 uses
RDD.toLocalIterator rather than RDD.collect().iterator to return
the result set. The key difference is that RDD.toLocalIterator
retrieves a single partition at a time, thus avoiding holding the
whole result set on the driver side. The memory issue happens on the driver
side rather than the executor side, so tuning executor memory size doesn't help.
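
For illustration, a minimal sketch of that difference (assuming rdd is the RDD
backing the result set):

val eager = rdd.collect().iterator   // whole result set materialized on the driver
val lazyIter = rdd.toLocalIterator   // at most one partition on the driver at a time
lazyIter.foreach(println)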

RE: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng, Hao
Not sure if Spark RDD will provide an API to fetch records one by one from the
final result set, instead of pulling them all (or a whole partition of data)
into driver memory.

Seems a big change.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: Met OOM when fetching more than 1,000,000 rows.

Thanks for the extra details and explanations Chaocai, will try to reproduce 
this when I got chance.

Cheng
On 6/12/15 3:44 PM, 姜超才 wrote:
I said OOM occurred on slave node, because I monitored memory utilization 
during the query task, on driver, very few memory was ocupied. And i remember i 
have ever seen the OOM stderr log on slave node.

But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% repro on 
cluster mode.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian l...@databricks.com
To: 姜超才 jiangchao...@haiyisoft.com, Hester wang hester9...@gmail.com, 
user@spark.apache.org
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment 
saying The OOM or lose heartbeat was occurred on slave node. Because from the 
log files you attached at first, those OOM actually happens on driver side 
(Thrift server log only contains log lines from driver side). Did you see OOM 
from executor stderr output? I ask this because there are still a large portion 
of users are using 1.3, and we may want deliver a fix if there does exist bugs 
that causes unexpected OOM.

Cheng
On 6/12/15 3:14 PM, 姜超才 wrote:
Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
From: 姜超才
To: Cheng Lian , Hester wang ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have 
access to a cluster for now) but couldn't reproduce this issue. Your program 
just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works for you? 
 Will investigate this with a cluster when I get chance.

Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
When set spark.sql.thriftServer.incrementalCollect and set driver memory to 
7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the result set ( 
while rs.hasNext ), it can quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf 
spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar 
--conf spark.sql.thriftServer.incrementalCollect=true

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a compressed 
copy (don't cc user@spark.apache.orgmailto:user@spark.apache.org)?

Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian,

Thanks for your quick response.






I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 rows changed 
from OOM::GC overhead limit exceeded to  lost worker heartbeat after 120s.


  I will try  to set 
spark.sql.thriftServer.incrementalCollect and continue increase driver memory 
to 7G, and will send the result to you.

Thanks,

SuperJ


- Original Message -
From:

Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian
My guess is that the reason why local mode is OK while the standalone cluster
doesn't work is that in cluster mode, task results are serialized and
sent to the driver side. The driver needs to deserialize the results, and thus
occupies much more memory than in local mode (where task result
de/serialization is not performed).


Cheng

On 6/12/15 4:17 PM, Cheng, Hao wrote:


Not sure if Spark Core will provide API to fetch the record one by one 
from the block manager, instead of the pulling them all into the 
driver memory.


*From:*Cheng Lian [mailto:l...@databricks.com]
*Sent:* Friday, June 12, 2015 3:51 PM
*To:* 姜超才; Hester wang; user@spark.apache.org
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.


Thanks for the extra details and explanations Chaocai, will try to 
reproduce this when I got chance.


Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:

I said OOM occurred on slave node, because I monitored memory
utilization during the query task, on driver, very few memory was
ocupied. And i remember i have ever seen the OOM stderr log on
slave node.

But recently there seems no OOM log on slave node.

Follow the cmd 、data 、env and the code I gave you, the OOM can
100% repro on cluster mode.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian l...@databricks.com
*To:* 姜超才 jiangchao...@haiyisoft.com, Hester wang
hester9...@gmail.com, user@spark.apache.org
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your
last comment saying The OOM or lose heartbeat was occurred on
slave node. Because from the log files you attached at first,
those OOM actually happens on driver side (Thrift server log only
contains log lines from driver side). Did you see OOM from
executor stderr output? I ask this because there are still a large
portion of users are using 1.3, and we may want deliver a fix if
there does exist bugs that causes unexpected OOM.

Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* 姜超才
*To:* Cheng Lian , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop
(don't have access to a cluster for now) but couldn't
reproduce this issue. Your program just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local
mode works for you?  Will investigate this with a cluster when
I get chance.

Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:

When set spark.sql.thriftServer.incrementalCollect and
set driver memory to 7G, Things seems stable and simple:

It can quickly run through the query line, but when
traversal the result set ( while rs.hasNext ), it can
quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh
--master spark://cx-spark-001:7077 --conf
spark.executor.memory=4g --conf spark.driver.memory=7g
--conf spark.shuffle.consolidateFiles=true --conf
spark.shuffle.manager=sort --conf
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit
--conf spark.file.transferTo=false --conf
spark.akka.timeout=2000 --conf
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8
--conf spark.kryoserializer.buffer.mb=256 --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf
spark.akka.frameSize=512 

Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Tathagata Das
Is it a lot of data that is expected to come through stdin? I mean is it
even worth parallelizing the computation using something like Spark
Streaming?

On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote:

  Thanks for your reply! In my use case, it would be stream from only one
 stdin. Also I'm working with Scala.
 It would be great if you could talk about multi stdin case as well!
 Thanks.

   From: Tathagata Das t...@databricks.com
 Date: Thursday, June 11, 2015 at 8:11 PM
 To: Heath Guo heath...@fb.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark Streaming reads from stdin or output from command line
 utility

   Are you going to receive data from one stdin from one machine, or many
 stdins on many machines?


 On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:

 Hi, I'm new to Spark Streaming, and I want to create an application where
 Spark Streaming could create a DStream from stdin. Basically I have a
 command
 line utility that generates stream data, and I'd like to pipe data into
 DStream. What's the best way to do that? I thought rdd.pipe() could help,
 but it seems that requires an rdd in the first place, which does not
 apply.
 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last
comment saying "The OOM or lost heartbeat occurred on the slave node",
because from the log files you attached at first, those OOMs actually
happen on the driver side (the Thrift server log only contains log lines from
the driver side). Did you see OOM in the executor stderr output? I ask this
because a large portion of users are still using 1.3, and we
may want to deliver a fix if there do exist bugs that cause unexpected OOM.


Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* 姜超才 jiangchao...@haiyisoft.com
*To:* Cheng Lian l...@databricks.com, Hester wang 
hester9...@gmail.com, user@spark.apache.org
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.

*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't 
have access to a cluster for now) but couldn't reproduce this issue. 
Your program just executed smoothly... :-/


Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works 
for you?  Will investigate this with a cluster when I get chance.


Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:
When set spark.sql.thriftServer.incrementalCollect and set driver 
memory to 7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the 
result set ( while rs.hasNext ), it can quickly get the OOM: java 
heap space. See attachment.


/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true 
--conf spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path 
/usr/local/hive/lib/classes12.jar --conf 
spark.sql.thriftServer.incrementalCollect=true


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a 
compressed copy (don't cc user@spark.apache.org)?


Cheng

On 6/10/15 4:23 PM, 姜超才 wrote:

Hi Lian,

Thanks for your quick response.

I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 
rows changed from OOM::GC overhead limit exceeded to  lost worker 
heartbeat after 120s.


I will try  to set spark.sql.thriftServer.incrementalCollect and 
continue increase driver memory to 7G, and will send the result to you.


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:15:47 (Wed)

Hi Xiaohan,

Would you please try to set 
spark.sql.thriftServer.incrementalCollect to true and increasing 
driver memory size? In this way, HiveThriftServer2 uses 
RDD.toLocalIterator rather than RDD.collect().iterator to return the 
result set. The key difference is that RDD.toLocalIterator retrieves 
a single partition at a time, thus avoid holding the whole result 
set on driver side. The memory issue happens on driver side rather 
than executor side, so tuning executor memory size doesn't help.


Cheng

On 6/10/15 3:46 PM, Hester wang wrote:

Hi Lian,


I met a SparkSQL problem. I really appreciate it if you could give 
me some help! Below is the detailed description of the problem, for 
more information, attached are the original code and the log that 
you may need.


Problem:
I want to query my table, which is stored in Hive, through the SparkSQL 
JDBC interface.

I want to fetch more than 1,000,000 rows, but met an OOM.
sql = select * from TEMP_ADMIN_150601_01 limit XXX ;

My Env:
5 Nodes = One master + 4 workers,  1000M Network Switch ,  Redhat 6.5
Each node: 8G RAM, 500G Harddisk
Java 1.6, Scala 2.10.4, Hadoop 

RE: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng, Hao
Not sure if Spark Core will provide an API to fetch records one by one from the
block manager, instead of pulling them all into driver memory.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: Met OOM when fetching more than 1,000,000 rows.

Thanks for the extra details and explanations Chaocai, will try to reproduce 
this when I got chance.

Cheng
On 6/12/15 3:44 PM, 姜超才 wrote:
I said OOM occurred on slave node, because I monitored memory utilization 
during the query task, on driver, very few memory was ocupied. And i remember i 
have ever seen the OOM stderr log on slave node.

But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% repro on 
cluster mode.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian l...@databricks.com
To: 姜超才 jiangchao...@haiyisoft.com, Hester wang hester9...@gmail.com, 
user@spark.apache.org
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment 
saying The OOM or lose heartbeat was occurred on slave node. Because from the 
log files you attached at first, those OOM actually happens on driver side 
(Thrift server log only contains log lines from driver side). Did you see OOM 
from executor stderr output? I ask this because there are still a large portion 
of users are using 1.3, and we may want deliver a fix if there does exist bugs 
that causes unexpected OOM.

Cheng
On 6/12/15 3:14 PM, 姜超才 wrote:
Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
From: 姜超才
To: Cheng Lian , Hester wang ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have 
access to a cluster for now) but couldn't reproduce this issue. Your program 
just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works for you? 
 Will investigate this with a cluster when I get chance.

Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
When set spark.sql.thriftServer.incrementalCollect and set driver memory to 
7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the result set ( 
while rs.hasNext ), it can quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf 
spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar 
--conf spark.sql.thriftServer.incrementalCollect=true

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a compressed 
copy (don't cc user@spark.apache.orgmailto:user@spark.apache.org)?

Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian,

Thanks for your quick response.






I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 rows changed 
from OOM::GC overhead limit exceeded to  lost worker heartbeat after 120s.


  I will try  to set 
spark.sql.thriftServer.incrementalCollect and continue increase driver memory 
to 7G, and will send the result to you.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: Hester wang ,
Subject: Re: Met OOM when fetching 

Re: Spark 1.4 release date

2015-06-12 Thread Todd Nist
It was released yesterday.

On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote:

 Hi

 When is official spark 1.4 release date?
 Best
 Ayan



Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Saisai Shao
Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
and straightforward in Python, where we need to have a specific API to
handle this. I'm not sure whether there is any simple workaround to fix this; maybe
we should think carefully about it.
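
For reference, this is roughly the Scala pattern being discussed (a sketch following
the linked documentation; directKafkaStream stands for the DStream returned by
KafkaUtils.createDirectStream):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}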

2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at. Any
 pointers on what is needed to build in this support would be great. This is
 critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it. I think currently the Python-based Kafka direct API does not
 provide such an equivalent to Scala; maybe we should figure out how to add this
 into the Python API also.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because as RDDs get generated within Spark the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in Zookeeper but rather within Spark itself. If you want
 to be able to use Zookeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that
 we can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supplied to KafkaRDD, so why
 do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like 
 the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit








If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-12 Thread Haopu Wang
This is a quick question about checkpointing: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always gracefully shut down the application in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.4 release date

2015-06-12 Thread ayan guha
Hi

When is official spark 1.4 release date?
Best
Ayan


Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Heath Guo
Yes, it is a lot of data, and the utility I'm working with prints out an infinite 
real-time data stream. Thanks.


From: Tathagata Das t...@databricks.com
Date: Thursday, June 11, 2015 at 11:43 PM
To: Heath Guo heath...@fb.com
Cc: user user@spark.apache.org
Subject: Re: Spark Streaming reads from stdin or output from command line 
utility

Is it a lot of data that is expected to come through stdin? I mean is it even 
worth parallelizing the computation using something like Spark Streaming?

On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote:
Thanks for your reply! In my use case, it would be stream from only one stdin. 
Also I'm working with Scala.
It would be great if you could talk about multi stdin case as well! Thanks.

From: Tathagata Das t...@databricks.com
Date: Thursday, June 11, 2015 at 8:11 PM
To: Heath Guo heath...@fb.com
Cc: user user@spark.apache.org
Subject: Re: Spark Streaming reads from stdin or output from command line 
utility

Are you going to receive data from one stdin from one machine, or many stdins 
on many machines?


On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:
Hi, I'm new to Spark Streaming, and I want to create an application where
Spark Streaming could create a DStream from stdin. Basically I have a command
line utility that generates stream data, and I'd like to pipe data into
DStream. What's the best way to do that? I thought rdd.pipe() could help,
but it seems that requires an rdd in the first place, which does not apply.
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Gerard Maas
Would using socketTextStream and `yourApp | nc -lk port` work? Not
sure how resilient the socket receiver is, though. I've been playing with it
for a little demo and I don't yet understand its reconnection behavior.
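
For illustration, a rough sketch of that approach (host and port are placeholders;
ssc is an existing StreamingContext):

// Run the producer as: yourApp | nc -lk 9999
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD(rdd => println(s"received ${rdd.count()} lines"))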

Although I would think that putting some elastic buffer in between would be
a good idea to decouple producer from consumer. Kafka would be my first
choice.

-kr, Gerard.

On Fri, Jun 12, 2015 at 8:46 AM, Heath Guo heath...@fb.com wrote:

  Yes, it is lots of data, and the utility I'm working with prints out
 infinite real time data stream. Thanks.


   From: Tathagata Das t...@databricks.com
 Date: Thursday, June 11, 2015 at 11:43 PM

 To: Heath Guo heath...@fb.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark Streaming reads from stdin or output from command line
 utility

   Is it a lot of data that is expected to come through stdin? I mean is
 it even worth parallelizing the computation using something like Spark
 Streaming?

 On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote:

   Thanks for your reply! In my use case, it would be stream from only
 one stdin. Also I'm working with Scala.
 It would be great if you could talk about multi stdin case as well!
 Thanks.

   From: Tathagata Das t...@databricks.com
 Date: Thursday, June 11, 2015 at 8:11 PM
 To: Heath Guo heath...@fb.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark Streaming reads from stdin or output from command
 line utility

Are you going to receive data from one stdin from one machine, or
 many stdins on many machines?


 On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:

 Hi, I'm new to Spark Streaming, and I want to create an application where
 Spark Streaming could create a DStream from stdin. Basically I have a
 command
 line utility that generates stream data, and I'd like to pipe data into
 DStream. What's the best way to do that? I thought rdd.pipe() could help,
 but it seems that requires an rdd in the first place, which does not
 apply.
 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: BigDecimal problem in parquet file

2015-06-12 Thread Cheng Lian

On 6/10/15 8:53 PM, Bipin Nag wrote:


Hi Cheng,

I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading 
an existing Parquet file, then repartitioning and saving it. Doing 
this gives the error. The code for this doesn't look like it is causing the 
problem. I have a feeling the source - the existing Parquet file - is the 
culprit.


I created that Parquet file using a JdbcRDD (pulled from Microsoft SQL 
Server). First I saved the JdbcRDD as an object file on disk. Then I loaded 
it again, made a DataFrame from it using a schema, then saved it as 
Parquet.


Following is the code :
For saving jdbcrdd:
 name - fullqualifiedtablename
 pk - string for primarykey
 pklast - last id to pull
val myRDD = new JdbcRDD( sc, () =>
DriverManager.getConnection(url, username, password),
"SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk +
" and " + pk + " <= ?",
1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
myRDD.saveAsObjectFile("rawdata/" + name);

For applying schema and saving the parquet:
val myschema = schemamap(name)
val myrdd = 
sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map(x =>
org.apache.spark.sql.Row(x: _*))


Have you tried to print out x here to check its contents? My guess is 
that x actually contains unit values. For example, the following Spark 
shell code can reproduce a similar exception:


import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
val df = sqlContext.createDataFrame(rdd, schema)

df.saveAsParquetFile("file:///tmp/foo")


val actualdata = sqlContext.createDataFrame(myrdd, myschema)
actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)

Schema structtype can be made manually, though I pull table's metadata 
and make one. It is a simple string translation (see sql docs 
https://msdn.microsoft.com/en-us/library/ms378878%28v=sql.110%29.aspx and/or 
spark datatypes 
https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#data-types)


That is how I created the parquet file. Any help to solve the issue is 
appreciated.

Thanks
Bipin


On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Would you please provide a snippet that reproduce this issue? What
version of Spark were you using?

Cheng

On 6/9/15 8:18 PM, bipin wrote:

Hi,
When I try to save my data frame as a parquet file I get the
following
error:

java.lang.ClassCastException: scala.runtime.BoxedUnit cannot
be cast to
org.apache.spark.sql.types.Decimal
at

org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
at

org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
at

org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
at

org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
at

parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
at

org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
at

org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

How to fix this problem ?



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Optimizing Streaming from Websphere MQ

2015-06-12 Thread Chaudhary, Umesh
Hi,
I have created a custom receiver in Java which receives data from Websphere MQ, 
and I am only writing the received records to HDFS.

I have referred to many forums for optimizing the speed of a Spark Streaming 
application. Here I am listing a few:


* Spark 
Officialhttp://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

* VIrdatahttp://www.virdata.com/tuning-spark/

*  TD's Slide (A bit Old but 
Useful)http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617

I mainly applied these points (a sketch follows the list):


* giving a batch interval of 1 sec

* controlling spark.streaming.blockInterval=200ms

* inputStream.repartition(3)
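
As a rough illustration of how those settings fit together, a hedged Scala sketch
(the receiver below is a hypothetical stand-in, not the actual Websphere MQ code;
values mirror the list above):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical stand-in for the custom MQ receiver, only to show where the knobs sit.
class DummyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = {
    new Thread("dummy-source") {
      override def run(): Unit = { while (!isStopped()) store("record") }
    }.start()
  }
  def onStop(): Unit = {}
}

val conf = new SparkConf().setAppName("mq-ingest")
  .set("spark.streaming.blockInterval", "200ms") // with 1s batches: ~5 blocks (tasks) per batch per receiver
val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batch interval
ssc.receiverStream(new DummyReceiver)
  .repartition(3)                                // spread processing across more cores
  .foreachRDD(rdd => println(rdd.count()))
ssc.start()
ssc.awaitTermination()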

But that did not improve the actual speed (records/sec) of my receiver, which is at most 
5-10 records/sec. This is way below my expectation.
Am I missing something?

Regards,
Umesh Chaudhary



How to use Window Operations with kafka Direct-API?

2015-06-12 Thread ZIGEN
Hi, I'm using Spark Streaming (1.3.1).
I want to get exactly-once messaging from Kafka and use window operations on
DStreams.

When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the Kafka
direct API,
a java.lang.ClassCastException occurs as follows.

--- stacktrace --
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at
org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
at
org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

 
--- my source ---

JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
jssc.checkpoint(checkpoint);

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);

JavaPairDStream<String, List<String>> windowDs =
    pairDS.reduceByKeyAndWindow(new Function2<List<String>, List<String>, List<String>>() {
      @Override
      public List<String> call(List<String> list1, List<String> list2) throws Exception {
        ...
      }
    }, windowDuration, slideDuration);

windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {

  @Override
  public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {

    OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // ClassCastException occurred

    KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
    for (OffsetRange offsets : offsetsList) {

      TopicAndPartition topicAndPartition = new
          TopicAndPartition(offsets.topic(), offsets.partition());

      HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
      map.put(topicAndPartition, offsets.untilOffset());
      kc.setConsumerOffsets("group1", toScalaMap(map));
    }

    return null;
  }
});

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade to parquet 1.6.0

2015-06-12 Thread Cheng Lian
At the time 1.3.x was released, 1.6.0 hadn't been released yet. And we 
didn't have enough time to upgrade and test Parquet 1.6.0 for Spark 
1.4.0. But we've already upgraded Parquet to 1.7.0 (which is exactly the 
same as 1.6.0 with the package name renamed from com.twitter to 
org.apache.parquet) on the master branch recently.


Cheng

On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:

Hi

What is the reason that Spark still comes with Parquet 1.6.0rc3? It 
seems like newer Parquet versions are available (e.g. 1.6.0). This 
would fix problems with ‘spark.sql.parquet.filterPushdown’, which 
currently is disabled by default, because of a bug in Parquet 1.6.0rc3.


Thanks!

Eric




Re: BigDecimal problem in parquet file

2015-06-12 Thread Bipin Nag
Hi Cheng,

Yes, some rows contain unit instead of decimal values. I believe some rows
from the original source I had don't have any value, i.e. it is null, and that
shows up as unit. How do Spark SQL or Parquet handle null in place of
decimal values, assuming that field is nullable? I will have to change it
properly.
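
For illustration, a hedged sketch of one way to normalize such rows before building
the DataFrame, assuming the unit values should become SQL NULLs and the schema marks
the field as nullable (myrdd and myschema refer to the code quoted below):

import org.apache.spark.sql.Row

val cleaned = myrdd.map { row =>
  Row(row.toSeq.map {
    case _: scala.runtime.BoxedUnit => null   // unit placeholder becomes NULL
    case v => v
  }: _*)
}
val fixed = sqlContext.createDataFrame(cleaned, myschema)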

Thanks for helping out.
Bipin

On 12 June 2015 at 14:57, Cheng Lian lian.cs@gmail.com wrote:

  On 6/10/15 8:53 PM, Bipin Nag wrote:

   Hi Cheng,

  I am using Spark 1.3.1 binary available for Hadoop 2.6. I am loading an
 existing parquet file, then repartitioning and saving it. Doing this gives
 the error. The code for this doesn't look like causing  problem. I have a
 feeling the source - the existing parquet is the culprit.

 I created that parquet using a jdbcrdd (pulled from microsoft sql server).
 First I saved jdbcrdd as an objectfile on disk. Then loaded it again, made
 a dataframe from it using a schema then saved it as a parquet.

  Following is the code :
  For saving jdbcrdd:
   name - fullqualifiedtablename
   pk - string for primarykey
   pklast - last id to pull
  val myRDD = new JdbcRDD( sc, () =
 DriverManager.getConnection(url,username,password) ,
 SELECT * FROM  + name +  WITH (NOLOCK) WHERE ? = +pk+ and
 +pk+ = ?,
 1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
 myRDD.saveAsObjectFile(rawdata/+name);

  For applying schema and saving the parquet:
 val myschema = schemamap(name)
 val myrdd =
 sc.objectFile[Array[Object]](/home/bipin/rawdata/+name).map(x =
 org.apache.spark.sql.Row(x:_*))

   Have you tried to print out x here to check its contents? My guess is
 that x actually contains unit values. For example, the follow Spark shell
 code can reproduce a similar exception:

 import org.apache.spark.sql.types._
 import org.apache.spark.sql.Row
 val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
 val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
 val df = sqlContext.createDataFrame(rdd, schema)

 df.saveAsParquetFile("file:///tmp/foo")

val actualdata = sqlContext.createDataFrame(myrdd, myschema)
 actualdata.saveAsParquetFile(/home/bipin/stageddata/+name)

  Schema structtype can be made manually, though I pull table's metadata
 and make one. It is a simple string translation (see sql docs
 https://msdn.microsoft.com/en-us/library/ms378878%28v=sql.110%29.aspx
 and/or spark datatypes
 https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#data-types
 )

  That is how I created the parquet file. Any help to solve the issue is
 appreciated.
  Thanks
  Bipin


 On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com wrote:

 Would you please provide a snippet that reproduce this issue? What
 version of Spark were you using?

 Cheng

 On 6/9/15 8:18 PM, bipin wrote:

 Hi,
 When I try to save my data frame as a parquet file I get the following
 error:

 java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
 org.apache.spark.sql.types.Decimal
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
 at

 parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
 at
 org.apache.spark.sql.parquet.ParquetRelation2.org
 $apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 How to fix this problem ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




​



Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks a lot.
On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com wrote:

 It was released yesterday.

 On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote:

 Hi

 When is official spark 1.4 release date?
 Best
 Ayan




Re: Spark 1.4 release date

2015-06-12 Thread Guru Medasani
Here is a spark 1.4 release blog by data bricks.

https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html


Guru Medasani
gdm...@gmail.com



 On Jun 12, 2015, at 7:08 AM, ayan guha guha.a...@gmail.com wrote:
 
 Thanks a lot.
 
 On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com 
 mailto:tsind...@gmail.com wrote:
 It was released yesterday.
 
 On Friday, June 12, 2015, ayan guha guha.a...@gmail.com 
 mailto:guha.a...@gmail.com wrote:
 Hi
 
 When is official spark 1.4 release date?
 Best
 Ayan
 



Upgrade to parquet 1.6.0

2015-06-12 Thread Eric Eijkelenboom
Hi 

What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems like 
newer Parquet versions are available (e.g. 1.6.0). This would fix problems with 
‘spark.sql.parquet.filterPushdown’, which currently is disabled by default, 
because of a bug in Parquet 1.6.0rc3.
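
For context, that flag can also be toggled per SQLContext once a fixed Parquet
version is in place; a minimal sketch (the path is a placeholder):

// Hedged example: enable Parquet filter pushdown explicitly.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.parquetFile("/tmp/some-table")
df.filter(df("v") > 0).count()   // the predicate can now be pushed down to Parquet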

Thanks! 

Eric

Re: Upgrade to parquet 1.6.0

2015-06-12 Thread Eric Eijkelenboom
Great, thanks for the extra info! 

 On 12 Jun 2015, at 12:41, Cheng Lian lian.cs@gmail.com wrote:
 
 At the time 1.3.x was released, 1.6.0 hasn't been released yet. And we didn't 
 have enough time to upgrade and test Parquet 1.6.0 for Spark 1.4.0. But we've 
 already upgraded Parquet to 1.7.0 (which is exactly the same as 1.6.0 with 
 package name renamed from com.twitter to org.apache.parquet) on master branch 
 recently.
 
 Cheng
 
 On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:
 Hi 
 
 What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems 
 like newer Parquet versions are available (e.g. 1.6.0). This would fix 
 problems with ‘spark.sql.parquet.filterPushdown’, which currently is 
 disabled by default, because of a bug in Parquet 1.6.0rc3.
 
 Thanks! 
 
 Eric
 



Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread zigen
Hi Shao,

Thank you for your quick reply.
I was disappointed.
I will try window operations with the receiver-based 
approach (KafkaUtils.createStream).

Thank you again,
ZIGEN


On 2015/06/12 17:18, Saisai Shao sai.sai.s...@gmail.com wrote:

 I think you cannot use offsetRanges that way: when you transform a 
 DirectKafkaInputDStream into a WindowedDStream, the underlying KafkaRDD is 
 turned into a normal RDD, but offsetRanges is an attribute specific to 
 KafkaRDD, so casting a normal RDD to HasOffsetRanges raises exactly this 
 exception.
 
 you could only do something like:
 
 directKafkaInputDStream.foreachRDD { rdd =>
   rdd.asInstanceOf[HasOffsetRanges]
   ...
 }
 
 Apply foreachRDD directly on DirectKafkaInputDStream.
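
 A minimal sketch of combining the two, assuming the directKafkaInputDStream and the
 window/slide durations from the code below: capture the offsets with transform()
 while the RDDs are still KafkaRDDs, then apply the window operation downstream.

 import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

 var offsetRanges = Array.empty[OffsetRange]

 val tagged = directKafkaInputDStream.transform { rdd =>
   // Still a KafkaRDD at this point, so the cast is safe; after a window operation it would not be.
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }

 tagged
   .map { case (_, value) => (value, 1L) }
   .reduceByKeyAndWindow(_ + _, windowDuration, slideDuration)
   .foreachRDD { rdd =>
     rdd.collect().foreach(println)
     // offsetRanges captured above can now be written to ZooKeeper or another store;
     // with windowing this is only a per-batch approximation of "processed up to".
   }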
 
 
 
 
 
 
 
 2015-06-12 16:10 GMT+08:00 ZIGEN dbviewer.zi...@gmail.com:
 Hi, I'm using Spark Streaming(1.3.1).
 I want to get exactly-once messaging from Kafka and use window operations of
 DStream.
 
 When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the Kafka
 Direct API, a java.lang.ClassCastException occurs as follows.
 
 --- stacktrace --
 java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
 be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
 at
 org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
 at
 org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)
 
 
 --- my source ---
 
 JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
 jssc.checkpoint(checkpoint);
 
 JavaPairInputDStream<String, String> messages =
     KafkaUtils.createDirectStream(jssc, String.class, String.class,
         StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
 
 JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);
 
 JavaPairDStream<String, List<String>> windowDs =
     pairDS.reduceByKeyAndWindow(new Function2<List<String>, List<String>, List<String>>() {
         @Override
         public List<String> call(List<String> list1, List<String> list2) throws Exception {
             ...
         }
     }, windowDuration, slideDuration);
 
 windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
 
     @Override
     public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
 
         OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
         // ClassCastException occurred
 
         KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
         for (OffsetRange offsets : offsetsList) {
 
             TopicAndPartition topicAndPartition =
                 new TopicAndPartition(offsets.topic(), offsets.partition());
 
             HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
             map.put(topicAndPartition, offsets.untilOffset());
             kc.setConsumerOffsets(group1, toScalaMap(map));
         }
 
         return null;
     }
 });
 
 Thanks!
 
 
 
 

Scheduling and node affinity

2015-06-12 Thread Brian Candler
I would like to know if Spark has any facility by which particular tasks 
can be scheduled to run on chosen nodes.


The use case: we have a large custom-format database. It is partitioned 
and the segments are stored on local SSD on multiple nodes. Incoming 
queries are matched against the database; this involves either sending 
each key to the correct node, or sending a batch to all nodes 
simultaneously, where the queries are filtered and processed against one 
segment each, and the results are merged at the end.


Currently we are doing this with htcondor using a DAG to define the 
workflow and requirements expressions to match particular jobs to 
particular databases, but it's coarse-grained and more suited for batch 
processing than real-time, as well as being cumbersome to define and manage.


I wonder whether Spark would suit this workflow, and if so how?

It seems that either we would need to schedule parts of our jobs on the 
appropriate nodes, which I can't see how to do:

http://spark.apache.org/docs/latest/job-scheduling.html

Or possibly we could define our partitioned database as a custom type of 
RDD - however then we would need to define operations which work on two 
RDDs simultaneously (i.e. the static database and the incoming set of 
queries) which doesn't seem to fit Spark well AFAICS.
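
For what it's worth, if the custom-RDD route were explored, Spark's hook for this 
kind of node affinity is an RDD's preferred locations. A minimal sketch, with 
hypothetical segment/host bookkeeping and a stub reader:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per database segment, pinned (preferentially) to the host that
// holds that segment on local SSD. readSegment is a hypothetical reader.
class SegmentPartition(val index: Int, val host: String) extends Partition

class SegmentRDD(sc: SparkContext, segments: Seq[(Int, String)])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    segments.map { case (id, host) => new SegmentPartition(id, host) }.toArray

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[SegmentPartition].host)   // a scheduling hint, not a hard constraint

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    readSegment(split.asInstanceOf[SegmentPartition].index)

  private def readSegment(id: Int): Iterator[String] = Iterator.empty  // stub
}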


Any other ideas how we could approach this, either with Spark or 
suggestions for other frameworks to look at? (We would actually prefer 
non-Java frameworks but are happy to look at all options)


Thanks,

Brian Candler.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971
Cody,

On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:

 There are several database apis that use a thread local or singleton 
 reference to a connection pool (we use ScalikeJDBC currently, but there are 
 others).
  
 You can use mapPartitions earlier in the chain to make sure the connection 
 pool is set up on that executor, then use it inside updateStateByKey
 

Thanks. You are saying I should just make an arbitrary use of the ‘connection’ 
to invoke the ‘lazy’. E.g. like this:

object SomeDB {

  lazy val conn = new SomeDB("some serializable config")

}


Then somewhere else:

theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
  SomeDB.conn.init
  pair
   }
)).updateStateByKey[Session](myUpdateFunction _)


And in myUpdateFunction

def myUpdateFunction( …) {

SomeDB.conn.store(  … )

}


Correct?

Jan




 On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
 Hi,
 
 I have a scenario with spark streaming, where I need to write to a database 
 from within updateStateByKey[1].
 
 That means that inside my update function I need a connection.
 
 I have so far understood that I should create a new (lazy) connection for 
 every partition. But since I am not working in foreachRDD I wonder where I 
 can iterate over the partitions.
 
 Should I use mapPartitions() somewhere up the chain?
 
 Jan
 
 
 
 [1] The use case being saving ‘done' sessions during web tracking.
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Extracting k-means cluster values along with centers?

2015-06-12 Thread Minnow Noir
Greetings.

I have been following some of the tutorials online for Spark k-means
clustering.  I would like to be able to just dump all the cluster values
and their centroids to text file so I can explore the data.  I have the
clusters as such:

val clusters = KMeans.train(parsedData, numClusters, numIterations)

clusters
res2: org.apache.spark.mllib.clustering.KMeansModel =
org.apache.spark.mllib.clustering.KMeansModel@59de440b

Is there a way to build something akin to a key value RDD that has the
center as the key and the array of values associated with that center as
the value? I don't see anything in the tutorials, API docs, or the
Learning book for how to do this.

Thank you
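
One way to get that kind of pairing, as a sketch (assuming parsedData is the
RDD[Vector] used for training; the output path is hypothetical):

// Pair each point with the centroid of the cluster it is assigned to.
val pointsByCenter = parsedData.map { point =>
  val i = clusters.predict(point)
  (clusters.clusterCenters(i), point)
}

// Dump "centroid <tab> point" lines for exploration.
pointsByCenter
  .map { case (center, point) =>
    center.toArray.mkString(",") + "\t" + point.toArray.mkString(",")
  }
  .saveAsTextFile("hdfs:///tmp/kmeans-assignments")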


--jars not working?

2015-06-12 Thread Jonathan Coveney
Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos
0.19.0)...

Regardless, I'm running into a really weird situation where when I pass
--jars to bin/spark-shell I can't reference those classes on the repl. Is
this expected? The logs even tell me that my jars have been added, and yet
the classes inside of them are not available.

Am I missing something obvious?


Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971


On 12 Jun 2015, at 23:19, Cody Koeninger c...@koeninger.org wrote:

 A. No, it's called once per partition.  Usually you have more partitions than 
 executors, so it will end up getting called multiple times per executor.  But 
 you can use a lazy val, singleton, etc to make sure the setup only takes 
 place once per JVM.
 
 B.  I can't speak to the specifics there ... but as long as you're making sure 
 the setup gets called at most once per executor, before the work that needs 
 it ... should be ok.
 

Great thanks so much - 

(I guess I am not yet clear about the relationship of partition / executor / 
stage, but I get the idea.)

Jan


 On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com 
 wrote:
 
 On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote:
 
  Close.  the mapPartitions call doesn't need to do anything at all to the 
  iter.
 
  mapPartitions { iter =>
    SomeDB.conn.init
iter
  }
 
 Yes, thanks!
 
 Maybe you can confirm two more things and then you helped me make a giant 
 leap today:
 
 a) When using spark streaming, will this happen exactly once per executor? I 
 mean: is mapPartitions called once per executor for the lifetime of the 
 stream?
 
 Or should I rather think once per stage?
 
 
 b) I actually need an ActorSystem and FlowMaterializer (for making an 
 Akka-HTTP request to store the data), not a DB connection - I presume this 
 does not change the concept?
 
 
 Jan
 
 
 
 
  On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 
  algermissen1...@icloud.com wrote:
  Cody,
 
  On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:
 
   There are several database apis that use a thread local or singleton 
   reference to a connection pool (we use ScalikeJDBC currently, but there 
   are others).
  
   You can use mapPartitions earlier in the chain to make sure the 
   connection pool is set up on that executor, then use it inside 
   updateStateByKey
  
 
  Thanks. You are saying I should just make an arbitrary use of the 
  ‘connection’ to invoke the ‘lazy’. E.g. like this:
 
  object SomeDB {
 
  lazy val conn = new SomeDB("some serializable config")
 
  }
 
 
  Then somewhere else:
 
  theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
    SomeDB.conn.init
pair
 }
  )).updateStateByKey[Session](myUpdateFunction _)
 
 
  And in myUpdateFunction
 
  def myUpdateFunction( …) {
 
  SomeDB.conn.store(  … )
 
  }
 
 
  Correct?
 
  Jan
 
 
 
 
   On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
   algermissen1...@icloud.com wrote:
   Hi,
  
   I have a scenario with spark streaming, where I need to write to a 
   database from within updateStateByKey[1].
  
   That means that inside my update function I need a connection.
  
   I have so far understood that I should create a new (lazy) connection for 
   every partition. But since I am not working in foreachRDD I wonder where 
   I can iterate over the partitions.
  
   Should I use mapPartitions() somewhere up the chain?
  
   Jan
  
  
  
   [1] The use case being saving ‘done' sessions during web tracking.
  
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
  
 
 
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Juan Rodríguez Hortalá
Hi,

If you want I would be happy to work in this. I have worked with
KafkaUtils.createDirectStream before, in a pull request that wasn't
accepted https://github.com/apache/spark/pull/5367. I'm fluent with Python
and I'm starting to feel comfortable with Scala, so if someone opens a JIRA
I can take it.

Greetings,

Juan Rodriguez


2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org:

 The scala api has 2 ways of calling createDirectStream.  One of them
 allows you to pass a message handler that gets full access to the kafka
 MessageAndMetadata, including offset.

 I don't know why the python api was developed with only one way to call
 createDirectStream, but the first thing I'd look at would be adding that
 functionality back in.  If someone wants help creating a patch for that,
 just let me know.

 Dealing with offsets on a per-message basis may not be as efficient as
 dealing with them on a batch basis using the HasOffsetRanges interface...
 but if efficiency was a primary concern, you probably wouldn't be using
 Python anyway.
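
 A rough sketch of that second Scala overload (ssc and kafkaParams assumed to
 exist; the starting offsets and topic name are hypothetical):

 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka.KafkaUtils

 // In practice the starting offsets would come from ZooKeeper or an external store.
 val fromOffsets = Map(TopicAndPartition("my_topic", 0) -> 0L)

 // The handler sees the full MessageAndMetadata, so the offset travels with each record.
 val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
   (mmd.topic, mmd.partition, mmd.offset, mmd.message())

 val stream = KafkaUtils.createDirectStream[
   String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
   ssc, kafkaParams, fromOffsets, messageHandler)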

 On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
 and straightforward in Python, where we need to have a specific API to
 handle this. I'm not sure whether there is any simple workaround to fix this; maybe
 we should think carefully about it.

 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at.
 Any pointers on what is needed to build in this support would be great.
 This is critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it, I think currently Python based Kafka direct API do not
 provide such equivalence like Scala, maybe we should figure out to add this
 into Python API also.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because as RDDs get generated within spark the
 offsets move further along. With direct Kafka mode the current offsets are
 no more persisted in Zookeeper but rather within Spark itself. If you want
 to be able to use zookeeper based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so
 that we can continue from where we left off after a code deployment. In
 other words, we need exactly-once processing guarantees across code
 deployments. Spark does not support any state persistence across
 deployments so this is something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What is your meaning of getting the offsets from the RDD, from my
 understanding, the offsetRange is a parameter you offered to KafkaRDD, 
 why
 do you still want to get the one previous you set into?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but
 haven't been able to figure out how to get the offsets from the RDD. 
 Looks
 like the documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit










Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
A. No, it's called once per partition.  Usually you have more partitions
than executors, so it will end up getting called multiple times per
executor.  But you can use a lazy val, singleton, etc to make sure the
setup only takes place once per JVM.

B.  I can't speak to the specifics there ... but as long as you're making
sure the setup gets called at most once per executor, before the work that
needs it ... should be ok.

On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com
 wrote:


 On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote:

  Close.  the mapPartitions call doesn't need to do anything at all to the
 iter.
 
  mapPartitions { iter =>
    SomeDB.conn.init
iter
  }

 Yes, thanks!

 Maybe you can confirm two more things and then you helped me make a giant
 leap today:

 a) When using spark streaming, will this happen exactly once per executor?
 I mean: is mapPartitions called once per executor for the lifetime of the
 stream?

 Or should I rather think once per stage?


 b) I actually need an ActorSystem and FlowMaterializer (for making an
 Akka-HTTP request to store the data), not a DB connection - I presume this
 does not change the concept?


 Jan



 
  On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 
 algermissen1...@icloud.com wrote:
  Cody,
 
  On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:
 
   There are several database apis that use a thread local or singleton
 reference to a connection pool (we use ScalikeJDBC currently, but there are
 others).
  
   You can use mapPartitions earlier in the chain to make sure the
 connection pool is set up on that executor, then use it inside
 updateStateByKey
  
 
  Thanks. You are saying I should just make an arbitrary use of the
 ‘connection’ to invoke the ‘lazy’. E.g. like this:
 
  object SomeDB {
 
  lazy val conn = new SomeDB("some serializable config")
 
  }
 
 
  Then somewhere else:
 
  theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
    SomeDB.conn.init
pair
 }
  )).updateStateByKey[Session](myUpdateFunction _)
 
 
  And in myUpdateFunction
 
  def myUpdateFunction( …) {
 
  SomeDB.conn.store(  … )
 
  }
 
 
  Correct?
 
  Jan
 
 
 
 
   On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
   Hi,
  
   I have a scenario with spark streaming, where I need to write to a
 database from within updateStateByKey[1].
  
   That means that inside my update function I need a connection.
  
   I have so far understood that I should create a new (lazy) connection
 for every partition. But since I am not working in foreachRDD I wonder
 where I can iterate over the partitions.
  
   Should I use mapPartitions() somewhere up the chain?
  
   Jan
  
  
  
   [1] The use case being saving ‘done' sessions during web tracking.
  
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
  
 
 




Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
Close.  the mapPartitions call doesn't need to do anything at all to the
iter.

mapPartitions { iter =>
  SomeDB.conn.init
  iter
}
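
A more complete, self-contained sketch of this pattern (the SomeDB body is a
hypothetical stand-in; Session, theTrackingEvents and myUpdateFunction mirror
the names used in the thread):

import org.apache.spark.streaming.dstream.DStream

// Hypothetical session type and DB client; only the lazy-singleton-per-JVM idea matters.
case class Session(userId: String, events: Seq[String])

class SomeDB(config: String) {
  def init(): Unit = ()                      // no-op, just forces construction
  def store(session: Session): Unit = ()     // stand-in for the real write
}

object SomeDB {
  lazy val conn = new SomeDB("some serializable config")   // built at most once per executor JVM
}

def myUpdateFunction(newEvents: Seq[Seq[String]], state: Option[Session]): Option[Session] = {
  val previous = state.getOrElse(Session("unknown", Nil))
  val updated  = previous.copy(events = previous.events ++ newEvents.flatten)
  SomeDB.conn.store(updated)                 // the connection already exists on this executor
  Some(updated)
}

// pairs: DStream[(String, Seq[String])], e.g. theTrackingEvents.map(toPairs);
// updateStateByKey also requires ssc.checkpoint(...) to be set.
def wireUp(pairs: DStream[(String, Seq[String])]): DStream[(String, Session)] =
  pairs
    .mapPartitions { iter =>
      SomeDB.conn.init()                     // touch the lazy val before updateStateByKey needs it
      iter
    }
    .updateStateByKey[Session](myUpdateFunction _)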

On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com
 wrote:

 Cody,

 On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:

  There are several database apis that use a thread local or singleton
 reference to a connection pool (we use ScalikeJDBC currently, but there are
 others).
 
  You can use mapPartitions earlier in the chain to make sure the
 connection pool is set up on that executor, then use it inside
 updateStateByKey
 

 Thanks. You are saying I should just make an arbitrary use of the
 ‘connection’ to invoke the ‘lazy’. E.g. like this:

 object SomeDB {

   lazy val conn = new SomeDB("some serializable config")

 }


 Then somewhere else:

 theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
   SomeDB.conn.init
   pair
}
 )).updateStateByKey[Session](myUpdateFunction _)


  And in myUpdateFunction

 def myUpdateFunction( …) {

  SomeDB.conn.store(  … )

 }


 Correct?

 Jan




  On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
  Hi,
 
  I have a scenario with spark streaming, where I need to write to a
 database from within updateStateByKey[1].
 
  That means that inside my update function I need a connection.
 
  I have so far understood that I should create a new (lazy) connection
 for every partition. But since I am not working in foreachRDD I wonder
 where I can iterate over the partitions.
 
  Should I use mapPartitions() somewhere up the chain?
 
  Jan
 
 
 
  [1] The use case being saving ‘done' sessions during web tracking.
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 




Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971

On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote:

 Close.  the mapPartitions call doesn't need to do anything at all to the iter.
 
 mapPartitions { iter =>
   SomeDB.conn.init
   iter
 }

Yes, thanks!

Maybe you can confirm two more things and then you helped me make a giant leap 
today:

a) When using spark streaming, will this happen exactly once per executor? I 
mean: is mapPartitions called once per executor for the lifetime of the stream?

Or should I rather think once per stage?


b) I actually need an ActorSystem and FlowMaterializer (for making an Akka-HTTP 
request to store the data), not a DB connection - I presume this does not 
change the concept?


Jan



 
 On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com 
 wrote:
 Cody,
 
 On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote:
 
  There are several database apis that use a thread local or singleton 
  reference to a connection pool (we use ScalikeJDBC currently, but there are 
  others).
 
  You can use mapPartitions earlier in the chain to make sure the connection 
  pool is set up on that executor, then use it inside updateStateByKey
 
 
 Thanks. You are saying I should just make an arbitrary use of the 
 ‘connection’ to invoke the ‘lazy’. E.g. like this:
 
 object SomeDB {
 
   lazy val conn = new SomeDB("some serializable config")
 
 }
 
 
 Then somewhere else:
 
 theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
   SomeDB.conn.init
   pair
}
 )).updateStateByKey[Session](myUpdateFunction _)
 
 
  And in myUpdateFunction
 
 def myUpdateFunction( …) {
 
  SomeDB.conn.store(  … )
 
 }
 
 
 Correct?
 
 Jan
 
 
 
 
  On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
  algermissen1...@icloud.com wrote:
  Hi,
 
  I have a scenario with spark streaming, where I need to write to a database 
  from within updateStateByKey[1].
 
  That means that inside my update function I need a connection.
 
  I have so far understood that I should create a new (lazy) connection for 
  every partition. But since I am not working in foreachRDD I wonder where I 
  can iterate over the partitions.
 
  Should I use mapPartitions() somewhere up the chain?
 
  Jan
 
 
 
  [1] The use case being saving ‘done' sessions during web tracking.
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?

2015-06-12 Thread Rex X
To be concrete, say we have a folder with thousands of tab-delimited csv
files with following attributes format (each csv file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on name above

name    gender
Matt    M
Lucy    F
...

Now we are interested to output from top 1000 rows of each csv file into
following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?


Re: [Spark-1.4.0] NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer

2015-06-12 Thread Tao Li
Anyone met the same problem like me?

2015-06-12 23:40 GMT+08:00 Tao Li litao.bupt...@gmail.com:

 Hi all:

 I complied new spark 1.4.0 version today. But when I run WordCount demo,
 it throws NoSuchMethodError *java.lang.NoSuchMethodError:
 com.fasterxml.jackson.module.scala.deser.BigDecimalDeserialize*r.

 I found the default *fasterxml.jackson.version* is *2.4.4*. It's there
 any wrong with the jackson version?



Re: how to use a properties file from a url in spark-submit

2015-06-12 Thread Gary Ogden
Turns out one of the other developers wrapped the jobs in script and did a
cd to another folder in the script before executing spark-submit.

On 12 June 2015 at 14:20, Matthew Jones mle...@gmail.com wrote:

 Hmm either spark-submit isn't picking up the relative path or Chronos is
 not setting your working directory to your sandbox. Try using cd
 $MESOS_SANDBOX && spark-submit --properties-file props.properties

 On Fri, Jun 12, 2015 at 12:32 PM Gary Ogden gog...@gmail.com wrote:

 That's a great idea. I did what you suggested and added the url to the
 props file in the uri of the json. The properties file now shows up in the
 sandbox.  But when it goes to run spark-submit  with --properties-file
 props.properties   it fails to find it:

 Exception in thread main java.lang.IllegalArgumentException: requirement 
 failed: Properties file props.properties does not exist


 On 11 June 2015 at 22:17, Matthew Jones mle...@gmail.com wrote:

 If you are using chronos you can just put the url in the task json and
 chronos will download it into your sandbox. Then just use spark-submit
 --properties-file app.properties.

 On Thu, 11 Jun 2015 15:52 Marcelo Vanzin van...@cloudera.com wrote:

 That's not supported. You could use wget / curl to download the file to
 a temp location before running spark-submit, though.

 On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden gog...@gmail.com wrote:

 I have a properties file that is hosted at a url. I would like to be
 able to use the url in the --properties-file parameter when submitting a
 job to mesos using spark-submit via chronos

 I would rather do this than use a file on the local server.

 This doesn't seem to work though when submitting from chronos:

 bin/spark-submit --properties-file
 http://server01/props/app.properties

 Inside the properties file:
 spark.executor.memory=256M
 spark.cores.max=1
 spark.shuffle.consolidateFiles=true
 spark.task.cpus=1
 spark.deploy.defaultCores=1
 spark.driver.cores=1
 spark.scheduler.mode=FAIR

 So how do I specify a properties file in a url?




 --
 Marcelo





Re: Reading file from S3, facing java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException

2015-06-12 Thread Akhil Das
Looks like your spark is not able to pick up the HADOOP_CONF. To fix this,
you can actually add jets3t-0.9.0.jar to the classpath
(sc.addJar("/path/to/jets3t-0.9.0.jar")).

Thanks
Best Regards

On Thu, Jun 11, 2015 at 6:44 PM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I tried to read a csv file from amazon s3, but I get the following
 exception which I have no clue how to solve this. I tried both spark 1.3.1
 and 1.2.1, but no success.  Any idea how to solve this is appreciated.


 best,
 /Shahab

 the code:

 val hadoopConf = sc.hadoopConfiguration

  hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

  hadoopConf.set("fs.s3.awsAccessKeyId", aws_access_key_id)

  hadoopConf.set("fs.s3.awsSecretAccessKey", aws_secret_access_key)

  val csv = sc.textFile("s3n://mybucket/info.csv")  // original file

  val data = csv.map(line => line.split(",").map(elem => elem.trim)) // lines in rows


 Here is the exception I faced:

 Exception in thread main java.lang.NoClassDefFoundError:
 org/jets3t/service/ServiceException

 at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(
 NativeS3FileSystem.java:280)

 at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(
 NativeS3FileSystem.java:270)

 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)

 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)

 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)

 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)

 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)

 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

 at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(
 FileInputFormat.java:256)

 at org.apache.hadoop.mapred.FileInputFormat.listStatus(
 FileInputFormat.java:228)

 at org.apache.hadoop.mapred.FileInputFormat.getSplits(
 FileInputFormat.java:304)

 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)

 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
 MapPartitionsRDD.scala:32)

 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
 MapPartitionsRDD.scala:32)

 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)

 at org.apache.spark.rdd.RDD.count(RDD.scala:1006)



Re: spark stream and spark sql with data warehouse

2015-06-12 Thread Akhil Das
This is a good start, if you haven't read it already
http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

Thanks
Best Regards

On Thu, Jun 11, 2015 at 8:17 PM, 唐思成 jadetan...@qq.com wrote:

 Hi all:
We are trying to using spark to do some real time data processing.
 I need do some sql-like query and analytical tasks with the real time data
 against historical normalized data stored in data bases. Is there anyone
 has done this kind of work or design? Any suggestion or material would be
 truly welcomed.





[Spark 1.4.0] java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem implementation

2015-06-12 Thread Peter Haumer


Hello.
I used to be able to run and debug my Spark apps in Eclipse for Spark 1.3.1 by
creating a launch configuration and setting the VM arg -Dspark.master=local[4].
I am now playing with the new 1.4 and trying out some of my simple samples,
which all fail with the same exception as shown below. Running them with
spark-submit works fine.

Does anybody have any hints for getting it to work in the IDE again? It seems to
be related to loading input files, whose paths I provide via the main args
and load via sc.textFile() in Java 8. Are there any new options that I
missed for telling the app to use the local file system?

Exception in thread main java.lang.UnsupportedOperationException: Not
implemented by the TFS FileSystem implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:213)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(
FileSystem.java:2401)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(
FileSystem.java:2411)
at org.apache.hadoop.fs.FileSystem.createFileSystem(
FileSystem.java:2428)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:166)
at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(
JobConf.java:653)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(
FileInputFormat.java:389)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(
FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$28.apply(
SparkContext.scala:762)
at org.apache.spark.SparkContext$$anonfun$28.apply(
SparkContext.scala:762)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(
HadoopRDD.scala:172)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(
HadoopRDD.scala:172)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:172)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:196)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219
)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217
)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1535)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:900)
at org.apache.spark.api.java.JavaRDDLike$class.reduce(
JavaRDDLike.scala:357)
at org.apache.spark.api.java.AbstractJavaRDDLike.reduce(
JavaRDDLike.scala:46)
at com.databricks.apps.logs.LogAnalyzer.main(LogAnalyzer.java:60)



Thanks and best regards,
Peter Haumer.

Re: Limit Spark Shuffle Disk Usage

2015-06-12 Thread Akhil Das
You can disable shuffle spill (spark.shuffle.spill
http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior)
if you are having enough memory to hold that much data. I believe adding
more resources would be your only choice.

Thanks
Best Regards
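
A minimal sketch of applying the setting mentioned above (the app name is
hypothetical); it is only safe when the shuffled partitions genuinely fit in
executor memory, otherwise the job will fail instead of spilling:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-shuffle-spill")
  .set("spark.shuffle.spill", "false")   // keep shuffle data in memory instead of on disk
val sc = new SparkContext(conf)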

On Thu, Jun 11, 2015 at 9:46 PM, Al M alasdair.mcbr...@gmail.com wrote:

 I am using Spark on a machine with limited disk space.  I am using it to
 analyze very large (100GB to 1TB per file) data sets stored in HDFS.  When
 I
 analyze these datasets, I will run groups, joins and cogroups.  All of
 these
 operations mean lots of shuffle files written to disk.

 Unfortunately what happens is my disk fills up very quickly (I only have
 40GB free).  Then my process dies because I don't have enough space on
 disk.
 I don't want to write my shuffles to HDFS because it's already pretty full.
 The shuffle files are cleared up between runs, but this doesnt help when a
 single run requires 300GB+ shuffle disk space.

 Is there any way that I can limit the amount of disk space used by my
 shuffles?  I could set up a cron job to delete old shuffle files whilst the
 job is still running, but I'm concerned that they are left there for a good
 reason.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: --jars not working?

2015-06-12 Thread Akhil Das
You can verify if the jars are shipped properly by looking at the driver UI
(running on 4040) Environment tab.

Thanks
Best Regards

On Sat, Jun 13, 2015 at 12:43 AM, Jonathan Coveney jcove...@gmail.com
wrote:

 Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos
 0.19.0)...

 Regardless, I'm running into a really weird situation where when I pass
 --jars to bin/spark-shell I can't reference those classes on the repl. Is
 this expected? The logs even tell me that my jars have been added, and yet
 the classes inside of them are not available.

 Am I missing something obvious?



Re: takeSample() results in two stages

2015-06-12 Thread Imran Rashid
It launches two jobs because it doesn't know ahead of time how big your RDD
is, so it doesn't know what the sampling rate should be.  After counting
all the records, it can determine what the sampling rate should be -- then
it does another pass through the data, sampling by the rate its just
determined.

Note that this suggests: (a) if you know the size of your RDD ahead of
time, you could eliminate that first pass and (b) since you end up
computing the input RDD twice, it may make sense to cache it.
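
A sketch of both suggestions, assuming the row count is known (or estimated)
ahead of time; the path and numbers are hypothetical:

val data = sc.textFile("hdfs:///path/to/input").cache()   // (b) avoid recomputing the input

val sampleSize    = 1000
val expectedCount = 100000000L                             // assumed, externally known
val fraction      = math.min(1.0, sampleSize.toDouble / expectedCount)

// (a) one sampling pass instead of takeSample()'s count job + sample job;
// sample() is approximate, so take() may return slightly fewer than sampleSize rows.
val sampled = data.sample(withReplacement = false, fraction).take(sampleSize)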

On Thu, Jun 11, 2015 at 11:43 AM, barmaley o...@solver.com wrote:

 I've observed interesting behavior in Spark 1.3.1, the reason for which is
 not clear.

 Doing something as simple as sc.textFile(...).takeSample(...) always
 results in two stages (screenshot):

 http://apache-spark-user-list.1001560.n3.nabble.com/file/n23280/Capture.jpg



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/takeSample-results-in-two-stages-tp23280.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Optimizing Streaming from Websphere MQ

2015-06-12 Thread Akhil Das
How many cores are you allocating for your job? And how many receivers are
you having? It would be good if you can post your custom receiver code, it
will help people to understand it better and shed some light.

Thanks
Best Regards

On Fri, Jun 12, 2015 at 12:58 PM, Chaudhary, Umesh 
umesh.chaudh...@searshc.com wrote:

  Hi,

 I have created a Custom Receiver in Java which receives data from
 Websphere MQ and I am only writing the received records on HDFS.



 I have referred many forums for optimizing speed of spark streaming
 application. Here I am listing a few:



 · Spark Official
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

 · VIrdata http://www.virdata.com/tuning-spark/

 ·  TD’s Slide (A bit Old but Useful)
 http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617



 These are the main points applicable to my case:



 · setting the batch interval to 1 sec

 · setting "spark.streaming.blockInterval" to 200ms

 · inputStream.repartition(3)



 But that did not improve the actual throughput of my receiver, which
 is at most 5-10 records/sec. This is way below my expectation.

 Am I missing something?



 Regards,

 Umesh Chaudhary
  This message, including any attachments, is the property of Sears
 Holdings Corporation and/or one of its subsidiaries. It is confidential and
 may contain proprietary or legally privileged information. If you are not
 the intended recipient, please delete it without reading the contents.
 Thank you.



Re: Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?

2015-06-12 Thread Tathagata Das
BTW, in Spark 1.4, announced today, I added SQLContext.getOrCreate, so you
don't need to create the singleton yourself.
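
A minimal usage sketch, assuming dstream is a DStream[String] of JSON records
as in the code further down:

import org.apache.spark.sql.SQLContext

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)  // one instance reused per JVM
    sqlContext.jsonRDD(rdd).registerTempTable("ttable")
    sqlContext.sql("SELECT COUNT(*) FROM ttable").collect().foreach(row => println(row(0)))
  }
}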

On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio 
drarse.a...@gmail.com wrote:

 Note: CCing user@spark.apache.org


 First, you must check if the RDD is empty:

  messages.foreachRDD { rdd =>
  if (!rdd.isEmpty) { }}

 Now, you can obtain the instance of a SQLContext:

 val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)




 *Optional*
 In this moment, I like work with DataFrame. I convert RDD to DataFrame. I
 see that you recive a JSON:

 val df :DataFrame = sqlContext.jsonRDD(message,
 getSchema(getSchemaStr)).toDF()


 My getSchema function create a Schema of my JSON:

 def getSchemaStr(): String = "feature1 feature2 ..."

 def getSchema(schema: String): StructType = StructType(schema.split(" ")
   .map(fieldName => StructField(fieldName, StringType, true)))

 I hope you helps.

 Regards.



 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] 
 ml-node+s1001560n23226...@n3.nabble.com:

 I don't know why. You said “Why? I tried this solution and it works fine.” Do you
 mean your SQLContext instance stays alive for the whole streaming application's
 lifetime, rather than for one batch duration? My code is as below:


 object SQLContextSingleton extends java.io.Serializable{
   @transient private var instance: SQLContext = null

   // Instantiate SQLContext on demand
   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
 if (instance == null) {
   instance = new SQLContext(sparkContext)
 }
 instance
   }
 }

 // type_-typex, id_-id, url_-url
 case class  (time: Timestamp, id: Int, openfrom: Int, tab: Int) extends 
 Serializable
 case class Count(x: Int)

 @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
 ssc.checkpoint(".")

 val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092")
 @transient val dstream = KafkaUtils.createDirectStream[String, String, 
 StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
 @transient val dddstream = newsIdDStream.map(x => x._2).flatMap(x =>
 x.split("\n"))

 dddstream.foreachRDD { rdd =>
 
 SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable("ttable")
 val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql("SELECT 
 COUNT(*) FROM ttable")
 ret.foreach { x => println(x(0)) }
 }

 ssc.start()
 ssc.awaitTermination()






 On 2015-06-09 17:41:44, drarse [via Apache Spark User List] wrote:

 Why? I  tried  this solution and works fine.

 On Tuesday, 9 June 2015, codingforfun [via Apache Spark User List] wrote:

 Hi drarse, thanks for replying, the way you said use a singleton object
 does not work




 On 2015-06-09 16:24:25, drarse [via Apache Spark User List] wrote:

 The best way is create a singleton object like:

 object SQLContextSingleton {
   @transient private var instance: SQLContext = null

   // Instantiate SQLContext on demand
   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
 if (instance == null) {
   instance = new SQLContext(sparkContext)
 }
 instance
   }}

  You have more information in the programming guide:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations



 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List] wrote:

 I used SQLContext in a spark streaming application as blew:

 

 case class topic_name (f1: Int, f2: Int)

 val sqlContext = new SQLContext(sc)
 @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
 ssc.checkpoint(".")
 val theDStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))

 theDStream.map(x => x._2).foreach { rdd =>
   sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
   sqlContext.sql("select count(*) from topic_name").foreach { x =>
 WriteToFile(file_path, x(0).toString)
   }
 }

 ssc.start()
 ssc.awaitTermination()
 


 I found i could only get every 5 seconds's count of message, because
 The lifetime of this temporary table is tied to the SQLContext that was
 used to create this DataFrame, i guess every 5 seconds, a new sqlContext
 will be create and the temporary table can only alive just 5 seconds, i
 want to the sqlContext and the temporary table alive all the streaming
 application's life cycle, how to do it?

 Thanks~


Parquet Multiple Output

2015-06-12 Thread Xin Liu
Hi,

I have a scenario where I'd like to store a RDD using parquet format in
many files, which corresponds to days, such as 2015/01/01, 2015/02/02, etc.

So far I used this method

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

to store text files (then I have to read text files and convert to parquet
and store again). Anyone has tried to store many parquet files from one RDD?

Thanks,
Xin


Re: Spark SQL and Skewed Joins

2015-06-12 Thread Michael Armbrust

 2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
 1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
 available, would any of the JOIN enhancements help this situation?


I would try Spark 1.4 after running SET
spark.sql.planner.sortMergeJoin=true.  Please report back if this works
for you.
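
A small sketch of applying that setting from code (the table names are
hypothetical):

sqlContext.sql("SET spark.sql.planner.sortMergeJoin=true")

val joined = sqlContext.sql(
  "SELECT a.*, b.gender FROM big_skewed_table a JOIN small_lookup b ON a.name = b.name")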


Are there ways to restrict what parameters users can set for a Spark job?

2015-06-12 Thread YaoPau
For example, Hive lets you set a whole bunch of parameters (# of reducers, #
of mappers, size of reducers, cache size, max memory to use for a join),
while Impala gives users a much smaller subset of parameters to work with,
which makes it nice to give to a BI team.

Is there a way to restrict which parameters a user can set for a Spark job? 
Maybe to cap the # of executors, or cap the memory for each executor, or to
enforce a default setting no matter what parameters are used.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Are-there-ways-to-restrict-what-parameters-users-can-set-for-a-Spark-job-tp23301.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Hi Andrew,

Thanks a lot! Indeed, it doesn't start with spark, the following properties
are read by implementation of the driver rather than spark conf:

--conf spooky.root=s3n://spooky- \
--conf spooky.checkpoint=s3://spooky-checkpoint \

This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set
the same properties?

Yours Peng

On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From the
 warning it looks like the config you are trying to set does not start with
 the prefix spark.. What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the system property of the driver can be set by --conf
 option, shared between setting spark properties and system properties.

 In Spark 1.4.0 this feature is removed, the driver instead log the
 following
 warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system property in 1.4.0? Is there a reason it is
 removed without a deprecation warning?

 Thanks a lot for your advices.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Resource allocation configurations for Spark on Yarn

2015-06-12 Thread Jim Green
Hi Team,

Sharing one article which summarize the Resource allocation configurations
for Spark on Yarn:
Resource allocation configurations for Spark on Yarn
http://www.openkb.info/2015/06/resource-allocation-configurations-for.html

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Dynamic allocator requests -1 executors

2015-06-12 Thread Patrick Woody
Hey all,

I've recently run into an issue where spark dynamicAllocation has asked for
-1 executors from YARN. Unfortunately, this raises an exception that kills
the executor-allocation thread and the application can't request more
resources.

Has anyone seen this before? It is spurious and the application usually
works, but when this gets hit it becomes unusable when getting stuck at
minimum YARN resources.

Stacktrace below.

Thanks!
-Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught
exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative
number of executor(s) -1 from the cluster manager. Please specify a
positive number!
472 ! at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338)
~[spark-core_2.10-1.3.1.jar:1.
473 ! at
org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137)
~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at
org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at
org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at 
org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
~[spark-core_2.10-1.3.1.j
477 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at
org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
[spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
[na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
[na:1.7.0_71]
484 ! at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
[na:1.7.0_71]
485 ! at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[na:1.7.0_71]
486 ! at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_71]
487 ! at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_71]


[Spark] What is the most efficient way to do such a join and column manipulation?

2015-06-12 Thread Rex X
Hi,

I want to use spark to select N columns, top M rows of all csv files under
a folder.

To be concrete, say we have a folder with thousands of tab-delimited csv
files with following attributes format (each csv file is about 10GB):

id    name    address    city    ...
1     Matt    add1       LA      ...
2     Will    add2       LA      ...
3     Lucy    add3       SF      ...
...

And we have a lookup table based on name above

name    gender
Matt    M
Lucy    F
...

Now we are interested to output from top 100K rows of each csv file into
following format:

id    name    gender
1     Matt    M
...

Can we use pyspark to efficiently handle this?


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Ted Yu
This is the SPARK JIRA which introduced the warning:

[SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
in spark-shell and spark-submit

On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng rhw...@gmail.com wrote:

 Hi Andrew,

 Thanks a lot! Indeed, it doesn't start with spark, the following
 properties are read by implementation of the driver rather than spark conf:

 --conf spooky.root=s3n://spooky- \
 --conf spooky.checkpoint=s3://spooky-checkpoint \

 This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
 set the same properties?

 Yours Peng

 On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From
 the warning it looks like the config you are trying to set does not start
 with the prefix spark.. What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the system property of the driver can be set by --conf
 option, shared between setting spark properties and system properties.

 In Spark 1.4.0 this feature is removed, the driver instead log the
 following
 warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system property in 1.4.0? Is there a reason it is
 removed without a deprecation warning?

 Thanks a lot for your advices.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Amit Ramesh
Hi Juan,

I have created a ticket for this:
https://issues.apache.org/jira/browse/SPARK-8337

Thanks!
Amit


On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 If you want I would be happy to work in this. I have worked with
 KafkaUtils.createDirectStream before, in a pull request that wasn't
 accepted https://github.com/apache/spark/pull/5367. I'm fluent with
 Python and I'm starting to feel comfortable with Scala, so if someone opens
 a JIRA I can take it.

 Greetings,

 Juan Rodriguez


 2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org:

 The scala api has 2 ways of calling createDirectStream.  One of them
 allows you to pass a message handler that gets full access to the kafka
 MessageAndMetadata, including offset.

 I don't know why the python api was developed with only one way to call
 createDirectStream, but the first thing I'd look at would be adding that
 functionality back in.  If someone wants help creating a patch for that,
 just let me know.

 Dealing with offsets on a per-message basis may not be as efficient as
 dealing with them on a batch basis using the HasOffsetRanges interface...
 but if efficiency was a primary concern, you probably wouldn't be using
 Python anyway.

 On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Scala KafkaRDD uses a trait to handle this problem, but it is not so
 easy and straightforward in Python, where we need to have a specific API to
 handle this. I'm not sure whether there is any simple workaround to fix this; maybe
 we should think carefully about it.

 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at.
 Any pointers on what is needed to build in this support would be great.
 This is critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it, I think currently Python based Kafka direct API do not
 provide such equivalence like Scala, maybe we should figure out to add 
 this
 into Python API also.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because as RDDs get generated within spark the
 offsets move further along. With direct Kafka mode the current offsets 
 are
 no more persisted in Zookeeper but rather within Spark itself. If you 
 want
 to be able to use zookeeper based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so
 that we can continue from where we left off after a code deployment. In
 other words, we need exactly-once processing guarantees across code
 deployments. Spark does not support any state persistence across
 deployments so this is something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
  wrote:

 Hi,

 What is your meaning of getting the offsets from the RDD, from my
 understanding, the offsetRange is a parameter you offered to KafkaRDD, 
 why
 do you still want to get the one previous you set into?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but
 haven't been able to figure out how to get the offsets from the RDD. 
 Looks
 like the documentation is yet to be updated to include Python examples 
 (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit











Re: NullPointerException with functions.rand()

2015-06-12 Thread Ted Yu
Created PR and verified the example given by Justin works with the change:
https://github.com/apache/spark/pull/6793

Cheers

On Wed, Jun 10, 2015 at 7:15 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looks like the NPE came from this line:
   @transient protected lazy val rng = new XORShiftRandom(seed +
 TaskContext.get().partitionId())

 Could TaskContext.get() be null ?

 On Wed, Jun 10, 2015 at 6:15 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I am using 1.4.0 and found the following weird behavior.

 This case works fine:

 scala> sc.parallelize(Seq((1,2), (3, 100))).toDF.withColumn("index",
 rand(30)).show()
 +--+---+-------------------+
 |_1| _2|              index|
 +--+---+-------------------+
 | 1|  2| 0.6662967911724369|
 | 3|100|0.35734504984676396|
 +--+---+-------------------+

 However, when I use sqlContext.createDataFrame instead, I get a NPE:

 scala> sqlContext.createDataFrame(Seq((1,2), (3,
 100))).withColumn("index", rand(30)).show()
 java.lang.NullPointerException
 at
 org.apache.spark.sql.catalyst.expressions.RDG.rng$lzycompute(random.scala:39)
 at org.apache.spark.sql.catalyst.expressions.RDG.rng(random.scala:39)
 ..


 Does any one know why?

 Thanks.

 Justin

 --
 View this message in context: NullPointerException with functions.rand()
 http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-with-functions-rand-tp23267.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: Parquet Multiple Output

2015-06-12 Thread Cheng Lian
Spark 1.4 supports dynamic partitioning, you can first convert your RDD 
to a DataFrame and then save the contents partitioned by date column. 
Say you have a DataFrame df containing three columns a, b, and c, you 
may have something like this:


df.write.partitionBy("a", "b").mode("overwrite").parquet("path/to/file")
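
A fuller sketch of the RDD-to-partitioned-Parquet path (the case class, column 
names and paths below are hypothetical; assumes import sqlContext.implicits._ 
for toDF):

case class Event(day: String, payload: String)
val df = rdd.map { case (day, payload) => Event(day, payload) }.toDF()
df.write.partitionBy("day").mode("overwrite").parquet("hdfs:///data/events")
// produces hdfs:///data/events/day=2015-01-01/..., day=2015-01-02/... and so on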


Cheng

On 6/13/15 5:31 AM, Xin Liu wrote:

Hi,

I have a scenario where I'd like to store a RDD using parquet format 
in many files, which corresponds to days, such as 2015/01/01, 
2015/02/02, etc.


So far I used this method

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

to store text files (then I have to read text files and convert to 
parquet and store again). Anyone has tried to store many parquet files 
from one RDD?


Thanks,
Xin




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Thanks all for your information. Andrew, I dug out one of your old posts
which is relevant:

http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-td5798.html

But it didn't mention how to supply properties that don't start with spark.
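
One workaround worth trying (not verified against 1.4.0 here) is to pass them as 
JVM system properties instead of --conf entries, e.g. via --driver-java-options 
(or spark.driver.extraJavaOptions in cluster mode) plus 
spark.executor.extraJavaOptions if the executors need them too:

spark-submit \
  --driver-java-options "-Dspooky.root=<uri> -Dspooky.checkpoint=<uri>" \
  --conf "spark.executor.extraJavaOptions=-Dspooky.root=<uri> -Dspooky.checkpoint=<uri>" \
  ...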

On 12 June 2015 at 19:39, Ted Yu yuzhih...@gmail.com wrote:

 This is the SPARK JIRA which introduced the warning:

 [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
 in spark-shell and spark-submit

 On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng rhw...@gmail.com wrote:

 Hi Andrew,

 Thanks a lot! Indeed, it doesn't start with spark, the following
 properties are read by implementation of the driver rather than spark conf:

 --conf spooky.root=s3n://spooky- \
 --conf spooky.checkpoint=s3://spooky-checkpoint \

 This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
 set the same properties?

 Yours Peng

 On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From
 the warning it looks like the config you are trying to set does not start
 with the prefix spark.. What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the system property of the driver can be set by --conf
 option, shared between setting spark properties and system properties.

 In Spark 1.4.0 this feature is removed; the driver instead logs the
 following
 warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system property in 1.4.0? Is there a reason it was
 removed without a deprecation warning?

 Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Spark SQL and Skewed Joins

2015-06-12 Thread Jon Walton
Greetings,

I am trying to implement a classic star schema ETL pipeline using Spark
SQL, 1.2.1.  I am running into problems with shuffle joins, for those
dimension tables which have skewed keys and are too large to let Spark
broadcast them.

I have a few questions

1. Can I split my queries so a unique, skewed key gets processed by
multiple reducer steps?   I have tried this (using a UNION) but I am always
left with 199 of 200 tasks complete, and the last one times out and even starts
throwing memory errors.   That single executor is processing 95% of the 80G
fact table for the single skewed key.

2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
available, would any of the JOIN enhancements help this situation?

3. Do you have suggestions for memory config if I wanted to broadcast 2G
dimension tables?   Is this even feasible?   Do table broadcasts wind up in
the heap or in dedicated storage space?

Thanks for your help,

Jon
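
One pattern that addresses question 1 without a UNION is key salting: spread the 
hot key over N sub-keys on the fact side, replicate the dimension side N times, 
and join on the salted key. A minimal RDD sketch (N, the names and the pair-RDD 
shapes are hypothetical):

val N = 20
val saltedFact = factRdd.map { case (key, value) =>
  ((key, scala.util.Random.nextInt(N)), value)
}
val saltedDim = dimRdd.flatMap { case (key, dim) =>
  (0 until N).map(i => ((key, i), dim))
}
val joined = saltedFact.join(saltedDim).map { case ((key, _), (value, dim)) =>
  (key, (value, dim))
}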


Reliable SQS Receiver for Spark Streaming

2015-06-12 Thread Michal Čizmazia
I would like to have a Spark Streaming SQS Receiver which deletes SQS
messages only after they were successfully stored on S3.

For this a Custom Receiver can be implemented with the semantics of
the Reliable Receiver.

The store(multiple-records) call blocks until the given records have
been stored and replicated inside Spark.

If the write-ahead logs are enabled, all the data received from a
receiver gets written into a write ahead log in the configuration
checkpoint directory. The checkpoint directory can be pointed to S3.

After the store(multiple-records) blocking call finishes, are the
records already stored in the checkpoint directory (and thus can be
safely deleted from SQS)?
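
A minimal sketch of such a receiver (fetchBatch and deleteBatch stand in for the 
real SQS client calls and are assumptions, not SDK methods):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SqsReceiver(queueUrl: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("sqs-receiver") { override def run(): Unit = receive() }.start()
  }

  def onStop(): Unit = { }

  private def receive(): Unit = {
    while (!isStopped()) {
      val (bodies, receiptHandles) = fetchBatch(queueUrl)   // hypothetical SQS wrapper
      if (bodies.nonEmpty) {
        // blocking, multi-record store: returns only after Spark has stored
        // (and, with the WAL enabled, logged) the whole batch
        store(ArrayBuffer(bodies: _*))
        deleteBatch(queueUrl, receiptHandles)               // now safe to delete from SQS
      }
    }
  }

  private def fetchBatch(url: String): (Seq[String], Seq[String]) = ???
  private def deleteBatch(url: String, handles: Seq[String]): Unit = ???
}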

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: BigDecimal problem in parquet file

2015-06-12 Thread Davies Liu
Maybe it's related to a bug, which is fixed by
https://github.com/apache/spark/pull/6558 recently.

On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag bipin@gmail.com wrote:
 Hi Cheng,

 Yes, some rows contain unit instead of decimal values. I believe some rows
 from the original source don't have any value, i.e. they are null, and that
 shows up as unit. How does spark-sql or parquet handle null in place of
 decimal values, assuming that field is nullable? I will have to fix it
 properly.

 Thanks for helping out.
 Bipin

 On 12 June 2015 at 14:57, Cheng Lian lian.cs@gmail.com wrote:

 On 6/10/15 8:53 PM, Bipin Nag wrote:

 Hi Cheng,

 I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading an
 existing parquet file, then repartitioning and saving it. Doing this gives
 the error. The code for this doesn't look like it is causing the problem. I
 have a feeling the source, the existing parquet file, is the culprit.

 I created that parquet using a jdbcrdd (pulled from microsoft sql server).
 First I saved jdbcrdd as an objectfile on disk. Then loaded it again, made a
 dataframe from it using a schema then saved it as a parquet.

 Following is the code :
 For saving jdbcrdd:
  name - fullqualifiedtablename
  pk - string for primarykey
  pklast - last id to pull
 val myRDD = new JdbcRDD( sc, () =>
 DriverManager.getConnection(url, username, password),
 "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
 1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
 myRDD.saveAsObjectFile("rawdata/" + name);

 For applying schema and saving the parquet:
 val myschema = schemamap(name)
 val myrdd =
 sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map(x =>
 org.apache.spark.sql.Row(x:_*))

 Have you tried to print out x here to check its contents? My guess is that
 x actually contains unit values. For example, the follow Spark shell code
 can reproduce a similar exception:

 import org.apache.spark.sql.types._
 import org.apache.spark.sql.Row

 val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
 val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr:
 _*))
 val df = sqlContext.createDataFrame(rdd, schema)

 df.saveAsParquetFile("file:///tmp/foo")
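
 If that is what is happening here, one way to turn the unit placeholders into 
 proper nulls before building the Rows is a sketch like this (adjust to the 
 real schema):

 val cleaned = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map { arr =>
   org.apache.spark.sql.Row(arr.map {
     case _: scala.runtime.BoxedUnit => null
     case v => v
   }: _*)
 }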

 val actualdata = sqlContext.createDataFrame(myrdd, myschema)
 actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)

 Schema structtype can be made manually, though I pull table's metadata and
 make one. It is a simple string translation (see sql docs and/or spark
 datatypes)

 That is how I created the parquet file. Any help to solve the issue is
 appreciated.
 Thanks
 Bipin


 On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com wrote:

 Would you please provide a snippet that reproduce this issue? What
 version of Spark were you using?

 Cheng

 On 6/9/15 8:18 PM, bipin wrote:

 Hi,
 When I try to save my data frame as a parquet file I get the following
 error:

 java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
 org.apache.spark.sql.types.Decimal
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
 at

 org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
 at

 parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
 at
 parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
 at

 org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at

 org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 How to fix this problem ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Broadcast value

2015-06-12 Thread Yasemin Kaya
Hi,

I am reading a broadcast value from a file. I want to use it to create Rating
objects (ALS).
But I am getting null. Here is my code
https://gist.github.com/yaseminn/d6afd0263f6db6ea4ec5 :

Lines 17 and 18 are ok, but line 19 returns null, so line 21 gives me an error. I
don't understand why. Do you have any idea?


Best,
yasemin



-- 
hiç ender hiç


Writing data to hbase using Sparkstreaming

2015-06-12 Thread Vamshi Krishna
Hi I am trying to write data that is produced from kafka commandline
producer for some topic. I am facing problem and unable to proceed. Below
is my code which I am creating a jar and running through spark-submit on
spark-shell. Am I doing wrong inside foreachRDD() ? What is wrong with
 SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)   line in below error
message?



SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaDemo")
        .setMaster("local")
        .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
// Create the context with a 5 second batch interval
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));

int numThreads = 2;
Map<String, Integer> topicMap = new HashMap<String, Integer>();
// topicMap.put("viewTopic", numThreads);
topicMap.put("nonview", numThreads);

JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {

        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                new PairFunction<String, ImmutableBytesWritable, Put>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {

                        Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                        put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
                                Bytes.toBytes(line + "fc"));
                        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                    }
                });

        // save to HBase - Spark built-in API method
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null;
    }
});
jsc.start();
jsc.awaitTermination();





I see below error on spark-shell.


./bin/spark-submit --class SparkKafkaDemo --master local
/Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar

Exception in thread main org.apache.spark.SparkException: Task not
serializable

at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)

at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)

at org.apache.spark.rdd.RDD.map(RDD.scala:286)

at
org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)

at
org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)

at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)

at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)

at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)

at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

at scala.util.Try$.apply(Try.scala:161)

at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at

Re: How to pass arguments dynamically, that needs to be used in executors

2015-06-12 Thread gaurav sharma
Thanks Todd, that solved my problem.

Regards,
Gaurav
(please excuse spelling mistakes)
Sent from phone
On Jun 11, 2015 6:42 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Gaurav,

 Seems like you could use a broadcast variable for this if I understand
 your use case.  Create it in the driver based on the CommandLineArguments
 and then use it in the workers.


 https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

 So something like:

 Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));

 Then just reference the broadcast variable in your workers.  It will get
 shipped once to all nodes in the cluster and can be referenced by them.
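
 A minimal sketch of the same idea in Scala (the argument index and field name 
 follow the thread; everything else is hypothetical):

 // driver side: wrap the CLI argument in a broadcast variable
 val clickThreshold = sc.broadcast(args(12).toInt)

 // executor side: read broadcastVar.value instead of a static field
 val filtered = adeAudGeoAggDataRdd.filter(rec => rec.getClick > clickThreshold.value)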

 HTH.

 -Todd

 On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma sharmagaura...@gmail.com
 wrote:

 Hi,

 I am using Kafka Spark cluster for real time aggregation analytics use
 case in production.

 Cluster details
 6 nodes, each node running 1 Spark and kafka processes each.
 Node1  - 1 Master , 1 Worker, 1 Driver,
  1 Kafka process
 Node 2,3,4,5,6 - 1 Worker prcocess each
  1 Kafka process each

 Spark version 1.3.0
 Kafka Veriosn 0.8.1

 I am using Kafka Directstream for Kafka Spark Integration.
 Analytics code is written using the Spark Java API.

 Problem Statement :

   I want to accept a parameter as a command line argument and pass it on
 to the executors.
   (I want to use the parameter in the rdd.foreach method, which is executed
 on the executors.)

   I understand that when the driver is started, only the jar is
 transported to all the Workers.
   But I need to use the dynamically passed command line argument in
 the reduce operation executed on the executors.


 Code Snippets for better understanding my problem :

 public class KafkaReconcilationJob {

 private static Logger logger =
 Logger.getLogger(KafkaReconcilationJob.class);
  public static void main(String[] args) throws Exception {
   CommandLineArguments.CLICK_THRESHOLD =
 Integer.parseInt(args[12]);
 --- I want to use this
 command line argument
 }

 }



 JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
 adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
 @Override
 public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception
 {
 if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD ||
 adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
 return true;
 }else {
 return false;
 }
 }
 });



 The above mentioned Filter operation gets executed on executor which has
 0 as the value of the static field CommandLineArguments.CLICK_THRESHOLD


 Regards,
 Gaurav





Re: how to use a properties file from a url in spark-submit

2015-06-12 Thread Gary Ogden
That's a great idea. I did what you suggested and added the url to the
props file in the uri of the json. The properties file now shows up in the
sandbox.  But when it goes to run spark-submit  with --properties-file
props.properties   it fails to find it:

Exception in thread main java.lang.IllegalArgumentException:
requirement failed: Properties file props.properties does not exist


On 11 June 2015 at 22:17, Matthew Jones mle...@gmail.com wrote:

 If you are using chronos you can just put the url in the task json and
 chronos will download it into your sandbox. Then just use spark-submit
 --properties-file app.properties.

 On Thu, 11 Jun 2015 15:52 Marcelo Vanzin van...@cloudera.com wrote:

 That's not supported. You could use wget / curl to download the file to a
 temp location before running spark-submit, though.

 On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden gog...@gmail.com wrote:

 I have a properties file that is hosted at a url. I would like to be
 able to use the url in the --properties-file parameter when submitting a
 job to mesos using spark-submit via chronos

 I would rather do this than use a file on the local server.

 This doesn't seem to work though when submitting from chronos:

 bin/spark-submit --properties-file http://server01/props/app.properties


 Inside the properties file:
 spark.executor.memory=256M
 spark.cores.max=1
 spark.shuffle.consolidateFiles=true
 spark.task.cpus=1
 spark.deploy.defaultCores=1
 spark.driver.cores=1
 spark.scheduler.mode=FAIR

 So how do I specify a properties file in a url?




 --
 Marcelo




Re: ClassCastException: BlockManagerId cannot be cast to [B

2015-06-12 Thread davidkl
Hello,

Just in case someone finds the same issue: it was caused by running the
streaming app with a different version of the cluster jars (the uber jar
contained both yarn and spark).

Regards

J



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-BlockManagerId-cannot-be-cast-to-B-tp23276p23296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-12 Thread Steve Loughran
These are both really good posts: you should try and get them in to the 
documentation.

with anything implementing dynamicness, there are some fun problems

(a) detecting the delays in the workflow. There are some good ideas here
(b) deciding where to address it. That means you need to monitor the entire 
pipeline —which you should be doing in production anyway.
(c) choosing the action. More nodes, more memory & CPU (not that useful for 
Java code, even when YARN adds support for dynamic container resize)
(d) choosing the size of the action. In a shared cluster, extra resources for 
one app comes at the expense of others. If you have pre-emption turned on in 
YARN, the scheduler can take containers off lower priority work, which 
automates a lot of this decision making. That will lose other work though, so 
to justify it you'd better hang on those containers
(e) deciding if/when to hand things back. Scaling things down can be very 
expensive if lots of state has to get rebuilt elsewhere.

I think Apache Helix from LinkedIn has done some good work here - worth looking 
at to see what lessons & code to lift. And as you'd expect, it sits right 
behind Kafka in production. I think it gets away with low delays to scale 
up/down and by relying on low rebuild costs. [In the work I've been doing with 
colleagues on dynamic HBase and Accumulo clusters, we've not attempted to do 
any autoscaling, because scaling down is an expensive decision... we're focusing on 
liveness detection and reaction, then publishing the metrics needed to allow 
people or cross-application tools to make the decision.]

On 12 Jun 2015, at 04:38, Dmitry Goldenberg 
dgoldenberg...@gmail.commailto:dgoldenberg...@gmail.com wrote:

Yes, Tathagata, thank you.

For #1, the 'need detection', one idea we're entertaining is timestamping the 
messages coming into the Kafka topics. The consumers would check the interval 
between the time they get the message and that message origination timestamp. 
As Kafka topics start to fill up more, we would presumably see longer and 
longer wait times (delays) for messages to be getting processed by the 
consumers.  The consumers would then start firing off critical events into an 
Event Analyzer/Aggregator which would decide that more resources are needed, 
then ask the Provisioning Component to allocate N new machines.

We do want to set maxRatePerPartition in order to not overwhelm the consumers 
and run out of memory.  Machine provisioning may take a while, and if left with 
no maxRate guards, our consumers could run out of memory.
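
(For reference, the direct stream's rate cap is a Spark setting, e.g. 
--conf spark.streaming.kafka.maxRatePerPartition=10000, where the value is the 
number of records per second per partition and is purely illustrative here.)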

Since there are no receivers, if the cluster gets a new executor, it will 
automatically start getting used to run tasks... no need to do anything 
further.  This is great, actually. We were wondering whether we'd need to 
restart the consumers once the new machines have been added. Tathagata's point 
implies, as I read it, that no further orchestration is needed, the load will 
start getting redistributed automatically. This makes implementation of 
autoscaling a lot simpler, as far as #3.

One issue that's not yet been covered much is the scenario when *fewer* cluster 
resources become required (a system load valley rather than a peak). To detect 
a low volume, we'd need to measure the throughput in messages per second over 
time.  Real low volumes would cause firing off of critical events signaling to 
the Analyzer that machines could be decommissioned.

If machines are being decommissioned, it would seem that the consumers would 
need to get acquiesced (allowed to process any current batch, then shut down), 
then they would restart themselves or be restarted. Thoughts on this?

There is also a hefty #4 here which is the hysteresis of this, where the 
system operates adaptively and learns over time, remembering the history of 
cluster expansions and contractions and allowing a certain slack for letting 
things cool down or heat up more gradually; also not contracting or expanding 
too frequently.  PID controllers  and thermostat types of design patterns have 
been mentioned before in this discussion.


If you look at the big cloud apps, they dynamically reallocate VM images based 
on load history, with Netflix being the poster user: Hadoop work in the quiet 
hours, user interaction evenings and weekends. Excluding special events 
(including holidays), there's a lot of regularity over time, which lets you 
predict workload in advance.  It's like your thermostat knowing fridays are 
cold and it should crank up the heating in advance.






On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das 
t...@databricks.commailto:t...@databricks.com wrote:
Let me try to add some clarity in the different thought directions that's going 
on in this thread.

1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?

If there are no rate limits set up, the most reliable way to detect whether 
the current Spark cluster is insufficient to handle the data load is to 
use the StreamingListener interface which 

Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread Cody Koeninger
The other thing to keep in mind about spark window operations against Kafka
is that spark streaming is based on current system clock, not the time
embedded in your messages.

So you're going to get a fundamentally wrong answer from a window operation
after a failure / restart, regardless of whether you're using the
createStream or createDirectStream api.

On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger c...@koeninger.org wrote:

 Casting to HasOffsetRanges would be meaningless anyway if done after an
 operation that changes partitioning.

 You can still use the messageHandler argument to createDirectStream to get
 access to offsets on a per-message basis.

 Also, it doesn't look like what you're doing is particularly concerned
 about transactional correctness (since you're saving offsets to a kafka api
 backed by zookeeper), so you can try doing a transform as the first step in
 your stream, and casting to HasOffsetRanges there.
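
 A sketch of that transform-first pattern (the stream name is hypothetical; the
 function passed to transform runs on the driver, so a driver-side variable is fine):

 import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

 var latestOffsets = Array.empty[OffsetRange]
 val tagged = directKafkaStream.transform { rdd =>
   latestOffsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }
 // windowing / reduceByKeyAndWindow can then be applied to `tagged`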



 On Fri, Jun 12, 2015 at 5:03 AM, zigen dbviewer.zi...@gmail.com wrote:

 Hi Shao,

 Thank you for your quick reply.
 I was disappointed.
 I will try window operations with Receiver-based
 Approach(KafkaUtils.createStream).

 Thank you again,
 ZIGEN


 2015/06/12 17:18, Saisai Shao sai.sai.s...@gmail.com wrote:

 I think you could not use offsetRange in such way, when you transform a
 DirectKafkaInputDStream into WindowedDStream, internally the KafkaRDD is
 changed into normal RDD, but offsetRange is a specific attribute for
 KafkaRDD, so when you convert a normal RDD to HasOffsetRanges, you will
 meet such exception.

 you could only do something like:

 directKafkaInputDStream.foreachRDD { rdd =>
rdd.asInstanceOf[HasOffsetRanges]
   ...
 }

 Apply foreachRDD directly on DirectKafkaInputDStream.







 2015-06-12 16:10 GMT+08:00 ZIGEN dbviewer.zi...@gmail.com:

 Hi, I'm using Spark Streaming (1.3.1).
 I want to get exactly-once messaging from Kafka and use window
 operations of
 DStream.

 When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the Kafka
 direct API,
 a java.lang.ClassCastException occurs as follows.

 --- stacktrace --
 java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD
 cannot
 be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
 at

 org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
 at

 org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
 at

 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
 at

 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
 at

 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at

 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at

 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)


 --- my source ---

 JavaStreamingContext jssc = new JavaStreamingContext(_ctx,
 batchInterval);
 jssc.checkpoint(checkpoint);

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream
  (jssc, String.class, String.class, StringDecoder.class,
 StringDecoder.class, kafkaParams, topicsSet);

 JavaPairDStream<String, List<String>> pairDS =
 messages.mapToPair(...);

 JavaPairDStream<String, List<String>> windowDs =
 pairDS.reduceByKeyAndWindow(new Function2<List<String>, List<String>,
 List<String>>() {
 @Override
 public List<String> call(List<String> list1, List<String> list2)
 throws
 Exception {
 ...
 }
 }, windowDuration, slideDuration);

 windowDs.foreachRDD(new 

RE: How to set spark master URL to contain domain name?

2015-06-12 Thread Wang, Ningjun (LNG-NPV)
I think the problem is that in my local etc/hosts file, I have

10.196.116.95 WIN02

I will remove it and try. Thanks for the help.

Ningjun

From: prajod.vettiyat...@wipro.com [mailto:prajod.vettiyat...@wipro.com]
Sent: Friday, June 12, 2015 1:44 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: RE: How to set spark master URL to contain domain name?

Hi Ningjun,

This is probably a configuration difference between WIN01 and WIN02.

Execute: ipconfig/all on the windows command line on both machines and compare 
them.

Also if you have a localhost entry in the hosts file, it should not have the  
wrong sequence: See the first answer in this link: 
http://stackoverflow.com/questions/6049260/fully-qualified-machine-name-java-with-etc-hosts
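
For example, an entry that lists the fully qualified name first (the IP address 
is a placeholder):

<ip-address>   WIN02.mycompany.com   WIN02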

Regards,
Prajod

From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
Sent: 12 June 2015 01:32
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: How to set spark master URL to contain domain name?

I start spark master on windows using

bin\spark-class.cmd org.apache.spark.deploy.master.Master

Then I go to http://localhost:8080/ to find the master URL, which is

spark://WIN02:7077

Here WIN02 is my machine name. Why is it missing the domain name? If I start 
the spark master on other machines, the master URL will contain the domain name, 
e.g.
spark://WIN01.mycompany.com:7077

Only on machine WIN02 does the master URL not contain the domain name. How 
can I configure WIN02 so that the spark master URL will also contain the domain name, 
e.g.
spark://WIN02.mycompany.com

Thanks


Ningjun

The information contained in this electronic message and any attachments to 
this message are intended for the exclusive use of the addressee(s) and may 
contain proprietary, confidential or privileged information. If you are not the 
intended recipient, you should not disseminate, distribute or copy this e-mail. 
Please notify the sender immediately and destroy all copies of this message and 
any attachments. WARNING: Computer viruses can be transmitted via email. The 
recipient should check this email and any attachments for the presence of 
viruses. The company accepts no liability for any damage caused by any virus 
transmitted by this email. www.wipro.comhttp://www.wipro.com


Spark Streaming - Can i BIND Spark Executor to Kafka Partition Leader

2015-06-12 Thread gaurav sharma
Hi,

I am using Kafka Spark cluster for real time aggregation analytics use case
in production.

*Cluster details*

*6 nodes*, each node running 1 Spark and kafka processes each.
Node1  - 1 Master , 1 Worker, 1 Driver,
   1 Kafka process
Node 2,3,4,5,6 - 1 Worker prcocess each  1
Kafka process each

Spark version 1.3.0
Kafka Veriosn 0.8.1

I am using *Kafka* *Directstream* for Kafka Spark Integration.
Analytics code is written using the Spark Java API.

*Problem Statement : *

  We are dealing with about *10 M records per hour*.
  My Spark Streaming Batch runs at *1 hour interval*( at 11:30 12:30
1:30 and so on)

  Since i am using Direct Stream, it reads all the data for past hour
at 11:30 12:30 1:30 and so on
  Though as of now it takes *about 3 minutes* to read the data with
Network bandwidth utilization of  *100-200 MBPS per node*( out of 6 node
Spark Cluster)

  Since i am running both Spark and Kafka on same machine
*  I WANT TO BIND MY SPARK EXECUTOR TO KAFKA PARTITION LEADER*, so as
to elliminate the Network bandwidth consumption of Spark.

  I understand that the number of partitions created on Spark for a
Direct Stream is equivalent to the number of partitions on Kafka, which is
why I got curious whether there might be such a provision in
Spark.



Regards,
Gaurav


Issues with `when` in Column class

2015-06-12 Thread Chris Freeman
I’m trying to iterate through a list of Columns and create new Columns based on 
a condition. However, the when method keeps giving me errors that don’t quite 
make sense.

If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at 
compile time:

[error] not found: value when

However, this works in the REPL just fine after I import 
org.apache.spark.sql.Column.

On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will 
compile successfully, but then at runtime, I get this error:

java.lang.IllegalArgumentException: when() can only be applied on a Column 
previously generated by when() function

This appears to be pretty circular logic. How can `when` only be applied to a 
Column previously generated by `when`? How would I call `when` in the first 
place?


Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread algermissen1971
Hi,

I have a scenario with spark streaming, where I need to write to a database 
from within updateStateByKey[1].

That means that inside my update function I need a connection.

I have so far understood that I should create a new (lazy) connection for every 
partition. But since I am not working in foreachRDD I wonder where I can 
iterate over the partitions.

Should I use mapPartitions() somewhere up the chain? 

Jan



[1] The use case being saving ‘done' sessions during web tracking.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark-1.4.0]jackson-databind conflict?

2015-06-12 Thread Earthson
I'm using Play-2.4 with play-json-2.4, It works fine with spark-1.3.1, but it
failed after I upgrade Spark to spark-1.4.0:(

sc.parallelize(1 to 1).count

[info]   com.fasterxml.jackson.databind.JsonMappingException: Could not find
creator property with name 'id' (in class
org.apache.spark.rdd.RDDOperationScope)
[info]  at [Source: {id:0,name:parallelize}; line: 1, column: 1]
[info]   at
com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
[info]   at
com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
[info]   at
com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
[info]   at
com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark-1.4.0]jackson-databind conflict?

2015-06-12 Thread Sean Owen
I see the same thing in an app that uses Jackson 2.5. Downgrading to
2.4 made it work. I meant to go back and figure out if there's
something that can be done to work around this in Spark or elsewhere,
but for now, harmonize your Jackson version at 2.4.x if you can.
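
For example, with sbt one way to pin the transitive versions is (the 2.4.4 
numbers are an assumption based on what Spark 1.4 ships with):

dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core"    % "jackson-databind"     % "2.4.4",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.4"
)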

On Fri, Jun 12, 2015 at 4:20 PM, Earthson earthson...@gmail.com wrote:
 I'm using Play-2.4 with play-json-2.4, It works fine with spark-1.3.1, but it
 failed after I upgrade Spark to spark-1.4.0:(

 sc.parallelize(1 to 1).count

 [info]   com.fasterxml.jackson.databind.JsonMappingException: Could not find
 creator property with name 'id' (in class
 org.apache.spark.rdd.RDDOperationScope)
 [info]  at [Source: {id:0,name:parallelize}; line: 1, column: 1]
 [info]   at
 com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
 [info]   at
 com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
 [info]   at
 com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
 [info]   at
 com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
 [info]   at
 com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
 [info]   at
 com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
 [info]   at
 com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
 [info]   at
 com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
 [info]   at
 com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
 [info]   at
 com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-jackson-databind-conflict-tp23295.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issues with `when` in Column class

2015-06-12 Thread Yin Huai
Hi Chris,

Have you imported org.apache.spark.sql.functions._?
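
For example (a minimal sketch; the DataFrame and column name are hypothetical):

import org.apache.spark.sql.functions._

val labeled = df.withColumn("flag", when(col("category") === "abc", 1).otherwise(0))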

Thanks,

Yin

On Fri, Jun 12, 2015 at 8:05 AM, Chris Freeman cfree...@alteryx.com wrote:

  I’m trying to iterate through a list of Columns and create new Columns
 based on a condition. However, the when method keeps giving me errors that
 don’t quite make sense.

  If I do `when(col === “abc”, 1).otherwise(0)` I get the following error
 at compile time:

  [error] not found: value when

  However, this works in the REPL just fine after I import
 org.apache.spark.sql.Column.

  On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it
 will compile successfully, but then at runtime, I get this error:

  java.lang.IllegalArgumentException: when() can only be applied on a
 Column previously generated by when() function

  This appears to be pretty circular logic. How can `when` only be applied
 to a Column previously generated by `when`? How would I call `when` in the
 first place?



[Spark-1.4.0] NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer

2015-06-12 Thread Tao Li
Hi all:

I compiled the new Spark 1.4.0 version today. But when I run the WordCount demo, it
throws a NoSuchMethodError: *java.lang.NoSuchMethodError:
com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer*.

I found the default *fasterxml.jackson.version* is *2.4.4*. Is there
anything wrong with the jackson version?


Apache Spark architecture

2015-06-12 Thread Vitalii Duk
Trying to find a complete documentation about an internal architecture of
Apache Spark, but have no results there.

For example, I'm trying to understand the following: Assume that we have a 1TB
text file on HDFS (3 nodes in a cluster, replication factor is 1). This
file will be split into 128MB chunks and each chunk will be stored only
on one node. We run Spark Workers on these nodes. I know that Spark
tries to work with data stored in HDFS on the same node (to avoid network
I/O). For example, I'm trying to do a word count on this 1TB text file.

Here I have next questions:

   1. Will Spark load a chunk (128MB) into RAM, count words, and then
   delete it from memory, doing this sequentially? What if there is no
   available RAM?
   2. When will Spark use non-local data on HDFS?
   3. What if I need to do a more complex task, where the results of each
   iteration on each Worker need to be transferred to all other Workers
   (shuffling?) - do I need to write them myself to HDFS and then read
   them back? For example, I can't understand how K-means clustering or
   gradient descent works on Spark.

I will appreciate any link to Apache Spark architecture guide.

-- 
Best regards,
Vitalii Duk


Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
There are several database apis that use a thread local or singleton
reference to a connection pool (we use ScalikeJDBC currently, but there are
others).

You can use mapPartitions earlier in the chain to make sure the connection
pool is set up on that executor, then use it inside updateStateByKey
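
A sketch of the lazy-singleton pattern (the pool type and setup are assumptions; 
`stream` is whatever DStream sits upstream of updateStateByKey):

import javax.sql.DataSource

object ConnectionPool {
  // initialized at most once per executor JVM, on first use
  lazy val pool: DataSource = createDataSource()
  private def createDataSource(): DataSource = ???  // e.g. configure ScalikeJDBC / a JDBC pool here
}

// force initialization early in the chain, then use ConnectionPool.pool
// inside the updateStateByKey update function as well
val prepared = stream.mapPartitions { iter =>
  ConnectionPool.pool   // touching the lazy val sets up the pool on this executor
  iter
}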

On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
algermissen1...@icloud.com wrote:

 Hi,

 I have a scenario with spark streaming, where I need to write to a
 database from within updateStateByKey[1].

 That means that inside my update function I need a connection.

 I have so far understood that I should create a new (lazy) connection for
 every partition. But since I am not working in foreachRDD I wonder where I
 can iterate over the partitions.

 Should I use mapPartitions() somewhere up the chain?

 Jan



 [1] The use case being saving ‘done' sessions during web tracking.


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Issues with `when` in Column class

2015-06-12 Thread Chris Freeman
That did it! Thanks!

From: Yin Huai
Date: Friday, June 12, 2015 at 10:31 AM
To: Chris Freeman
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Issues with `when` in Column class

Hi Chris,

Have you imported org.apache.spark.sql.functions._?

Thanks,

Yin

On Fri, Jun 12, 2015 at 8:05 AM, Chris Freeman 
cfree...@alteryx.commailto:cfree...@alteryx.com wrote:
I’m trying to iterate through a list of Columns and create new Columns based on 
a condition. However, the when method keeps giving me errors that don’t quite 
make sense.

If I do `when(col === “abc”, 1).otherwise(0)` I get the following error at 
compile time:

[error] not found: value when

However, this works in the REPL just fine after I import 
org.apache.spark.sql.Column.

On the other hand, if I do `col.when(col === “abc”, 1).otherwise(0)`, it will 
compile successfully, but then at runtime, I get this error:

java.lang.IllegalArgumentException: when() can only be applied on a Column 
previously generated by when() function

This appears to be pretty circular logic. How can `when` only be applied to a 
Column previously generated by `when`? How would I call `when` in the first 
place?



Re: how to use a properties file from a url in spark-submit

2015-06-12 Thread Matthew Jones
Hmm either spark-submit isn't picking up the relative path or Chronos is
not setting your working directory to your sandbox. Try using cd
$MESOS_SANDBOX && spark-submit --properties-file props.properties

On Fri, Jun 12, 2015 at 12:32 PM Gary Ogden gog...@gmail.com wrote:

 That's a great idea. I did what you suggested and added the url to the
 props file in the uri of the json. The properties file now shows up in the
 sandbox.  But when it goes to run spark-submit  with --properties-file
 props.properties   it fails to find it:

 Exception in thread main java.lang.IllegalArgumentException: requirement 
 failed: Properties file props.properties does not exist


 On 11 June 2015 at 22:17, Matthew Jones mle...@gmail.com wrote:

 If you are using chronos you can just put the url in the task json and
 chronos will download it into your sandbox. Then just use spark-submit
 --properties-file app.properties.

 On Thu, 11 Jun 2015 15:52 Marcelo Vanzin van...@cloudera.com wrote:

 That's not supported. You could use wget / curl to download the file to
 a temp location before running spark-submit, though.

 On Thu, Jun 11, 2015 at 12:48 PM, Gary Ogden gog...@gmail.com wrote:

 I have a properties file that is hosted at a url. I would like to be
 able to use the url in the --properties-file parameter when submitting a
 job to mesos using spark-submit via chronos

 I would rather do this than use a file on the local server.

 This doesn't seem to work though when submitting from chronos:

 bin/spark-submit --properties-file http://server01/props/app.properties


 Inside the properties file:
 spark.executor.memory=256M
 spark.cores.max=1
 spark.shuffle.consolidateFiles=true
 spark.task.cpus=1
 spark.deploy.defaultCores=1
 spark.driver.cores=1
 spark.scheduler.mode=FAIR

 So how do I specify a properties file in a url?




 --
 Marcelo





Re: Is it possible to see Spark jobs on MapReduce job history ? (running Spark on YARN cluster)

2015-06-12 Thread Steve Loughran

For that you need SPARK-1537 and the patch to go with it

It is still the spark web UI, it just hands off storage and retrieval of the 
history to the underlying Yarn timeline server, rather than through the 
filesystem. You'll get to see things as they go along too.

If you do want to try it, please have a go, and provide any feedback on the 
JIRA/pull request. I should warn: it needs Hadoop 2.6 (Apache,  HDP 2.2, 
CDH5.4), due to some API changes. While the patch is for 1.4+,  I already have 
a local branch with it applied to spark 1.3.1

 On 12 Jun 2015, at 03:01, Elkhan Dadashov elkhan8...@gmail.com wrote:
 
 Hi all,
 
 I wonder if anyone has used the MapReduce Job History server to show Spark jobs.
 
 I can see my Spark jobs (Spark running on Yarn cluster) on Resource manager 
 (RM).
 
 I start Spark History server, and then through Spark's web-based user 
 interface I can monitor the cluster (and track cluster and job statistics). 
 Basically Yarn RM gets linked to Spark History server, which enables 
 monitoring.
 
 But instead of using Spark History Server , is it possible to see Spark jobs 
 on MapReduce job history ? (in addition to seeing them on RM)
 
 (I know that through yarn logs -applicationId <app ID> we can get all logs after 
 the Spark job has completed, but my concern is to see the logs and completed jobs 
 through a common web ui - the MapReduce Job History server)
 
 Thanks in advance.
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Cody Koeninger
The scala api has 2 ways of calling createDirectStream.  One of them allows
you to pass a message handler that gets full access to the kafka
MessageAndMetadata, including offset.

I don't know why the python api was developed with only one way to call
createDirectStream, but the first thing I'd look at would be adding that
functionality back in.  If someone wants help creating a patch for that,
just let me know.

Dealing with offsets on a per-message basis may not be as efficient as
dealing with them on a batch basis using the HasOffsetRanges interface...
but if efficiency was a primary concern, you probably wouldn't be using
Python anyway.

On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
 and straightforward in Python, where we need to have a specific API to
 handle this, I'm not sure is there any simple workaround to fix this, maybe
 we should think carefully about it.

 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at. Any
 pointers on what is needed to build in this support would be great. This is
 critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it, I think currently Python based Kafka direct API do not
 provide such equivalence like Scala, maybe we should figure out to add this
 into Python API also.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because as RDDs get generated within spark the
 offsets move further along. With direct Kafka mode the current offsets are
 no more persisted in Zookeeper but rather within Spark itself. If you want
 to be able to use zookeeper based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that
 we can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What is your meaning of getting the offsets from the RDD, from my
 understanding, the offsetRange is a parameter you offered to KafkaRDD, why
 do you still want to get the one previous you set into?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like 
 the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit









Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-12 Thread Josh Rosen


Sent from my phone

 On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian 
 sanjaysubraman...@yahoo.com.INVALID wrote:
 
 hey guys
 
 Using Hive and Impala daily intensively.
 Want to transition to spark-sql in CLI mode
 
 Currently in my sandbox I am using the Spark (standalone mode) in the CDH 
 distribution (starving developer version 5.3.3)
 3 datanode hadoop cluster
 32GB RAM per node
 8 cores per node
 
 spark 
 1.2.0+cdh5.3.3+371
 
 
 I am testing some stuff on one view and getting memory errors
 Possibly the reason is that the default memory per executor shown on 18080 is 
 512M.
 
 These options, when used to start the spark-sql CLI, do not seem to have any 
 effect: 
 --total-executor-cores 12 --executor-memory 4G
 
 
 
 /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e  select distinct 
 isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view
 
 aers.aers_demo_view (7 million+ records)
 ===
 isr bigint  case id
 event_dtbigint  Event date
 age double  age of patient
 age_cod string  days,months years
 sex string  M or F
 yearint
 quarter int
 
 
 VIEW DEFINITION
 
 CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS 
 `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, 
 `year` AS `year`, `quarter` AS `quarter` FROM (SELECT
`aers_demo_v1`.`isr`,
`aers_demo_v1`.`event_dt`,
`aers_demo_v1`.`age`,
`aers_demo_v1`.`age_cod`,
`aers_demo_v1`.`gndr_cod`,
`aers_demo_v1`.`year`,
`aers_demo_v1`.`quarter`
 FROM
   `aers`.`aers_demo_v1`
 UNION ALL
 SELECT
`aers_demo_v2`.`isr`,
`aers_demo_v2`.`event_dt`,
`aers_demo_v2`.`age`,
`aers_demo_v2`.`age_cod`,
`aers_demo_v2`.`gndr_cod`,
`aers_demo_v2`.`year`,
`aers_demo_v2`.`quarter`
 FROM
   `aers`.`aers_demo_v2`
 UNION ALL
 SELECT
`aers_demo_v3`.`isr`,
`aers_demo_v3`.`event_dt`,
`aers_demo_v3`.`age`,
`aers_demo_v3`.`age_cod`,
`aers_demo_v3`.`gndr_cod`,
`aers_demo_v3`.`year`,
`aers_demo_v3`.`quarter`
 FROM
   `aers`.`aers_demo_v3`
 UNION ALL
 SELECT
`aers_demo_v4`.`isr`,
`aers_demo_v4`.`event_dt`,
`aers_demo_v4`.`age`,
`aers_demo_v4`.`age_cod`,
`aers_demo_v4`.`gndr_cod`,
`aers_demo_v4`.`year`,
`aers_demo_v4`.`quarter`
 FROM
   `aers`.`aers_demo_v4`
 UNION ALL
 SELECT
`aers_demo_v5`.`primaryid` AS `ISR`,
`aers_demo_v5`.`event_dt`,
`aers_demo_v5`.`age`,
`aers_demo_v5`.`age_cod`,
`aers_demo_v5`.`gndr_cod`,
`aers_demo_v5`.`year`,
`aers_demo_v5`.`quarter`
 FROM
   `aers`.`aers_demo_v5`
 UNION ALL
 SELECT
`aers_demo_v6`.`primaryid` AS `ISR`,
`aers_demo_v6`.`event_dt`,
`aers_demo_v6`.`age`,
`aers_demo_v6`.`age_cod`,
`aers_demo_v6`.`sex` AS `GNDR_COD`,
`aers_demo_v6`.`year`,
`aers_demo_v6`.`quarter`
 FROM
   `aers`.`aers_demo_v6`) `aers_demo_view`
 
 
 
 
 
 
 
 15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a 
 user handler while handling an exception event ([id: 0x01b99855, 
 /10.0.0.19:58117 = /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: 
 Java heap space)
 java.lang.OutOfMemoryError: Java heap space
 at 
 org.jboss.netty.buffer.HeapChannelBuffer.init(HeapChannelBuffer.java:42)
 at 
 org.jboss.netty.buffer.BigEndianHeapChannelBuffer.init(BigEndianHeapChannelBuffer.java:34)
 at 
 org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
 at 
 org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
 at 
 org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
 at 
 org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507)
 at 
 org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345)
 at 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312)
 at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
 at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
 at 
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
 at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
 at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
 at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
 at 
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/06/11 08:36:40 ERROR Utils: Uncaught exception in thread 
 task-result-getter-0
 java.lang.OutOfMemoryError: GC 

Re: Optimisation advice for Avro-Parquet merge job

2015-06-12 Thread James Aley
Hey Kiran,

Thanks very much for the response. I left for vacation before I could try
this out, but I'll experiment once I get back and let you know how it goes.

Thanks!

James.

On 8 June 2015 at 12:34, kiran lonikar loni...@gmail.com wrote:

 It turns out my assumption that load and unionAll are blocking is not
 correct. They are transformations. So instead of running only the load and
 unionAll in the run() methods, I think you will have to save the intermediate
 dfInput[i] to temporary (Parquet) files (possibly on an in-memory DFS like
 http://tachyon-project.org/) in the run() methods. The second for loop will
 also have to load from the intermediate Parquet files. Then finally save the
 final dfInput[0] to HDFS.

 I think this way of parallelizing will force the cluster to utilize all of
 its resources.

 -Kiran

 On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar loni...@gmail.com wrote:

 James,

 As I can see, there are three distinct parts to your program:

- for loop
- synchronized block
- final outputFrame.save statement

 Can you do a separate timing measurement by putting a simple
 System.currentTimeMillis() around these blocks to see how long each is
 taking, and then try to optimize the one that takes longest? In the second
 block, you may want to measure the time for the two statements. Improving
 this boils down to playing with Spark settings.

 Now consider the first block: I think this is a classic case of merge
 sort or a reduce tree. You already tried to improve this by submitting jobs
 in parallel using executor pool/Callable etc.

 To further improve the parallelization, I suggest you use a reduce-tree-like
 approach. For example, let's say you want to compute the sum of all elements
 of an array in parallel. The way it's solved on a GPU-like platform is that
 you first divide the input array into pairs, compute those n/2 sums in
 parallel on separate threads and save each result in the first of the two
 elements. In the next iteration you compute n/4 sums of the earlier partial
 sums in parallel, and so on, until you are left with only two elements whose
 sum gives you the final sum.

 You are performing many sequential unionAll operations over inputs.size()
 Avro files. Assuming the unionAll() on DataFrame is blocking (and not a
 simple transformation like on RDDs) and actually performs the union
 operation, you will certainly benefit from parallelizing this loop. You may
 change the loop to something like below:

 // pseudo code only -- exception handling around Future.get() omitted
 // (requires java.util.ArrayList/List and java.util.concurrent.ExecutorService/Executors/Future)
 final int n = inputs.size();
 final ExecutorService executor = Executors.newFixedThreadPool(n / 2);
 // dfInput[i] holds the partial union that starts at input i
 final DataFrame[] dfInput = new DataFrame[n / 2];

 // first level of the tree: union input i with input i + n/2
 List<Future<?>> level = new ArrayList<Future<?>>();
 for (int i = 0; i < n / 2; i++) {
     final int idx = i;  // effectively-final copy for the anonymous class
     level.add(executor.submit(new Runnable() {
         public void run() {
             dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
                 .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
         }
     }));
 }
 for (Future<?> f : level) f.get();  // wait for this level to finish

 // remaining levels: halve the number of partial unions each time
 final int steps = (int) (Math.log(n) / Math.log(2.0));
 for (int s = 2; s <= steps; s++) {
     final int stride = n / (1 << s);  // n/2^s partial results remain after this level
     level = new ArrayList<Future<?>>();
     for (int i = 0; i < stride; i++) {
         final int idx = i;
         level.add(executor.submit(new Runnable() {
             public void run() {
                 dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
             }
         }));
     }
     for (Future<?> f : level) f.get();  // dfInput[0] holds the full union after the last level
 }
 executor.shutdown();

 Let me know if it helped.

 -Kiran


 On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com
 wrote:

 Thanks for the confirmation! We're quite new to Spark, so a little
 reassurance is a good thing to have sometimes :-)

 The thing that's concerning me at the moment is that my job doesn't seem
 to run any faster with more compute resources added to the cluster, and
 this is proving a little tricky to debug. There are a lot of variables, so
 here's what we've tried already and the apparent impact. If anyone has any
 further suggestions, we'd love to hear!

 * Increase the minimum number of output files (targetPartitions
 above), so that input groups smaller than our minimum chunk size can still
 be worked on by more than one executor. This does measurably speed things
 up, but obviously it's a trade-off, as the original goal for this job is to
 merge our data into fewer, larger files.

 * Submit many jobs in parallel, by running the above code in a Callable,
 on an executor pool. This seems to help, to some extent, but I'm not sure
 what else needs to be configured alongside it -- driver threads, scheduling
 policy, etc. We set scheduling to FAIR when doing this, as that seemed
 like the right approach, but we're not 100% confident. It seemed to help
 quite substantially anyway, so perhaps this just needs further tuning?

 * Increasing executors, 

Re: Spark 1.4 release date

2015-06-12 Thread ayan guha
Thanks guys, my question must look like a stupid one today :) Looking
forward to testing out 1.4.0, just downloaded it.

Congrats to the team for this much anticipated release.

On Fri, Jun 12, 2015 at 10:12 PM, Guru Medasani gdm...@gmail.com wrote:

 Here is a Spark 1.4 release blog by Databricks.

 https://databricks.com/blog/2015/06/11/announcing-apache-spark-1-4.html


 Guru Medasani
 gdm...@gmail.com



 On Jun 12, 2015, at 7:08 AM, ayan guha guha.a...@gmail.com wrote:

 Thanks a lot.
 On 12 Jun 2015 19:46, Todd Nist tsind...@gmail.com wrote:

 It was released yesterday.

 On Friday, June 12, 2015, ayan guha guha.a...@gmail.com wrote:

 Hi

 When is official spark 1.4 release date?
 Best
 Ayan





-- 
Best Regards,
Ayan Guha


Re: Spark Java API and minimum set of 3rd party dependencies

2015-06-12 Thread Sean Owen
You don't add dependencies to your app -- you mark Spark as 'provided'
in the build and you rely on the deployed Spark environment to provide
it.
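
In Maven terms that looks roughly like the following (the artifact and version
here simply mirror the spark-core_2.10-1.3.1 POM referenced in the question;
adjust them to whatever Spark version is deployed on the cluster):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.3.1</version>
  <!-- compiled against at build time, supplied by the cluster's Spark install at runtime -->
  <scope>provided</scope>
</dependency>

With the scope set to provided, Spark and its transitive dependencies are
available at compile time but are not packaged into, or propagated from, your
application's own artifact.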

On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov elkhan8...@gmail.com wrote:
 Hi all,

 We want to integrate Spark into our Java application using the Spark Java API
 and then run it on our YARN clusters.

 If I want to run Spark on YARN, which dependencies must be included?

 I looked at the Spark POM, which lists that Spark requires 50+ 3rd-party
 dependencies.

 Is there a minimal set of Spark dependencies necessary for the Spark Java API
 (for a Spark client running on a YARN cluster)?

 Thanks in advance.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
In Spark 1.3.x, the driver's system properties could be set with the --conf
option, which was shared between Spark properties and system properties.

In Spark 1.4.0 this feature has been removed; the driver instead logs the
following warning:

Warning: Ignoring non-spark config property: xxx.xxx=v

How do I set a driver system property in 1.4.0? Is there a reason it was
removed without a deprecation warning?

Thanks a lot for your advice.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Andrew Or
Hi Peng,

Setting properties through --conf should still work in Spark 1.4. From the
warning it looks like the config you are trying to set does not start with
the "spark." prefix. What is the config that you are trying to set?
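
If what you need is a plain JVM system property rather than a Spark config,
one option that should still work in 1.4 (a sketch: xxx.xxx=v stands in for
the property from the warning, and com.example.MyApp / myapp.jar are
placeholders) is to pass it through the driver's Java options:

./bin/spark-submit \
  --driver-java-options "-Dxxx.xxx=v" \
  --conf "spark.executor.extraJavaOptions=-Dxxx.xxx=v" \
  --class com.example.MyApp myapp.jar

The spark.executor.extraJavaOptions line is only needed if the executors must
also see the same property.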

-Andrew

2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the driver's system properties could be set with the --conf
 option, which was shared between Spark properties and system properties.

 In Spark 1.4.0 this feature has been removed; the driver instead logs the
 following warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set a driver system property in 1.4.0? Is there a reason it was
 removed without a deprecation warning?

 Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Exception when using CLUSTER BY or ORDER BY

2015-06-12 Thread Reynold Xin
Tom,

Can you file a JIRA and attach a small reproducible test case if possible?


On Tue, May 19, 2015 at 1:50 PM, Thomas Dudziak tom...@gmail.com wrote:

 Under certain circumstances that I haven't yet been able to isolate, I get
 the following error when running an HQL query using HiveContext (Spark 1.3.1
 on Mesos, fine-grained mode). Is this a known problem or should I file a
 JIRA for it?


 org.apache.spark.SparkException: Can only zip RDDs with same number of 
 elements in each partition
   at 
 org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at 
 org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
   at 
 org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
   at 
 org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)




Re: Spark Java API and minimum set of 3rd party dependencies

2015-06-12 Thread Elkhan Dadashov
Thanks for prompt response, Sean.

The issue is that we are restricted in the dependencies we can include in our
project.

There are two issues when including dependencies:

1) there are several dependencies that both we and Spark have, but the
versions conflict.
2) there are dependencies that Spark has and our project does not.

How do we handle these two cases differently when including Spark dependencies
(direct and transitive ones)?

We need to declare all dependencies (so there should be no undeclared
3rd-party transitive dependencies) in our POM file, more in the Apache Ivy
style of dependency management (where all transitive dependencies are listed
in the POM file) than the Maven style.

Our main goal is: we want to integrate Spark into our Java application using
the Spark Java API and then run it on our YARN clusters.

Thanks a lot.


On Fri, Jun 12, 2015 at 11:17 AM, Sean Owen so...@cloudera.com wrote:

 You don't add dependencies to your app -- you mark Spark as 'provided'
 in the build and you rely on the deployed Spark environment to provide
 it.

 On Fri, Jun 12, 2015 at 7:14 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:
  Hi all,
 
  We want to integrate Spark into our Java application using the Spark Java
  API and then run it on our YARN clusters.

  If I want to run Spark on YARN, which dependencies must be included?

  I looked at the Spark POM, which lists that Spark requires 50+ 3rd-party
  dependencies.

  Is there a minimal set of Spark dependencies necessary for the Spark Java
  API (for a Spark client running on a YARN cluster)?
 
  Thanks in advance.
 




-- 

Best regards,
Elkhan Dadashov


Spark Java API and minimum set of 3rd party dependencies

2015-06-12 Thread Elkhan Dadashov
Hi all,

We want to integrate Spark into our Java application using the Spark Java API
and then run it on our YARN clusters.

If I want to run Spark on YARN, which dependencies must be included?

I looked at the Spark POM
http://central.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.1/spark-core_2.10-1.3.1.pom
which lists that Spark requires 50+ 3rd-party dependencies.

Is there a minimal set of Spark dependencies necessary for the Spark Java API
(for a Spark client running on a YARN cluster)?

Thanks in advance.


log4j configuration ignored for some classes only

2015-06-12 Thread lomax0...@gmail.com
Hi all,

I am running spark standalone (local[*]), and have tried to cut back on
some of the logging noise from the framework by editing log4j.properties in
spark/conf.
The following lines are working as expected:

log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.spark.storage.BlockManager=ERROR

(I've even guaranteed that it's definitely using my configuration by adding
a prefix to the conversion pattern).

However, I am still getting log messages at INFO from classes like:
org.apache.spark.Logging$class (should be covered by the org.apache.spark setting)
kafka.utils.Logging$class (when I add a similar setting for kafka.utils)

I suspect it's because these are inner classes. It still happens even when
I go up a level and add configurations like log4j.logger.org=WARN.

Is this a known bug in log4j? Is there any known way to suppress these,
ideally through configuration rather than programmatically?

Many thanks
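
A note for reference: in log4j 1.x, %c in the conversion pattern prints the
logger (category) name that level filtering is applied to, while %C prints the
calling class, which is where trait and inner classes such as Logging$class
tend to show up. A minimal log4j.properties along the lines described above
(a sketch: the appender wiring and the kafka category name are assumptions;
only the two org.apache.spark lines come from the message) would be:

log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# show both the logger name (%c) and the calling class (%C) to see which category to target
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{1} (%C{1}): %m%n
# categories being tuned
log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.spark.storage.BlockManager=ERROR
log4j.logger.kafka=WARN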


Fwd: Spark/PySpark errors on mysterious missing /tmp file

2015-06-12 Thread John Berryman
(This question is also present on StackOverflow
http://stackoverflow.com/questions/30656083/spark-pyspark-errors-on-mysterious-missing-tmp-file
)

I'm having issues with pyspark and a missing /tmp file. I've narrowed down
the behavior to a short snippet.

>>> a=sc.parallelize([(16646160,1)]) # yes, just a single element
>>> b=stuff # this is read in from a text file above - the contents are shown below
>>> # b=sc.parallelize(b.collect())
>>> a.join(b).take(10)

This fails, but if I include the commented line (which should be the same
thing), then it succeeds. Here is the error:


---
Py4JJavaError Traceback (most recent call
last)
<ipython-input-101-90fe86df7879> in <module>()
  3 b=stuff.map(lambda x:(16646160,1))
  4 #b=sc.parallelize(b.collect())
----> 5 a.join(b).take(10)
  6 b.take(10)

/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1109
   1110 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1111             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1112
   1113 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd,
partitionFunc, partitions, allowLocal)
816 # SparkContext#runJob.
817 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions, allowLocal)
819 return
list(mappedRDD._collect_iterator_through_file(it))
820

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
__call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer,
self.gateway_client,
--> 538                 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling
{0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 210.0 failed 1 times, most recent failure: Lost task 1.0 in
stage 210.0 (TID 884, localhost):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File /usr/lib/spark/python/pyspark/worker.py, line 92, in main
command = pickleSer.loads(command.value)
  File /usr/lib/spark/python/pyspark/broadcast.py, line 106, in value
self._value = self.load(self._path)
  File /usr/lib/spark/python/pyspark/broadcast.py, line 87, in load
with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory:
'/tmp/spark-4a8c591e-9192-4198-a608-c7daa3a5d494/tmpuzsAVM'

at
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at

Re: spark-sql from CLI ---EXCEPTION: java.lang.OutOfMemoryError: Java heap space

2015-06-12 Thread Josh Rosen
It sounds like this might be caused by a memory configuration problem.  In 
addition to looking at the executor memory, I'd also bump up the driver memory, 
since it appears that your shell is running out of memory when collecting a 
large query result.
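
Concretely, something along these lines might be worth trying (a sketch: the
8G/4G numbers are arbitrary, and bin/spark-sql should pass these flags through
to spark-submit):

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql \
  --driver-memory 8G \
  --executor-memory 4G \
  --total-executor-cores 12 \
  -e "select distinct isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"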

Sent from my phone

 On Jun 11, 2015, at 8:43 AM, Sanjay Subramanian 
 sanjaysubraman...@yahoo.com.INVALID wrote:
 
 hey guys
 
 We use Hive and Impala intensively every day and want to transition to
 spark-sql in CLI mode.
 
 Currently in my sandbox I am using Spark (standalone mode) from the CDH 
 distribution (starving developer version 5.3.3)
 3 datanode hadoop cluster
 32GB RAM per node
 8 cores per node
 
 spark 
 1.2.0+cdh5.3.3+371
 
 
 I am testing some stuff on one view and getting memory errors.
 A possible reason is that the default memory per executor shown on port 18080 is
 512M.
 
 These options, when used to start the spark-sql CLI, do not seem to have any
 effect:
 --total-executor-cores 12 --executor-memory 4G
 
 
 
 /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select distinct
 isr,event_dt,age,age_cod,sex,year,quarter from aers.aers_demo_view"
 
 aers.aers_demo_view (7 million+ records)
 ===
 isr bigint  case id
 event_dtbigint  Event date
 age double  age of patient
 age_cod string  days,months years
 sex string  M or F
 yearint
 quarter int
 
 
 VIEW DEFINITION
 
 CREATE VIEW `aers.aers_demo_view` AS SELECT `isr` AS `isr`, `event_dt` AS 
 `event_dt`, `age` AS `age`, `age_cod` AS `age_cod`, `gndr_cod` AS `sex`, 
 `year` AS `year`, `quarter` AS `quarter` FROM (SELECT
`aers_demo_v1`.`isr`,
`aers_demo_v1`.`event_dt`,
`aers_demo_v1`.`age`,
`aers_demo_v1`.`age_cod`,
`aers_demo_v1`.`gndr_cod`,
`aers_demo_v1`.`year`,
`aers_demo_v1`.`quarter`
 FROM
   `aers`.`aers_demo_v1`
 UNION ALL
 SELECT
`aers_demo_v2`.`isr`,
`aers_demo_v2`.`event_dt`,
`aers_demo_v2`.`age`,
`aers_demo_v2`.`age_cod`,
`aers_demo_v2`.`gndr_cod`,
`aers_demo_v2`.`year`,
`aers_demo_v2`.`quarter`
 FROM
   `aers`.`aers_demo_v2`
 UNION ALL
 SELECT
`aers_demo_v3`.`isr`,
`aers_demo_v3`.`event_dt`,
`aers_demo_v3`.`age`,
`aers_demo_v3`.`age_cod`,
`aers_demo_v3`.`gndr_cod`,
`aers_demo_v3`.`year`,
`aers_demo_v3`.`quarter`
 FROM
   `aers`.`aers_demo_v3`
 UNION ALL
 SELECT
`aers_demo_v4`.`isr`,
`aers_demo_v4`.`event_dt`,
`aers_demo_v4`.`age`,
`aers_demo_v4`.`age_cod`,
`aers_demo_v4`.`gndr_cod`,
`aers_demo_v4`.`year`,
`aers_demo_v4`.`quarter`
 FROM
   `aers`.`aers_demo_v4`
 UNION ALL
 SELECT
`aers_demo_v5`.`primaryid` AS `ISR`,
`aers_demo_v5`.`event_dt`,
`aers_demo_v5`.`age`,
`aers_demo_v5`.`age_cod`,
`aers_demo_v5`.`gndr_cod`,
`aers_demo_v5`.`year`,
`aers_demo_v5`.`quarter`
 FROM
   `aers`.`aers_demo_v5`
 UNION ALL
 SELECT
`aers_demo_v6`.`primaryid` AS `ISR`,
`aers_demo_v6`.`event_dt`,
`aers_demo_v6`.`age`,
`aers_demo_v6`.`age_cod`,
`aers_demo_v6`.`sex` AS `GNDR_COD`,
`aers_demo_v6`.`year`,
`aers_demo_v6`.`quarter`
 FROM
   `aers`.`aers_demo_v6`) `aers_demo_view`
 
 
 
 
 
 
 
 15/06/11 08:36:36 WARN DefaultChannelPipeline: An exception was thrown by a 
 user handler while handling an exception event ([id: 0x01b99855, 
 /10.0.0.19:58117 => /10.0.0.19:52016] EXCEPTION: java.lang.OutOfMemoryError: 
 Java heap space)
 java.lang.OutOfMemoryError: Java heap space
 at 
 org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
 at 
 org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
 at 
 org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
 at 
 org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
 at 
 org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
 at 
 org.jboss.netty.handler.codec.frame.FrameDecoder.newCumulationBuffer(FrameDecoder.java:507)
 at 
 org.jboss.netty.handler.codec.frame.FrameDecoder.updateCumulation(FrameDecoder.java:345)
 at 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:312)
 at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
 at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
 at 
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
 at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
 at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
 at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
 at 
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at