Re: JAVA_HOME problem

2015-04-23 Thread Akhil Das
Isn't this related to https://issues.apache.org/jira/browse/SPARK-6681 ?

Thanks
Best Regards

On Fri, Apr 24, 2015 at 11:40 AM, sourabh chaki 
wrote:

> I am facing the same problem with Spark 1.3.0 in both yarn-client and
> yarn-cluster mode. Launching the YARN container fails, and this is the
> error in stderr:
>
> Container: container_1429709079342_65869_01_01
>
> ===
> LogType: stderr
> LogLength: 61
> Log Contents:
> /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory
>
> LogType: stdout
> LogLength: 0
> Log Contents:
>
> I have added JAVA_HOME in hadoop-env.sh as well as spark-env.sh:
> grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh
> export JAVA_HOME=/usr/java/default
> export PATH=$PATH:$JAVA_HOME/bin/java
>
> grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh
> export JAVA_HOME="/usr/java/default"
>
> I found another thread about the same problem, but I don't see any
> solution there:
>
> http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0
>
> Any pointers would be helpful.
>
> Thanks
> Sourabh
>
>
> On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 <917361...@qq.com> wrote:
>
>> spark 1.3.0
>>
>>
>> spark@pc-zjqdyyn1:~> tail /etc/profile
>> export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
>> export PATH=$PATH:$JAVA_HOME/bin
>>
>> #
>> # End of /etc/profile
>> #
>>
>>
>> But ERROR LOG
>>
>> Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
>> 
>> LogType: stderr
>> LogLength: 61
>> Log Contents:
>> /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory
>>
>> LogType: stdout
>> LogLength: 0
>> Log Contents:
>>
>
>


Re: problem writing to s3

2015-04-23 Thread Akhil Das
I think you should probably open a JIRA issue for this.

Thanks
Best Regards

On Fri, Apr 24, 2015 at 3:27 AM, Daniel Mahler  wrote:

> Hi Akhil
>
> I can confirm that the problem goes away when jsonRaw and jsonClean are in
> different s3 buckets.
>
> thanks
> Daniel
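
Until the cause is known, a job script could at least warn when input and output share a bucket. This is a minimal sketch under that assumption; the bucket names are made up for illustration and the helper names `bucket_of` and `share_bucket` are hypothetical. It only detects the configuration that triggered the hang in this thread and does not explain it.

```python
from urllib.parse import urlparse


def bucket_of(url):
    """Return the bucket name (the authority part) of an s3/s3n/s3a URL."""
    parsed = urlparse(url)
    if parsed.scheme not in ("s3", "s3n", "s3a"):
        raise ValueError("not an S3 URL: %r" % url)
    # Drop any "key:secret@" prefix that may be embedded in the authority.
    return parsed.netloc.rpartition("@")[2]


def share_bucket(url_a, url_b):
    """True when both URLs point into the same bucket."""
    return bucket_of(url_a) == bucket_of(url_b)


# Hypothetical paths mirroring jsonRaw and jsonClean from the thread.
json_raw = "s3n://example-bucket/out-json-raw"
json_clean = "s3n://example-bucket/out-json-clean"
if share_bucket(json_raw, json_clean):
    print("warning: reading and writing within the same bucket")
```
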
>
> On Thu, Apr 23, 2015 at 1:27 AM, Akhil Das 
> wrote:
>
>> Can you try writing to a different S3 bucket and confirm that?
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Apr 23, 2015 at 12:11 AM, Daniel Mahler 
>> wrote:
>>
>>> Hi Akhil,
>>>
>>> It works fine when outprefix is a hdfs:///localhost/... url.
>>>
>>> It looks to me as if there is something about spark writing to the same
>>> s3 bucket it is reading from.
>>>
>>> That is the only real difference between the two saveAsTextFile calls
>>> when outprefix is on S3:
>>> inpath is also on S3 but in a different bucket, while jsonRaw and
>>> jsonClean are distinct directories in the same bucket.
>>> I do not know why that should be a problem though.
>>>
>>> I will rerun using s3 paths and send the log information.
>>>
>>> thanks
>>> Daniel
>>>
>>> On Wed, Apr 22, 2015 at 1:45 AM, Akhil Das 
>>> wrote:
>>>
>>>> Can you look in your worker logs and see what's happening in there? Are
>>>> you able to write the same to your HDFS?
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Wed, Apr 22, 2015 at 4:45 AM, Daniel Mahler
>>>> wrote:

> I am having a strange problem writing to s3 that I have distilled to
> this minimal example:
>
> def jsonRaw = s"${outprefix}-json-raw"
> def jsonClean = s"${outprefix}-json-clean"
>
> val txt = sc.textFile(inpath)//.coalesce(shards, false)
> txt.count
>
> val res = txt.saveAsTextFile(jsonRaw)
>
> val txt2 = sc.textFile(jsonRaw +"/part-*")
> txt2.count
>
> txt2.saveAsTextFile(jsonClean)
>
> This code should simply copy files from inpath to jsonRaw and then
> from jsonRaw to jsonClean.
> This code executes all the way down to the last line, where it hangs
> after creating the output directory containing a _temporary_$folder but
> no actual files, not even temporary ones.
>
> `outprefix` is an S3 bucket URL; both jsonRaw and jsonClean are in
> the same bucket.
> Both calls .count succeed and return the same number. This means Spark
> can read from inpath and can both read from and write to jsonRaw. Since
> jsonClean is in the same bucket as jsonRaw and the final line does create
> the directory, I cannot think of any reason why the files should  not be
> written. If there were any access or url problems they should already
> manifest when writing jsonRaw.
>
> This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
> The console output from the last line is
>
> scala> txt0.saveAsTextFile(jsonClean)
> 15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
> 15/04/21 22:55:48 INFO storage.BlockManager: Removing block
> broadcast_3_piece0
> 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0
> of size 2024 dropped from memory (free 278251716)
> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
> broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size:
> 2024.0 B, free: 265.4 MB)
> 15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of
> block broadcast_3_piece0
> 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
> 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size
> 2728 dropped from memory (free 27825)
> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
> broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory 
> (size:
> 2024.0 B, free: 13.8 GB)
> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
> broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size:
> 2024.0 B, free: 13.8 GB)
> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
> broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory 
> (size:
> 2024.0 B, free: 13.8 GB)
> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
> broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size:
> 2024.0 B, free: 13.8 GB)
> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
> broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size:
> 2024.0 B, free: 13.8 GB)
> 15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
> 15/04/21 22:55:49 INFO spark.SparkContext: Starting job:
> saveAsTextFile at :38
> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2
> (saveAsTextFile at :38) with 96 output partitions
> (allowLocal=false)
> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage
> 2(saveAsTextFile at :38)
> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents o

Re: JAVA_HOME problem

2015-04-23 Thread sourabh chaki
I am facing the same problem with Spark 1.3.0 in both yarn-client and
yarn-cluster mode. Launching the YARN container fails, and this is the
error in stderr:

Container: container_1429709079342_65869_01_01
===
LogType: stderr
LogLength: 61
Log Contents:
/bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

LogType: stdout
LogLength: 0
Log Contents:

I have added JAVA_HOME in hadoop-env.sh as well as spark-env.sh:
grep JAVA_HOME /etc/hadoop/conf.cloudera.yarn/hadoop-env.sh
export JAVA_HOME=/usr/java/default
export PATH=$PATH:$JAVA_HOME/bin/java

grep JAVA_HOME /var/spark/spark-1.3.0-bin-hadoop2.4/conf/spark-env.sh
export JAVA_HOME="/usr/java/default"
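
Note that bash is being asked to run the literal string `{{JAVA_HOME}}/bin/java`: the placeholder reached the shell without being replaced, so the JAVA_HOME exports above are never consulted by that command. A small sketch, assuming the container stderr is available as text, that flags such unexpanded placeholders. The function name is hypothetical, and it only detects the symptom, not its cause.

```python
import re

# Matches literal, unexpanded placeholders such as {{JAVA_HOME}}.
PLACEHOLDER = re.compile(r"\{\{([A-Za-z_][A-Za-z0-9_]*)\}\}")


def unexpanded_placeholders(log_text):
    """Return the sorted, de-duplicated variable names still wrapped in {{...}}."""
    return sorted(set(PLACEHOLDER.findall(log_text)))


# The stderr line reported in this thread.
stderr_line = "/bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory"
print(unexpanded_placeholders(stderr_line))
```

An empty result would mean the launch command was fully expanded and the failure lies elsewhere.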

I found another thread about the same problem, but I don't see any
solution there:
http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0
Any pointers would be helpful.

Thanks
Sourabh


On Thu, Apr 2, 2015 at 1:23 PM, 董帅阳 <917361...@qq.com> wrote:

> spark 1.3.0
>
>
> spark@pc-zjqdyyn1:~> tail /etc/profile
> export JAVA_HOME=/usr/jdk64/jdk1.7.0_45
> export PATH=$PATH:$JAVA_HOME/bin
>
> #
> # End of /etc/profile
> #
>
>
> But ERROR LOG
>
> Container: container_1427449644855_0092_02_01 on pc-zjqdyy04_45454
> 
> LogType: stderr
> LogLength: 61
> Log Contents:
> /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory
>
> LogType: stdout
> LogLength: 0
> Log Contents:
>


RE: Re: problem with spark thrift server

2015-04-23 Thread Cheng, Hao
Hi, can you describe in a little more detail how the ThriftServer crashed, or
the steps to reproduce it? It's probably a bug in the ThriftServer.

Thanks,

From: guoqing0...@yahoo.com.hk [mailto:guoqing0...@yahoo.com.hk]
Sent: Friday, April 24, 2015 9:55 AM
To: Arush Kharbanda
Cc: user
Subject: Re: Re: problem with spark thrift server

Thanks for your reply. I would like to use the Spark Thriftserver as a JDBC SQL
interface, with the Spark application running on YARN. But when the Thriftserver
crashed, the application went to FINISHED and all the cached tables were lost.

Thriftserver start command:
start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2 --num-executors 20 --queue spark

My question is whether the Thriftserver has any other, more stable mode on YARN,
such as active/standby for the Thriftserver.
I would really appreciate any suggestions or ideas.
Thanks.

From: Arush Kharbanda
Date: 2015-04-23 18:40
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: problem with spark thrift server
Hi

What do you mean by disabling the driver? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk wrote:
Hi,
I have a question about the Spark thrift server. I deployed Spark on YARN and
found that if the Spark driver is disabled, the Spark application crashes on
YARN. I would appreciate any suggestions or ideas.

Thank you!



--

Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


RE: gridsearch - python

2015-04-23 Thread Pagliari, Roberto
I know grid search with cross validation is not supported. However, I was
wondering if there is something available for the time being.

Thanks,
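
In the meantime, one stopgap is to drive the search from plain Python on the driver. The following is a minimal sketch, not an MLlib API: `k_fold_indices`, `grid_search_cv`, and the toy `fit` and `score` functions are all hypothetical names, and the data is a local list rather than an RDD. A real `fit` would wrap whatever training call you use.

```python
import itertools


def k_fold_indices(n, k):
    """Yield (train_idx, test_idx) pairs for k contiguous folds over range(n)."""
    fold_sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        test = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n))
        yield train, test
        start += size


def grid_search_cv(data, param_grid, fit, score, k=3):
    """Return (best_params, best_mean_score) over the Cartesian product of param_grid.

    fit(train_rows, **params) -> model and score(model, test_rows) -> float
    are supplied by the caller; higher scores are treated as better.
    """
    names = sorted(param_grid)
    best_params, best_score = None, float("-inf")
    for values in itertools.product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        fold_scores = []
        for train_idx, test_idx in k_fold_indices(len(data), k):
            model = fit([data[i] for i in train_idx], **params)
            fold_scores.append(score(model, [data[i] for i in test_idx]))
        mean_score = sum(fold_scores) / len(fold_scores)
        if mean_score > best_score:
            best_params, best_score = params, mean_score
    return best_params, best_score


# Toy problem: the "model" is just the slope hyperparameter of y = slope * x.
def fit(train_rows, slope):
    return slope


def score(model, test_rows):
    # Negative mean squared error, so that larger is better.
    return -sum((y - model * x) ** 2 for x, y in test_rows) / len(test_rows)


data = [(float(x), 2.0 * x) for x in range(1, 10)]
best_params, best_score = grid_search_cv(data, {"slope": [1.0, 2.0, 3.0]}, fit, score, k=3)
print(best_params)
```

The folds are contiguous and unshuffled to keep the sketch short; shuffle the rows first if their order carries information.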


From: Punyashloka Biswal [mailto:punya.bis...@gmail.com]
Sent: Thursday, April 23, 2015 9:06 PM
To: Pagliari, Roberto; user@spark.apache.org
Subject: Re: gridsearch - python

https://issues.apache.org/jira/browse/SPARK-7022.
Punya

On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto <rpagli...@appcomsci.com> wrote:
Can anybody point me to an example, if available, about gridsearch with python?

Thank you,



Re: Re: Is the Spark-1.3.1 support build with scala 2.8 ?

2015-04-23 Thread guoqing0...@yahoo.com.hk
Thank you very much for your suggestion.

Regards,
 
From: madhu phatak
Date: 2015-04-24 13:06
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Is the Spark-1.3.1 support build with scala 2.8 ?
Hi,
AFAIK it's only built with 2.10 and 2.11.  You should integrate
kafka_2.10.0-0.8.0 to make it work.




Regards,
Madhukara Phatak
http://datamantra.io/

On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk 
 wrote:
Does Spark 1.3.1 support being built with Scala 2.8?  Can it be integrated
with kafka_2.8.0-0.8.0 if it is built with Scala 2.10?

Thanks.



Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
*bump*

On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> HI TD,
>
> Some observations:
>
> 1. If I submit the application using the spark-submit tool with *client as
> deploy mode*, it works fine with a single master and worker (driver, master
> and worker are running on the same machine).
> 2. If I submit the application using the spark-submit tool with client as
> deploy mode, it *crashes after some time with StackOverflowError* with a
> *single master and 2 workers* (driver, master and 1 worker are running on
> the same machine; the other worker is on a different machine):
> 15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
> java.lang.StackOverflowError
>     at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
>     at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
>     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInp

Re: Is the Spark-1.3.1 support build with scala 2.8 ?

2015-04-23 Thread madhu phatak
Hi,
AFAIK it's only built with 2.10 and 2.11.  You should integrate
kafka_2.10.0-0.8.0
to make it work.
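
The artifact names in this thread carry the Scala version they were compiled for between the underscore and the hyphen, and libraries compiled for different Scala lines (2.8, 2.10, 2.11) are generally not binary compatible with each other. A minimal sketch of checking that suffix against the Scala line a Spark build uses; the helper names are hypothetical and the name pattern is an assumption based only on the names quoted here.

```python
import re

# name_<scala version>-<library version>, e.g. kafka_2.8.0-0.8.0
ARTIFACT = re.compile(r"^(?P<name>[A-Za-z0-9.\-]+)_(?P<scala>\d+(?:\.\d+)+)-(?P<version>.+)$")


def scala_suffix(artifact):
    """Return the Scala version embedded in an artifact name."""
    match = ARTIFACT.match(artifact)
    if match is None:
        raise ValueError("no Scala suffix found in %r" % artifact)
    return match.group("scala")


def same_scala_line(artifact, scala_version):
    """Compare only major.minor, e.g. 2.10.0 and 2.10.4 are the same line."""
    return scala_suffix(artifact).split(".")[:2] == scala_version.split(".")[:2]


print(same_scala_line("kafka_2.8.0-0.8.0", "2.10"))
print(same_scala_line("kafka_2.10.0-0.8.0", "2.10"))
```
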




Regards,
Madhukara Phatak
http://datamantra.io/

On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk <
guoqing0...@yahoo.com.hk> wrote:

> Does Spark 1.3.1 support being built with Scala 2.8?  Can it be
> integrated with kafka_2.8.0-0.8.0 if it is built with Scala 2.10?
>
> Thanks.
>


Is the Spark-1.3.1 support build with scala 2.8 ?

2015-04-23 Thread guoqing0...@yahoo.com.hk
Does Spark 1.3.1 support being built with Scala 2.8?  Can it be integrated
with kafka_2.8.0-0.8.0 if it is built with Scala 2.10?

Thanks.


Re: spark 1.3.0 strange log message

2015-04-23 Thread Terry Hole
Use this in spark conf: spark.ui.showConsoleProgress=false
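
One way to pass that setting is on the spark-submit command line with `--conf`. Below is a minimal sketch that adds it to an argument list and, for logs that were already captured, filters the progress lines out afterwards. The helper names, the class name and the jar name are hypothetical, and the line pattern is an assumption based on the samples in this thread.

```python
import re

# Lines drawn by the console progress bar, e.g. "[Stage 0:==>  (6 + 2) / 120]".
PROGRESS_LINE = re.compile(r"^\[Stage \d+:.*\(\d+ \+ \d+\) / \d+\]\s*$")


def with_conf(argv, key, value):
    """Return a copy of a spark-submit argv with `--conf key=value` right after the program name."""
    return [argv[0], "--conf", "%s=%s" % (key, value)] + list(argv[1:])


def strip_progress_lines(log_text):
    """Drop progress-bar lines; splits on both CR and LF, and also drops empty lines."""
    lines = re.split(r"[\r\n]+", log_text)
    return "\n".join(line for line in lines if line and not PROGRESS_LINE.match(line))


# Hypothetical application class and jar.
cmd = with_conf(
    ["spark-submit", "--class", "com.example.Main", "app.jar"],
    "spark.ui.showConsoleProgress",
    "false",
)
print(" ".join(cmd))
```
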

Best Regards,

On Fri, Apr 24, 2015 at 11:23 AM, Henry Hung  wrote:

> Dear All,
>
> When using spark-submit in Spark 1.3.0 with stdout and stderr redirected to
> a log file, I saw some strange lines in it that look like this:
>
> [Stage 0:>(0 + 2) / 120]
> [Stage 0:>(2 + 2) / 120]
> [Stage 0:==>  (6 + 2) / 120]
> [Stage 0:=>  (12 + 2) / 120]
> [Stage 0:=>  (20 + 2) / 120]
> [Stage 0:===>(24 + 2) / 120]
> [Stage 0:==> (32 + 2) / 120]
> [Stage 0:===>(42 + 2) / 120]
> [Stage 0:>   (52 + 2) / 120]
> [Stage 0:===>(59 + 2) / 120]
> [Stage 0:===>(68 + 2) / 120]
> [Stage 0:>   (78 + 3) / 120]
> [Stage 0:=>  (88 + 4) / 120]
> [Stage 0:=> (100 + 2) / 120]
> [Stage 0:==>(110 + 2) / 120]
>
> Here is my log4j property:
>
> # Set everything to be logged to the console
> log4j.rootCategory=WARN, console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.target=System.err
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
>
> # Settings to quiet third party logs that are too verbose
> log4j.logger.org.eclipse.jetty=WARN
> log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
>
> I want to know how to disable this kind of stage progress message?
>
> Best regards,
> Henry
>
> --
> The privileged confidential information contained in this email is
> intended for use only by the addressees as indicated by the original sender
> of this email. If you are not the addressee indicated in this email or are
> not responsible for delivery of the email to such a person, please kindly
> reply to the sender indicating this fact and delete all copies of it from
> your computer and network server immediately. Your cooperation is highly
> appreciated. It is advised that any unauthorized use of confidential
> information of Winbond is strictly prohibited; and any information in this
> email irrelevant to the official business of Winbond shall be deemed as
> neither given nor endorsed by Winbond.
>


spark 1.3.0 strange log message

2015-04-23 Thread Henry Hung
Dear All,

When using spark-submit in Spark 1.3.0 with stdout and stderr redirected to a
log file, I saw some strange lines in it that look like this:
[Stage 0:>(0 + 2) / 120]
[Stage 0:>(2 + 2) / 120]
[Stage 0:==>  (6 + 2) / 120]
[Stage 0:=>  (12 + 2) / 120]
[Stage 0:=>  (20 + 2) / 120]
[Stage 0:===>(24 + 2) / 120]
[Stage 0:==> (32 + 2) / 120]
[Stage 0:===>(42 + 2) / 120]
[Stage 0:>   (52 + 2) / 120]
[Stage 0:===>(59 + 2) / 120]
[Stage 0:===>(68 + 2) / 120]
[Stage 0:>   (78 + 3) / 120]
[Stage 0:=>  (88 + 4) / 120]
[Stage 0:=> (100 + 2) / 120]
[Stage 0:==>(110 + 2) / 120]


Here is my log4j property:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO


I want to know how to disable this kind of stage progress message?

Best regards,
Henry




Re: Re: problem with spark thrift server

2015-04-23 Thread guoqing0...@yahoo.com.hk
Thanks for your reply. I would like to use the Spark Thriftserver as a JDBC SQL
interface, with the Spark application running on YARN. But when the Thriftserver
crashed, the application went to FINISHED and all the cached tables were lost.

Thriftserver start command:
start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2 --num-executors 20 --queue spark

My question is whether the Thriftserver has any other, more stable mode on YARN,
such as active/standby for the Thriftserver.
I would really appreciate any suggestions or ideas.
Thanks.
 
From: Arush Kharbanda
Date: 2015-04-23 18:40
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: problem with spark thrift server
Hi

What do you mean by disabling the driver? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk wrote:
Hi,
I have a question about the Spark thrift server. I deployed Spark on YARN and
found that if the Spark driver is disabled, the Spark application crashes on
YARN. I would appreciate any suggestions or ideas.

Thank you! 



-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Marcelo Vanzin
No, those have to be local paths.

On Thu, Apr 23, 2015 at 6:53 PM, Night Wolf  wrote:
> Thanks Marcelo, can this be a path on HDFS?
>
> On Fri, Apr 24, 2015 at 11:52 AM, Marcelo Vanzin 
> wrote:
>>
>> You'd have to use spark.{driver,executor}.extraClassPath to modify the
>> system class loader. But that also means you have to manually
>> distribute the jar to the nodes in your cluster, into a common
>> location.
>>
>> On Thu, Apr 23, 2015 at 6:38 PM, Night Wolf 
>> wrote:
>> > Hi guys,
>> >
>> > I'm having a problem building a DataFrame in Spark SQL from a JDBC data
>> > source when running with --master yarn-client and adding the JDBC driver
>> > JAR with --jars. If I run with a local[*] master, all works fine.
>> >
>> > ./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client
>> >
>> > sqlContext.load("jdbc", Map("url" ->
>> > "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd", "driver" ->
>> > "com.mysql.jdbc.Driver", "dbtable" -> "MY_TBL"))
>> >
>> >
>> > This throws a class-not-found exception when running with Spark SQL. But
>> > when I try to load the driver class on the workers or the driver, the
>> > class is found without problems. So I'm guessing this is some problem
>> > with the primordial class loader/Java security in the DriverManager and
>> > the class loader used in Spark SQL when running on YARN.
>> >
>> > Any ideas? The only thing I have found that works is merging my MySQL
>> > JDBC driver into the Spark assembly JAR that's shipped to YARN, because
>> > adding it with --jars doesn't work.
>> >
>> > Cheers!
>>
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Night Wolf
Thanks Marcelo, can this be a path on HDFS?

On Fri, Apr 24, 2015 at 11:52 AM, Marcelo Vanzin 
wrote:

> You'd have to use spark.{driver,executor}.extraClassPath to modify the
> system class loader. But that also means you have to manually
> distribute the jar to the nodes in your cluster, into a common
> location.
>
> On Thu, Apr 23, 2015 at 6:38 PM, Night Wolf 
> wrote:
> > Hi guys,
> >
> > I'm having a problem building a DataFrame in Spark SQL from a JDBC data
> > source when running with --master yarn-client and adding the JDBC driver
> > JAR with --jars. If I run with a local[*] master, all works fine.
> >
> > ./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client
> >
> > sqlContext.load("jdbc", Map("url" ->
> > "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd", "driver" ->
> > "com.mysql.jdbc.Driver", "dbtable" -> "MY_TBL"))
> >
> >
> > This throws a class-not-found exception when running with Spark SQL. But
> > when I try to load the driver class on the workers or the driver, the
> > class is found without problems. So I'm guessing this is some problem
> > with the primordial class loader/Java security in the DriverManager and
> > the class loader used in Spark SQL when running on YARN.
> >
> > Any ideas? The only thing I have found that works is merging my MySQL
> > JDBC driver into the Spark assembly JAR that's shipped to YARN, because
> > adding it with --jars doesn't work.
> >
> > Cheers!
>
>
>
> --
> Marcelo
>


Re: Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Marcelo Vanzin
You'd have to use spark.{driver,executor}.extraClassPath to modify the
system class loader. But that also means you have to manually
distribute the jar to the nodes in your cluster, into a common
location.
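
The brace shorthand above stands for two separate properties, one for the driver and one for the executors. A minimal sketch that renders them as `spark-defaults.conf` lines, assuming the jar has already been copied to the same local path on every node. The function name and the `/opt/jars` path are hypothetical.

```python
def extra_classpath_properties(local_jar_path):
    """Render spark-defaults.conf lines that put a node-local jar on the
    driver and executor class paths."""
    if "://" in local_jar_path:
        # Class path entries here are plain local paths, not hdfs:// or other URIs.
        raise ValueError("expected a local filesystem path, got %r" % local_jar_path)
    keys = ("spark.driver.extraClassPath", "spark.executor.extraClassPath")
    return ["%s %s" % (key, local_jar_path) for key in keys]


# Hypothetical location where the jar was distributed on each node.
for line in extra_classpath_properties("/opt/jars/mysql-jdbc.jar"):
    print(line)
```

The sketch writes the properties in file form rather than setting them from inside the application, since the driver JVM is already running by the time application code executes in client mode.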

On Thu, Apr 23, 2015 at 6:38 PM, Night Wolf  wrote:
> Hi guys,
>
> I'm having a problem building a DataFrame in Spark SQL from a JDBC data
> source when running with --master yarn-client and adding the JDBC driver
> JAR with --jars. If I run with a local[*] master, all works fine.
>
> ./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client
>
> sqlContext.load("jdbc", Map("url" ->
> "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd", "driver" ->
> "com.mysql.jdbc.Driver", "dbtable" -> "MY_TBL"))
>
>
> This throws a class-not-found exception when running with Spark SQL. But
> when I try to load the driver class on the workers or the driver, the
> class is found without problems. So I'm guessing this is some problem
> with the primordial class loader/Java security in the DriverManager and
> the class loader used in Spark SQL when running on YARN.
>
> Any ideas? The only thing I have found that works is merging my MySQL
> JDBC driver into the Spark assembly JAR that's shipped to YARN, because
> adding it with --jars doesn't work.
>
> Cheers!



-- 
Marcelo




Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Night Wolf
Hi guys,

I'm having a problem building a DataFrame in Spark SQL from a JDBC data source
when running with --master yarn-client and adding the JDBC driver JAR with
--jars. If I run with a local[*] master, all works fine.

./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client

sqlContext.load("jdbc", Map("url" ->
"jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd", "driver" ->
"com.mysql.jdbc.Driver", "dbtable" -> "MY_TBL"))


This throws a class-not-found exception when running with Spark SQL. But
when I try to load the driver class on the workers or the driver, the class
is found without problems. So I'm guessing this is some problem with the
primordial class loader/Java security in the DriverManager and the class
loader used in Spark SQL when running on YARN.

Any ideas? The only thing I have found that works is merging my MySQL JDBC
driver into the Spark assembly JAR that's shipped to YARN, because adding
it with --jars doesn't work.

Cheers!


Re: gridsearch - python

2015-04-23 Thread Punyashloka Biswal
https://issues.apache.org/jira/browse/SPARK-7022.

Punya

On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto 
wrote:

> Can anybody point me to an example, if available, about gridsearch with
> python?
>
>
>
> Thank you,
>
>
>


Re: Understanding Spark/MLlib failures

2015-04-23 Thread Reza Zadeh
Hi Andrew,

The .principalComponents feature of RowMatrix is currently constrained to
tall and skinny matrices. Your matrix is barely above the skinny
requirement (10k columns), though the number of rows is fine.
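
To see why the column count matters more than the row count: PCA works from a columns-by-columns matrix, whose size grows with the square of the number of columns no matter how many rows there are. A back-of-the-envelope sketch, assuming dense 8-byte doubles. How MLlib actually stores and aggregates this matrix is not stated in this thread, so both a full and a packed-triangle figure are shown, and the function names are hypothetical.

```python
def dense_matrix_bytes(n_cols, bytes_per_entry=8):
    """Bytes for a full n_cols x n_cols matrix of doubles."""
    return n_cols * n_cols * bytes_per_entry


def packed_triangle_bytes(n_cols, bytes_per_entry=8):
    """Bytes for only the upper triangle (including the diagonal) of a symmetric matrix."""
    return n_cols * (n_cols + 1) // 2 * bytes_per_entry


n = 10000  # approximate column count from the original question
print(dense_matrix_bytes(n), "bytes for the full matrix")
print(packed_triangle_bytes(n), "bytes for the packed upper triangle")
```

Every partial result that gets combined during an aggregation of such a matrix would be of this order, independent of the 200,000 rows.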

What are you looking to do with the principal components? If unnormalized
PCA is OK for your application, you can instead run RowMatrix.computeSVD,
and use the 'V' matrix, which can be used the same way as the principal
components. The computeSVD method can handle square matrices, so it should
be able to handle your matrix.
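
A short derivation of how V relates to the principal components, reading "unnormalized" as "not mean-centered" (that reading is an assumption). Here A is the m-by-n data matrix, the vector of ones has length m, and mu is the vector of column means; these symbols are introduced only for this note.

```latex
A = U \Sigma V^{\top}
\quad\Longrightarrow\quad
A^{\top} A = V \Sigma^{2} V^{\top}

C = \frac{1}{m-1}\,(A - \mathbf{1}\mu^{\top})^{\top}(A - \mathbf{1}\mu^{\top})
  = \frac{1}{m-1}\,\bigl(A^{\top} A - m\,\mu\mu^{\top}\bigr)
```

So the columns of V are eigenvectors of the uncentered matrix of column products. They coincide with the eigenvectors of the covariance matrix C, that is, with the principal components, exactly when mu is zero, so centering the columns before calling computeSVD would recover ordinary PCA directions.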

Reza

On Thu, Apr 23, 2015 at 4:11 PM, aleverentz  wrote:

> [My apologies if this is a re-post.  I wasn't subscribed the first time I
> sent this message, and I'm hoping this second message will get through.]
>
> I’ve been using Spark 1.3.0 and MLlib for some machine learning tasks.  In a
> fit of blind optimism, I decided to try running MLlib’s Principal Components
> Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000
> rows.
>
> The Spark job has been running for about 5 hours on a small cluster, and it
> has been stuck on a particular job ("treeAggregate at RowMatrix.scala:119")
> for most of that time.  The treeAggregate job is now on "retry 5", and after
> each failure it seems that the next retry uses a smaller number of tasks.
> (Initially, there were around 80 tasks; later it was down to 50, then 42;
> now it’s down to 16.)  The web UI shows the following error under "failed
> stages":  "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
> output location for shuffle 1".
>
> This raises a few questions:
>
> 1. What does "missing an output location for shuffle 1" mean?  I’m guessing
> this cryptic error message is indicative of some more fundamental problem
> (out of memory? out of disk space?), but I’m not sure how to diagnose it.
>
> 2. Why do subsequent retries use fewer and fewer tasks?  Does this mean that
> the algorithm is actually making progress?  Or is the scheduler just
> performing some kind of repartitioning and starting over from scratch?
> (Also, if the algorithm is in fact making progress, should I expect it to
> finish eventually?  Or do repeated failures generally indicate that the
> cluster is too small to perform the given task?)
>
> 3. Is it reasonable to expect that I could get PCA to run on this dataset
> using the same cluster simply by changing some configuration parameters?
> Or is a larger cluster with significantly more resources per node the only
> way around this problem?
>
> 4. In general, are there any tips for diagnosing performance issues like the
> one above?  I've spent some time trying to get a few different algorithms to
> scale to larger and larger datasets, and whenever I run into a failure, I'd
> like to be able to identify the bottleneck that is preventing further
> scaling.  Any general advice for doing that kind of detective work would be
> much appreciated.
>
> Thanks,
>
> ~ Andrew
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Understanding Spark/MLlib failures

2015-04-23 Thread Burak Yavuz
Hi Andrew,

I observed similar behavior under high GC pressure when running ALS. In my
case there were very long Full GC pauses (over 600 seconds at times), which
prevented the executors from sending heartbeats to the driver. The driver
would then conclude that the executor had died and kill it. The scheduler
would look at the outputs and report
`org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 1` or `Fetch Failed`, then reschedule the job on a
different executor.

These executors would then get even more overloaded, causing them to GC
more often, and new jobs would be launched with even smaller tasks. Because
these executors were being killed by the driver, new jobs with the same
name (and fewer tasks) would be launched. This usually led to a spiral of
death, where executors were constantly being killed and the stage was never
completed, only restarted with different numbers of tasks.

Some configuration parameters that helped me through this process were:

spark.executor.memory  // decrease the executor memory so that Full GCs
                       // take less time, although they become more frequent
spark.executor.heartbeatInterval  // this I set at 60 for 600 seconds
                                  // (10 minute GC!!)
spark.core.connection.ack.wait.timeout  // another timeout to set
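
As a rough sketch of how the three settings above might be passed on the
command line, the Python snippet below builds `--conf key=value` arguments
for spark-submit. The helper name conf_args and the memory and timeout
values are placeholders made up for illustration, not recommendations, and
the units each setting expects should be checked against the configuration
docs for the Spark version in use.

```python
# Hypothetical helper: turns a dict of Spark settings into spark-submit flags.
# Only the setting names come from the message above; the values are
# placeholders chosen for illustration.
def conf_args(settings):
    args = []
    for key in sorted(settings):
        args.extend(["--conf", "{}={}".format(key, settings[key])])
    return args


settings = {
    "spark.executor.memory": "4g",                    # assumed value
    "spark.executor.heartbeatInterval": "60",         # value mentioned above
    "spark.core.connection.ack.wait.timeout": "600",  # assumed value
}

# Prints a single spark-submit command line with one --conf pair per setting.
print(" ".join(["spark-submit"] + conf_args(settings)))
```

Keeping the settings in one dict makes it easier to change one value at a
time between runs and see which change actually helps.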

Hope these parameters help you. I haven't directly answered your questions,
but there are bits and pieces in there that are hopefully helpful.

Best,
Burak


On Thu, Apr 23, 2015 at 4:11 PM, aleverentz  wrote:

> [My apologies if this is a re-post.  I wasn't subscribed the first time I
> sent this message, and I'm hoping this second message will get through.]
>
> I’ve been using Spark 1.3.0 and MLlib for some machine learning tasks.  In
> a
> fit of blind optimism, I decided to try running MLlib’s Principal
> Components
> Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000
> rows.
>
> The Spark job has been running for about 5 hours on a small cluster, and it
> has been stuck on a particular job ("treeAggregate at RowMatrix.scala:119")
> for most of that time.  The treeAggregate job is now on "retry 5", and
> after
> each failure it seems that the next retry uses a smaller number of tasks.
> (Initially, there were around 80 tasks; later it was down to 50, then 42;
> now it’s down to 16.)  The web UI shows the following error under "failed
> stages":  "org.apache.spark.shuffle.MetadataFetchFailedException: Missing
> an
> output location for shuffle 1".
>
> This raises a few questions:
>
> 1. What does "missing an output location for shuffle 1" mean?  I’m guessing
> this cryptic error message is indicative of some more fundamental problem
> (out of memory? out of disk space?), but I’m not sure how to diagnose it.
>
> 2. Why do subsequent retries use fewer and fewer tasks?  Does this mean
> that
> the algorithm is actually making progress?  Or is the scheduler just
> performing some kind of repartitioning and starting over from scratch?
> (Also, If the algorithm is in fact making progress, should I expect it to
> finish eventually?  Or do repeated failures generally indicate that the
> cluster is too small to perform the given task?)
>
> 3. Is it reasonable to expect that I could get PCA to run on this dataset
> using the same cluster simply by changing some configuration parameters?
> Or
> is a larger cluster with significantly more resources per node the only way
> around this problem?
>
> 4. In general, are there any tips for diagnosing performance issues like
> the
> one above?  I've spent some time trying to get a few different algorithms
> to
> scale to larger and larger datasets, and whenever I run into a failure, I'd
> like to be able to identify the bottleneck that is preventing further
> scaling.  Any general advice for doing that kind of detective work would be
> much appreciated.
>
> Thanks,
>
> ~ Andrew
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Understanding Spark/MLlib failures

2015-04-23 Thread aleverentz
[My apologies if this is a re-post.  I wasn't subscribed the first time I
sent this message, and I'm hoping this second message will get through.]

I’ve been using Spark 1.3.0 and MLlib for some machine learning tasks.  In a
fit of blind optimism, I decided to try running MLlib’s Principal Components
Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000
rows.

The Spark job has been running for about 5 hours on a small cluster, and it
has been stuck on a particular job ("treeAggregate at RowMatrix.scala:119")
for most of that time.  The treeAggregate job is now on "retry 5", and after
each failure it seems that the next retry uses a smaller number of tasks. 
(Initially, there were around 80 tasks; later it was down to 50, then 42;
now it’s down to 16.)  The web UI shows the following error under "failed
stages":  "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1".

This raises a few questions:

1. What does "missing an output location for shuffle 1" mean?  I’m guessing
this cryptic error message is indicative of some more fundamental problem
(out of memory? out of disk space?), but I’m not sure how to diagnose it.

2. Why do subsequent retries use fewer and fewer tasks?  Does this mean that
the algorithm is actually making progress?  Or is the scheduler just
performing some kind of repartitioning and starting over from scratch? 
(Also, If the algorithm is in fact making progress, should I expect it to
finish eventually?  Or do repeated failures generally indicate that the
cluster is too small to perform the given task?)

3. Is it reasonable to expect that I could get PCA to run on this dataset
using the same cluster simply by changing some configuration parameters?  Or
is a larger cluster with significantly more resources per node the only way
around this problem?

4. In general, are there any tips for diagnosing performance issues like the
one above?  I've spent some time trying to get a few different algorithms to
scale to larger and larger datasets, and whenever I run into a failure, I'd
like to be able to identify the bottleneck that is preventing further
scaling.  Any general advice for doing that kind of detective work would be
much appreciated.

Thanks,

~ Andrew






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Corey Nolet
If you return an iterable, you are not tying the API to a compactbuffer.
Someday, the data could be fetched lazily and the API would not have to
change.
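
A small Python sketch of the same idea, under the assumption that callers
only ever iterate over the grouped values. The function names are
hypothetical and this is not Spark code; the list merely stands in for
CompactBuffer.

```python
# Illustrative only: these functions are hypothetical, not part of Spark.
def grouped_values_eager(pairs, key):
    # Today: materialize every value in a list (a stand-in for CompactBuffer).
    return [v for k, v in pairs if k == key]


def grouped_values_lazy(pairs, key):
    # Someday: produce values on demand instead of holding them all in memory.
    return (v for k, v in pairs if k == key)


def total(values):
    # Relies only on the iterable contract, not on the concrete type.
    return sum(values)


pairs = [("a", 1), ("b", 2), ("a", 3)]

print(total(grouped_values_eager(pairs, "a")))
print(total(grouped_values_lazy(pairs, "a")))
```

Both calls give the same total, so swapping the eager version for the lazy
one would not break a caller written against the iterable contract.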
On Apr 23, 2015 6:59 PM, "Dean Wampler"  wrote:

> I wasn't involved in this decision ("I just make the fries"), but
> CompactBuffer is designed for relatively small data sets that at least fit
> in memory. It's more or less an Array. In principle, returning an iterator
> could hide the actual data structure that might be needed to hold a much
> bigger data set, if necessary.
>
> HOWEVER, it actually returns a CompactBuffer.
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444
>
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren  wrote:
>
>> Should I repost this to dev list ?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Dean Wampler
I wasn't involved in this decision ("I just make the fries"), but
CompactBuffer is designed for relatively small data sets that at least fit
in memory. It's more or less an Array. In principle, returning an iterator
could hide the actual data structure that might be needed to hold a much
bigger data set, if necessary.

HOWEVER, it actually returns a CompactBuffer.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren  wrote:

> Should I repost this to dev list ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Koert Kuipers
because CompactBuffer is considered an implementation detail. It is also
not public for the same reason.

On Thu, Apr 23, 2015 at 6:46 PM, Hao Ren  wrote:

> Should I repost this to dev list ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Hao Ren
Should I repost this to dev list ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: problem writing to s3

2015-04-23 Thread Daniel Mahler
Hi Akhil

I can confirm that the problem goes away when jsonRaw and jsonClean are in
different s3 buckets.

thanks
Daniel

On Thu, Apr 23, 2015 at 1:27 AM, Akhil Das 
wrote:

> Can you try writing to a different S3 bucket and confirm that?
>
> Thanks
> Best Regards
>
> On Thu, Apr 23, 2015 at 12:11 AM, Daniel Mahler  wrote:
>
>> Hi Akhil,
>>
>> It works fine when outprefix is a hdfs:///localhost/... url.
>>
>> It looks to me as if there is something about spark writing to the same
>> s3 bucket it is reading from.
>>
>> That is the only real difference between the 2 saveAsTextFile calls when
>> outprefix is on s3:
>> inpath is also on s3 but in a different bucket, whereas jsonRaw and
>> jsonClean are distinct directories in the same bucket.
>> I do not know why that should be a problem though.
>>
>> I will rerun using s3 paths and send the log information.
>>
>> thanks
>> Daniel
>>
>> On Wed, Apr 22, 2015 at 1:45 AM, Akhil Das 
>> wrote:
>>
>>> Can you look in your worker logs and see whats happening in there? Are
>>> you able to write the same to your HDFS?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Apr 22, 2015 at 4:45 AM, Daniel Mahler 
>>> wrote:
>>>
 I am having a strange problem writing to s3 that I have distilled to
 this minimal example:

 def jsonRaw = s"${outprefix}-json-raw"
 def jsonClean = s"${outprefix}-json-clean"

 val txt = sc.textFile(inpath)//.coalesce(shards, false)
 txt.count

 val res = txt.saveAsTextFile(jsonRaw)

 val txt2 = sc.textFile(jsonRaw +"/part-*")
 txt2.count

 txt2.saveAsTextFile(jsonClean)

 This code should simply copy files from inpath to jsonRaw and then from
 jsonRaw to jsonClean.
 This code executes all the way down to the last line, where it hangs
 after creating the output directory containing a _temporary_$folder but no
 actual files, not even temporary ones.

 `outputprefix` is an s3 bucket url; both jsonRaw and jsonClean are in
 the same bucket.
 Both calls .count succeed and return the same number. This means Spark
 can read from inpath and can both read from and write to jsonRaw. Since
 jsonClean is in the same bucket as jsonRaw and the final line does create
 the directory, I cannot think of any reason why the files should  not be
 written. If there were any access or url problems they should already
 manifest when writing jsonRaw.

 This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
 The console output from the last line is

 scala> txt0.saveAsTextFile(jsonClean)
 15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
 15/04/21 22:55:48 INFO storage.BlockManager: Removing block
 broadcast_3_piece0
 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of
 size 2024 dropped from memory (free 278251716)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size:
 2024.0 B, free: 265.4 MB)
 15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of
 block broadcast_3_piece0
 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size
 2728 dropped from memory (free 27825)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
 15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile
 at :38
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2
 (saveAsTextFile at :38) with 96 output partitions
 (allowLocal=false)
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage
 2(saveAsTextFile at :38)
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage:
 List()
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2
 (MapPartitionsRDD[5] at saveAsTextFile at :38), which has no
 missing parents
 15/04/21 22:55:49 INFO 

gridsearch - python

2015-04-23 Thread Pagliari, Roberto
Can anybody point me to an example, if available, about gridsearch with python?

Thank you,



Getting error running MLlib example with new cluster

2015-04-23 Thread Su She
Sorry, accidentally sent the last email before finishing.

I had asked this question before, but wanted to ask again as I think
it is now related to my pom file or project setup. Really appreciate the help!

I have been trying on/off for the past month to try to run this MLlib
example: 
https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala

I am able to build the project successfully. When I run it, it returns:

features in spam: 8
features in ham: 7

and then freezes. According to the UI, the description of the job is
"count at DataValidators.scala:38". This corresponds to this line in
the code:

val model = lrLearner.run(trainingData)

I've tried just about everything I can think of...changed numFeatures
from 1 -> 10,000, set executor memory to 1g, set up a new cluster, at
this point I think I might have missed dependencies as that has
usually been the problem in other spark apps I have tried to run. This
is my pom file, that I have used for other successful spark apps.
Please let me know if you think I need any additional dependencies or
there are incompatibility issues, or a pom.xml that is better to use.
Thank you!

Cluster information:

Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0)
java version "1.7.0_25"
Scala version: 2.10.4
hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)



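<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/maven-v4_0_0.xsd">
  <groupId>edu.berkely</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0-cdh5.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.0-mr1-cdh5.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.3</version>
    </dependency>
    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-core</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>0.9.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.scalanlp</groupId>
      <artifactId>breeze-math_2.10</artifactId>
      <version>0.4</version>
    </dependency>
    <dependency>
      <groupId>com.googlecode.netlib-java</groupId>
      <artifactId>netlib-java</artifactId>
      <version>1.0</version>
    </dependency>
    <dependency>
      <groupId>org.jblas</groupId>
      <artifactId>jblas</artifactId>
      <version>1.2.3</version>
    </dependency>
  </dependencies>
</project>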

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pyspark where do third parties libraries need to be installed under Yarn-client mode

2015-04-23 Thread dusts66
I am trying to figure out python library management.  So my question is: 
Where do third party Python libraries(ex. numpy, scipy, etc.) need to exist
if I running a spark job via 'spark-submit' against my cluster in 'yarn
client' mode.  Do the libraries need to only exist on the client(ie. the
server executing the driver code) or do the libraries need to exist on the
datanode/worker nodes where the tasks are executed?  The documentation seems
to indicate that under 'yarn client' the libraries are only need on the
client machine not the entire cluster.  If the libraries are needed across
all cluster machines, any suggestions on a deployment strategy or dependency
management model that works well?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting error running MLlib example with new cluster

2015-04-23 Thread Su She
I had asked this question before, but wanted to ask again as I think
it is related to my pom file or project setup.

I have been trying on/off for the past month to try to run this MLlib example:

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Question regarding join with multiple columns with pyspark

2015-04-23 Thread Ali Bajwa
Hi experts,

Sorry if this is a n00b question or has already been answered...

Am trying to use the data frames API in python to join 2 dataframes
with more than 1 column. The example I've seen in the documentation
only shows a single column - so I tried this:

Example code

import pandas as pd
from pyspark.sql import SQLContext
hc = SQLContext(sc)
A = pd.DataFrame({'year': ['1993', '2005', '1994'], 'month': ['5',
'12', '12'], 'value': [100, 200, 300]})
a = hc.createDataFrame(A)
B = pd.DataFrame({'year': ['1993', '1993'], 'month': ['12', '12'],
'value': [101, 102]})
b = hc.createDataFrame(B)

print "Pandas"  # try with Pandas
print A
print B
print pd.merge(A, B, on=['year', 'month'], how='inner')

print "Spark"
print a.toPandas()
print b.toPandas()
print a.join(b, a.year==b.year and a.month==b.month, 'inner').toPandas()


Output

Pandas
  month  value  year
0     5    100  1993
1    12    200  2005
2    12    300  1994

  month  value  year
0    12    101  1993
1    12    102  1993

Empty DataFrame
Columns: [month, value_x, year, value_y]
Index: []

Spark
  month  value  year
0     5    100  1993
1    12    200  2005
2    12    300  1994

  month  value  year
0    12    101  1993
1    12    102  1993

  month  value  year month  value  year
0    12    200  2005    12    102  1993
1    12    200  2005    12    101  1993
2    12    300  1994    12    102  1993
3    12    300  1994    12    101  1993

It looks like Spark returns some results where an inner join should
return nothing.

Am I doing the join with two columns in the wrong way? If yes, what is
the right syntax for this?

Thanks!
Ali
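
A possible explanation, sketched in plain Python rather than PySpark:
Python's `and` cannot be overloaded, so `x and y` returns `y` whenever `x`
is truthy, which silently drops the first comparison. The rows in the Spark
output above match on month only, which is consistent with that. The Expr
class below is a made-up stand-in for a column expression, not PySpark's
Column, and exists only to show the difference between `and` and `&`.

```python
# Stand-in for a column expression. This is NOT PySpark's Column class.
class Expr(object):
    def __init__(self, text):
        self.text = text

    def __eq__(self, other):
        # Build a new expression instead of returning True/False.
        return Expr("({} = {})".format(self.text, other.text))

    def __and__(self, other):
        # `&` can be overloaded, so both operands are kept.
        return Expr("({} AND {})".format(self.text, other.text))

    def __repr__(self):
        return self.text


a_year, b_year = Expr("a.year"), Expr("b.year")
a_month, b_month = Expr("a.month"), Expr("b.month")

# `and` evaluates the truthiness of the left side and returns the right side.
with_and = a_year == b_year and a_month == b_month
# `&` binds tighter than `==`, so each comparison needs its own parentheses.
with_amp = (a_year == b_year) & (a_month == b_month)

print(with_and)  # only the month comparison survives
print(with_amp)  # both comparisons are kept
```

With real DataFrames the usual spelling would be
a.join(b, (a.year == b.year) & (a.month == b.month), 'inner').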

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Non-Deterministic Graph Building

2015-04-23 Thread hokiegeek2
Hi Everyone,

I am running into a really weird problem that only one other person has
reported to the best of my knowledge (and the thread never yielded a
resolution).  I build a GraphX Graph from an input EdgeRDD and VertexRDD via
the Graph(VertexRDD,EdgeRDD) constructor. When I execute Graph.triplets on
the Graph I get wildly varying results where the triplet source and
destination vertex data are inconsistent between runs and rarely, if ever,
match what I would expect from the input edge pairs that are used to
generate the VertexRDD and EdgeRDDs.

Here's what I know for sure:

1. Consistency of Input Edge Data--I read the edges in from HBase and
generate a "raw edge RDD" containing tuples consisting of a source edge name
and destination edge name. I've written this RDD out to HDFS over several
runs and confirmed that generation of the raw edge RDD is deterministic.

2. Consistency of Edge and Vertex Count--the overall numbers of edges and
vertices in the EdgeRDD and VertexRDD, respectively, are consistent between
jobs.

3. Inconsistency of Triplet Data--the output from Graph.triplets varies
between jobs, where the edge pairings are different.

4. Disconnect Between Input Edge Data and Triplets--the input edge data
often does not match the corresponding triplet data for the same job, but in
some cases will.  Interestingly, while the actual edge pairings as seen in
the input edge data RDD and the triplets often don't match, the total number
of edges in the input edge RDD and triplets RDD for each edge name is the
same.

Based upon what I've seen, it seems as if the vertex ids are skewed somehow,
especially given point (4) where I noted that the total number of
appearances of an edge name is consistent between input edge RDD data and
triplet RDD data for the same job but, again, the pairings with edges on the
other end of the relationship can vary.

I will post my code later tonight/tomorrow AM, but wanted to see if this
problem description matches what anyone else has seen.

Thanks

--John



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Non-Deterministic-Graph-Building-tp22638.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread Tathagata Das
What was the state of your streaming application? Was it falling behind
with a large increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B  wrote:

> Thanks for the response, Conor. I tried with those settings and for a
> while it seemed like it was cleaning up shuffle files after itself.
> However, exactly 5 hours later it started throwing exceptions and
> eventually stopped working again. A sample stack trace is below. What is
> curious about 5 hours is that I set the cleaner ttl to 5 hours after
> changing the max window size to 1 hour (down from 6 hours in order to
> test). It also stopped cleaning the shuffle files after this started
> happening.
>
> Any idea why this could be happening?
>
> 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
> Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
> java.lang.Exception: Could not compute split, block input-0-1429706099000
> not found
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Thanks
> NB
>
>
> On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell <
> conor.fenn...@altocloud.com> wrote:
>
>> Hi,
>>
>>
>> We set the spark.cleaner.ttl to some reasonable time and also
>> set spark.streaming.unpersist=true.
>>
>>
>> Those together cleaned up the shuffle files for us.
>>
>>
>> -Conor
>>
>> On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:
>>
>>> We already do have a cron job in place to clean just the shuffle files.
>>> However, what I would really like to know is whether there is a "proper"
>>> way of telling spark to clean up these files once its done with them?
>>>
>>> Thanks
>>> NB
>>>
>>>
>>> On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele <
>>> gangele...@gmail.com> wrote:
>>>
 Write a cron job for this, like the one below:

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B  wrote:

> Hi all,
>
> I had posed this query as part of a different thread but did not get a
> response there. So creating a new thread hoping to catch someone's
> attention.
>
> We are experiencing this issue of shuffle files being left behind and
> not being cleaned up by Spark. Since this is a Spark streaming 
> application,
> it is expected to stay up indefinitely, so shuffle files not being cleaned
> up is a big problem right now. Our max window size is 6 hours, so we have
> set up a cron job to clean up shuffle files older than 12 hours otherwise
> it will eat up all our disk space.
>
> Please see the following. It seems the non-cleaning of shuffle files
> is being documented in 1.3.1.
>
> https://github.com/apache/spark/pull/5074/files
> https://issues.apache.org/jira/browse/SPARK-5836
>
>
> Also, for some reason, the following JIRAs that were reported as
> functional issues were closed as Duplicates of the above Documentation 
> bug.
> Does this mean that this issue won't be tackled at all?
>
> https://issues.apache.org/jira/browse/SPARK-3563
> https://issues.apache.org/jira/browse/SPARK-4796
> https://issues.apache.org/jira/browse/SPARK-6011
>
> Any further insight into whether this is being looked into and
> meanwhile how to handle shuffle files will be greatly appreciated.
>
> Thanks
> NB
>
>




>>>
>>
>


RE: Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Shuai Zheng
Got it. Thanks!

 

 

From: Yin Huai [mailto:yh...@databricks.com] 
Sent: Thursday, April 23, 2015 2:35 PM
To: Shuai Zheng
Cc: user
Subject: Re: Bug? Can't reference to the column by name after join two 
DataFrame on a same name key

 

Hi Shuai,

 

You can use "as" to create a table alias. For example, df1.as("df1"). Then you 
can use $"df1.col" to refer to it. 

 

Thanks,

 

Yin

 

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng  wrote:

Hi All,

 

I use 1.3.1

 

When I have two DF and join them on a same name key, after that, I can’t get 
the common key by name.

 

Basically:

select * from t1 inner join t2 on t1.col1 = t2.col1

 

And I am using purely DataFrame, spark SqlContext not HiveContext

 

DataFrame df3 = df1.join(df2, df1.col(col).equalTo(df2.col(col))).select(col);

 

because df1 and df2 join on the same key col,

 

Then I can't reference the key col. I understand I should use a full qualified 
name for that column (like in SQL, use t1.col), but I don’t know how should I 
address this in spark sql.

 

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 
'id' is ambiguous, could be: id#8L, id#0L.;

 

It looks that joined key can't be referenced by name or by df1.col name pattern.

The https://issues.apache.org/jira/browse/SPARK-5278 refer to a hive case, so I 
am not sure whether it is the same issue, but I still have the issue in latest 
code.

 

It looks like the result after join won't keep the parent DF information 
anywhere?

 

I check the ticket: https://issues.apache.org/jira/browse/SPARK-6273

 

But not sure whether  it is the same issue? Should I open a new ticket for this?

 

Regards,

 

Shuai

 

 



Re: Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Yin Huai
Hi Shuai,

You can use "as" to create a table alias. For example, df1.as("df1"). Then
you can use $"df1.col" to refer to it.

Thanks,

Yin

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng  wrote:

> Hi All,
>
>
>
> I use 1.3.1
>
>
>
> When I have two DF and join them on a same name key, after that, I can’t
> get the common key by name.
>
>
>
> Basically:
>
> select * from t1 inner join t2 on t1.col1 = t2.col1
>
>
>
> And I am using purely DataFrame, spark SqlContext not HiveContext
>
>
>
> DataFrame df3 = df1.join(df2, df1.col(col).equalTo(df2.col(col))).select(col);
>
>
>
> because df1 and df2 join on the same key col,
>
>
>
> Then I can't reference the key col. I understand I should use a full
> qualified name for that column (like in SQL, use t1.col), but I don’t know
> how should I address this in spark sql.
>
>
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Reference 'id' is ambiguous, could be: id#8L, id#0L.;
>
>
>
> It looks that joined key can't be referenced by name or by df1.col name
> pattern.
>
> The https://issues.apache.org/jira/browse/SPARK-5278 refer to a hive
> case, so I am not sure whether it is the same issue, but I still have the
> issue in latest code.
>
>
>
> It looks like the result after join won't keep the parent DF information
> anywhere?
>
>
>
> I check the ticket: https://issues.apache.org/jira/browse/SPARK-6273
>
>
>
> But not sure whether  it is the same issue? Should I open a new ticket for
> this?
>
>
>
> Regards,
>
>
>
> Shuai
>
>
>


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread N B
Thanks for the response, Conor. I tried with those settings and for a while
it seemed like it was cleaning up shuffle files after itself. However,
exactly 5 hours later it started throwing exceptions and eventually
stopped working again. A sample stack trace is below. What is curious about
5 hours is that I set the cleaner ttl to 5 hours after changing the max
window size to 1 hour (down from 6 hours in order to test). It also stopped
cleaning the shuffle files after this started happening.

Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks
NB


On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
wrote:

> Hi,
>
>
> We set the spark.cleaner.ttl to some reasonable time and also
> set spark.streaming.unpersist=true.
>
>
> Those together cleaned up the shuffle files for us.
>
>
> -Conor
>
> On Tue, Apr 21, 2015 at 8:18 AM, N B  wrote:
>
>> We already do have a cron job in place to clean just the shuffle files.
>> However, what I would really like to know is whether there is a "proper"
>> way of telling spark to clean up these files once it's done with them?
>>
>> Thanks
>> NB
>>
>>
>> On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele wrote:
>>
>>> Write a cron job for this like below
>>>
>>> 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
>>> 32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>>> 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>>>
>>>
>>> On 20 April 2015 at 23:12, N B  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I had posed this query as part of a different thread but did not get a
>>>> response there. So creating a new thread hoping to catch someone's
>>>> attention.
>>>>
>>>> We are experiencing this issue of shuffle files being left behind and
>>>> not being cleaned up by Spark. Since this is a Spark streaming application,
>>>> it is expected to stay up indefinitely, so shuffle files not being cleaned
>>>> up is a big problem right now. Our max window size is 6 hours, so we have
>>>> set up a cron job to clean up shuffle files older than 12 hours otherwise
>>>> it will eat up all our disk space.
>>>>
>>>> Please see the following. It seems the non-cleaning of shuffle files is
>>>> being documented in 1.3.1.
>>>>
>>>> https://github.com/apache/spark/pull/5074/files
>>>> https://issues.apache.org/jira/browse/SPARK-5836
>>>>
>>>>
>>>> Also, for some reason, the following JIRAs that were reported as
>>>> functional issues were closed as Duplicates of the above Documentation bug.
>>>> Does this mean that this issue won't be tackled at all?
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-3563
>>>> https://issues.apache.org/jira/browse/SPARK-4796
>>>> https://issues.apache.org/jira/browse/SPARK-6011
>>>>
>>>> Any further insight into whether this is being looked into and
>>>> meanwhile how to handle shuffle files will be greatly appreciated.
>>>>
>>>> Thanks
>>>> NB

>>>
>>>
>>>
>>>
>>
>


Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Shuai Zheng
Hi All,

I use 1.3.1.

When I have two DataFrames and join them on a key with the same name, I can't
get the common key by name afterwards.

Basically:

select * from t1 inner join t2 on t1.col1 = t2.col1

I am using purely DataFrame with the Spark SQLContext, not HiveContext:

DataFrame df3 = df1.join(df2,
df1.col(col).equalTo(df2.col(col))).select(col);

Because df1 and df2 join on the same key col, I can't reference the key col
afterwards. I understand I should use a fully qualified name for that column
(like in SQL, use t1.col), but I don't know how I should address this in
Spark SQL.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference
'id' is ambiguous, could be: id#8L, id#0L.;

It looks like the joined key can't be referenced by name or by the df1.col
name pattern.

https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case,
so I am not sure whether it is the same issue, but I still have the issue in
the latest code.

It looks like the result after the join doesn't keep the parent DF information
anywhere?

I checked the ticket: https://issues.apache.org/jira/browse/SPARK-6273

But I am not sure whether it is the same issue. Should I open a new ticket for
this?

Regards,

Shuai

 



Re: Slower performance when bigger memory?

2015-04-23 Thread Ted Yu
Shuai:
Please take a look at:

http://blog.takipi.com/garbage-collectors-serial-vs-parallel-vs-cms-vs-the-g1-and-whats-new-in-java-8/



> On Apr 23, 2015, at 10:18 AM, Dean Wampler  wrote:
> 
> JVM's often have significant GC overhead with heaps bigger than 64GB. You 
> might try your experiments with configurations below this threshold.
> 
> dean
> 
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition (O'Reilly)
> Typesafe
> @deanwampler
> http://polyglotprogramming.com
> 
>> On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng  wrote:
>> Hi All,
>> 
>>  
>> 
>> I am running some benchmark on r3*8xlarge instance. I have a cluster with 
>> one master (no executor on it) and one slave (r3*8xlarge).
>> 
>>  
>> 
>> My job has 1000 tasks in stage 0.
>> 
>>  
>> 
>> R3*8xlarge has 244G memory and 32 cores.
>> 
>>  
>> 
>> If I create 4 executors, each has 8 core+50G memory, each task will take 
>> around 320s-380s. And if I only use one big executor with 32 cores and 200G 
>> memory, each task will take 760s-900s.
>> 
>>  
>> 
>> And I check the log, looks like the minor GC takes much longer when using 
>> 200G memory:
>> 
>>  
>> 
>> 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 
>> 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 
>> sys=120.65, real=11.25 secs]
>> 
>>  
>> 
>> And when it uses 50G memory, the minor GC takes only less than 1s.
>> 
>>  
>> 
>> I try to see what is the best way to configure the Spark. For some special 
>> reason, I tempt to use a bigger memory on single executor if no significant 
>> penalty on performance. But now looks like it is?
>> 
>>  
>> 
>> Anyone has any idea?
>> 
>>  
>> 
>> Regards,
>> 
>>  
>> 
>> Shuai
>> 
> 


Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Argh, I looked and there really isn’t that much data yet. There will be 
thousands but starting small.

I bet this is just a total data size not requiring all workers thing—sorry, 
nevermind.


On Apr 23, 2015, at 10:30 AM, Pat Ferrel  wrote:

They are in HDFS so available on all workers

On Apr 23, 2015, at 10:29 AM, Pat Ferrel  wrote:

Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so, is there something I can do to mix it
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen  wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel  wrote:
> Sure
> 
>  var columns = mc.textFile(source).map { line => line.split(delimiter) }
> 
> Here “source” is a comma delimited list of files or directories. Both the
> textFile and .map tasks happen only on the machine they were launched from.
> 
> Later other distributed operations happen but I suspect if I can figure out
> why the first line is run only on the client machine the rest will clear up
> too. Here are some subsequent lines.
>
>  if(filterColumn != -1) {
>    columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
>  }
>
>  val interactions = columns.map { tokens =>
>    tokens(rowIDColumn) -> tokens(columnIDPosition)
>  }
> 
>  interactions.cache()
> 
> On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele 
> wrote:
> 
> Will you be able to paste code here?
> 
> On 23 April 2015 at 22:21, Pat Ferrel  wrote:
>> 
>> Using Spark streaming to create a large volume of small nano-batch input
>> files, ~4k per file, thousands of ‘part-x’ files.  When reading the
>> nano-batch files and doing a distributed calculation my tasks run only on
>> the machine where it was launched. I’m launching in “yarn-client” mode. The
>> rdd is created using sc.textFile(“list of thousand files”)
>> 
>> What would cause the read to occur only on the machine that launched the
>> driver.
>> 
>> Do I need to do something to the RDD after reading? Has some partition
>> factor been applied to all derived rdds?
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 
> 
> 
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: dynamicAllocation & spark-shell

2015-04-23 Thread Cheolsoo Park
Hi,

>> Attempted to request a negative number of executor(s) -663 from the
cluster manager. Please specify a positive number!

This is a bug in dynamic allocation. Here is the jira-
https://issues.apache.org/jira/browse/SPARK-6954

Thanks!
Cheolsoo

On Thu, Apr 23, 2015 at 7:57 AM, Michael Stone  wrote:

> If I enable dynamicAllocation and then use spark-shell or pyspark, things
> start out working as expected: running simple commands causes new executors
> to start and complete tasks. If the shell is left idle for a while,
> executors start getting killed off:
>
> 15/04/23 10:52:43 INFO cluster.YarnClientSchedulerBackend: Requesting to
> kill executor(s) 368
> 15/04/23 10:52:43 INFO spark.ExecutorAllocationManager: Removing executor
> 368 because it has been idle for 600 seconds (new desired total will be 665)
>
> That makes sense. But the action also results in error messages:
>
> 15/04/23 10:52:47 ERROR cluster.YarnScheduler: Lost executor 368 on
> hostname: remote Akka client disassociated
> 15/04/23 10:52:47 INFO scheduler.DAGScheduler: Executor lost: 368 (epoch 0)
> 15/04/23 10:52:47 INFO spark.ExecutorAllocationManager: Existing executor
> 368 has been removed (new total is 665)
> 15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Trying to remove
> executor 368 from BlockManagerMaster.
> 15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Removing block
> manager BlockManagerId(368, hostname, 35877)
> 15/04/23 10:52:47 INFO storage.BlockManagerMaster: Removed 368
> successfully in removeExecutor
>
> After that, trying to run a simple command results in:
>
> 15/04/23 10:13:30 ERROR util.Utils: Uncaught exception in thread
> spark-dynamic-executor-allocation-0
> java.lang.IllegalArgumentException: Attempted to request a negative number
> of executor(s) -663 from the cluster manager. Please specify a positive
> number!
>
> And then only the single remaining executor attempts to complete the new
> tasks. Am I missing some kind of simple configuration item, are other
> people seeing the same behavior as a bug, or is this actually expected?
>
> Mike Stone
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
They are in HDFS so available on all workers

On Apr 23, 2015, at 10:29 AM, Pat Ferrel  wrote:

Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so, is there something I can do to mix it
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen  wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel  wrote:
> Sure
> 
>   var columns = mc.textFile(source).map { line => line.split(delimiter) }
> 
> Here “source” is a comma delimited list of files or directories. Both the
> textFile and .map tasks happen only on the machine they were launched from.
> 
> Later other distributed operations happen but I suspect if I can figure out
> why the first line is run only on the client machine the rest will clear up
> too. Here are some subsequent lines.
>
>   if(filterColumn != -1) {
>     columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
>   }
>
>   val interactions = columns.map { tokens =>
>     tokens(rowIDColumn) -> tokens(columnIDPosition)
>   }
> 
>   interactions.cache()
> 
> On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele 
> wrote:
> 
> Will you be able to paste code here?
> 
> On 23 April 2015 at 22:21, Pat Ferrel  wrote:
>> 
>> Using Spark streaming to create a large volume of small nano-batch input
>> files, ~4k per file, thousands of ‘part-x’ files.  When reading the
>> nano-batch files and doing a distributed calculation my tasks run only on
>> the machine where it was launched. I’m launching in “yarn-client” mode. The
>> rdd is created using sc.textFile(“list of thousand files”)
>> 
>> What would cause the read to occur only on the machine that launched the
>> driver.
>> 
>> Do I need to do something to the RDD after reading? Has some partition
>> factor been applied to all derived rdds?
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 
> 
> 
> 






Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so, is there something I can do to mix it
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen  wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel  wrote:
> Sure
> 
>var columns = mc.textFile(source).map { line => line.split(delimiter) }
> 
> Here “source” is a comma delimited list of files or directories. Both the
> textFile and .map tasks happen only on the machine they were launched from.
> 
> Later other distributed operations happen but I suspect if I can figure out
> why the first line is run only on the client machine the rest will clear up
> too. Here are some subsequent lines.
>
>    if(filterColumn != -1) {
>      columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
>    }
>
>    val interactions = columns.map { tokens =>
>      tokens(rowIDColumn) -> tokens(columnIDPosition)
>    }
> 
>interactions.cache()
> 
> On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele 
> wrote:
> 
> Will you be able to paste code here?
> 
> On 23 April 2015 at 22:21, Pat Ferrel  wrote:
>> 
>> Using Spark streaming to create a large volume of small nano-batch input
>> files, ~4k per file, thousands of ‘part-x’ files.  When reading the
>> nano-batch files and doing a distributed calculation my tasks run only on
>> the machine where it was launched. I’m launching in “yarn-client” mode. The
>> rdd is created using sc.textFile(“list of thousand files”)
>> 
>> What would cause the read to occur only on the machine that launched the
>> driver.
>> 
>> Do I need to do something to the RDD after reading? Has some partition
>> factor been applied to all derived rdds?
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 
> 
> 
> 





Re: Tasks run only on one machine

2015-04-23 Thread Sean Owen
Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel  wrote:
> Sure
>
> var columns = mc.textFile(source).map { line => line.split(delimiter) }
>
> Here “source” is a comma delimited list of files or directories. Both the
> textFile and .map tasks happen only on the machine they were launched from.
>
> Later other distributed operations happen but I suspect if I can figure out
> why the first line is run only on the client machine the rest will clear up
> too. Here are some subsequent lines.
>
> if(filterColumn != -1) {
>   columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
> }
>
> val interactions = columns.map { tokens =>
>   tokens(rowIDColumn) -> tokens(columnIDPosition)
> }
>
> interactions.cache()
>
> On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele 
> wrote:
>
> Will you be able to paste code here?
>
> On 23 April 2015 at 22:21, Pat Ferrel  wrote:
>>
>> Using Spark streaming to create a large volume of small nano-batch input
>> files, ~4k per file, thousands of ‘part-x’ files.  When reading the
>> nano-batch files and doing a distributed calculation my tasks run only on
>> the machine where it was launched. I’m launching in “yarn-client” mode. The
>> rdd is created using sc.textFile(“list of thousand files”)
>>
>> What would cause the read to occur only on the machine that launched the
>> driver.
>>
>> Do I need to do something to the RDD after reading? Has some partition
>> factor been applied to all derived rdds?
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
>




Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Sure

var columns = mc.textFile(source).map { line => line.split(delimiter) }

Here “source” is a comma delimited list of files or directories. Both the 
textFile and .map tasks happen only on the machine they were launched from.

Later other distributed operations happen but I suspect if I can figure out why 
the first line is run only on the client machine the rest will clear up too.
Here are some subsequent lines.

if(filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}

val interactions = columns.map { tokens =>
  tokens(rowIDColumn) -> tokens(columnIDPosition)
}

interactions.cache()

On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele  wrote:

Will you be able to paste code here?

On 23 April 2015 at 22:21, Pat Ferrel <p...@occamsmachete.com> wrote:
Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the driver.

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?








Re: Tasks run only on one machine

2015-04-23 Thread Jeetendra Gangele
Will you be able to paste code here?

On 23 April 2015 at 22:21, Pat Ferrel  wrote:

> Using Spark streaming to create a large volume of small nano-batch input
> files, ~4k per file, thousands of 'part-x' files.  When reading the
> nano-batch files and doing a distributed calculation my tasks run only on
> the machine where it was launched. I'm launching in "yarn-client" mode. The
> rdd is created using sc.textFile("list of thousand files")
>
> What would cause the read to occur only on the machine that launched the
> driver.
>
> Do I need to do something to the RDD after reading? Has some partition
> factor been applied to all derived rdds?
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Slower performance when bigger memory?

2015-04-23 Thread Dean Wampler
JVMs often have significant GC overhead with heaps bigger than 64GB. You
might try your experiments with configurations below this threshold.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng  wrote:

> Hi All,
>
>
>
> I am running some benchmark on r3*8xlarge instance. I have a cluster with
> one master (no executor on it) and one slave (r3*8xlarge).
>
>
>
> My job has 1000 tasks in stage 0.
>
>
>
> R3*8xlarge has 244G memory and 32 cores.
>
>
>
> If I create 4 executors, each has 8 core+50G memory, each task will take
> around 320s-380s. And if I only use one big executor with 32 cores and 200G
> memory, each task will take 760s-900s.
>
>
>
> And I check the log, looks like the minor GC takes much longer when using
> 200G memory:
>
>
>
> 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)]
> 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95
> sys=120.65, real=11.25 secs]
>
>
>
> And when it uses 50G memory, the minor GC takes only less than 1s.
>
>
>
> I try to see what is the best way to configure the Spark. For some special
> reason, I tempt to use a bigger memory on single executor if no significant
> penalty on performance. But now looks like it is?
>
>
>
> Anyone has any idea?
>
>
>
> Regards,
>
>
>
> Shuai
>


Re: Map Question

2015-04-23 Thread Vadim Bichutskiy
Thanks Ilya. I am having trouble doing that. Can you give me an example?
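
One way to make a broadcast variable visible to a function in another module is to pass the handle to the function explicitly instead of relying on a module-level global. This is a different technique from the cross-module global suggested in the quoted reply. The sketch below is only an illustration, not code from this thread: FakeBroadcast is a hypothetical stand-in so the snippet runs without Spark, and the (x, x in metadata) return value is invented. With Spark, the handle would come from sc.broadcast(mylist) and the lambda would be passed to stuff.map(...).

```python
# process_stuff.py (layout assumed): the function receives the broadcast handle
# as a parameter, so it does not depend on a global defined in main.py.
def myfunc(x, broadcast_var):
    metadata = broadcast_var.value  # read the broadcast value where the task runs
    return (x, x in metadata)       # placeholder logic, for illustration only


class FakeBroadcast(object):
    """Hypothetical stand-in exposing .value, so the sketch runs without Spark."""

    def __init__(self, value):
        self.value = value


# main.py: create the handle once, then close over it when mapping.
broadcast_var = FakeBroadcast(["a", "b"])  # with Spark: sc.broadcast(mylist)
mapped = list(map(lambda x: myfunc(x, broadcast_var), ["a", "z"]))
# with Spark: stuff.map(lambda x: myfunc(x, broadcast_var))
```

The lambda captures only the small broadcast handle, not the list itself, which is what distinguishes this from mapping every element to (x, mylist).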

On Thu, Apr 23, 2015 at 12:06 PM, Ganelin, Ilya  wrote:

>  You need to expose that variable the same way you'd expose any other
> variable in Python that you wanted to see across modules. As long as you
> share a spark context all will work as expected.
>
>
> http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable
>
>
>
> Sent with Good (www.good.com)
>
>
>
> -Original Message-
> *From: *Vadim Bichutskiy [vadim.bichuts...@gmail.com]
> *Sent: *Thursday, April 23, 2015 12:00 PM Eastern Standard Time
> *To: *Tathagata Das
> *Cc: *user@spark.apache.org
> *Subject: *Re: Map Question
>
> Here it is. How do I access a broadcastVar in a function that's in another
> module (process_stuff.py below):
>
> Thanks,
> Vadim
>
>  main.py
> ---
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.sql import SQLContext
> from process_stuff import myfunc
> from metadata import get_metadata
>
> conf = SparkConf().setAppName('My App').setMaster('local[4]')
> sc = SparkContext(conf=conf)
> ssc = StreamingContext(sc, 30)
> sqlContext = SQLContext(sc)
>
> distFile = ssc.textFileStream("s3n://...")
>
> distFile.foreachRDD(process)
>
> mylist = get_metadata()
>
> print 'BROADCASTING...'
> broadcastVar = sc.broadcast(mylist)
> print broadcastVar
> print broadcastVar.value
> print 'FINISHED BROADCASTING...'
>
> ## mylist and broadcastVar, broadcastVar.value print fine
>
> def getSqlContextInstance(sparkContext):
>
>     if ('sqlContextSingletonInstance' not in globals()):
>         globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
>     return globals()['sqlContextSingletonInstance']
>
> def process(rdd):
>
>     sqlContext = getSqlContextInstance(rdd.context)
>
>     if rdd.take(1):
>
>         jsondf = sqlContext.jsonRDD(rdd)
>
>         #jsondf.printSchema()
>
>         jsondf.registerTempTable('mytable')
>
>         stuff = sqlContext.sql("SELECT ...")
>         stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?
>
>         ...
>
> process_stuff.py
> --
>
> def myfunc(x):
>
>     metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?
>
>     ...
>
>
> metadata.py
>
>
> def get_metadata():
>
>     ...
>
>     return mylist
>
> On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das 
> wrote:
>
>> Can you give full code? especially the myfunc?
>>
>> On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy <
>> vadim.bichuts...@gmail.com> wrote:
>>
>>> Here's what I did:
>>>
>>>  print 'BROADCASTING...'
>>> broadcastVar = sc.broadcast(mylist)
>>> print broadcastVar
>>> print broadcastVar.value
>>> print 'FINISHED BROADCASTING...'
>>>
>>> The above works fine,
>>>
>>> but when I call myrdd.map(myfunc) I get *NameError: global name
>>> 'broadcastVar' is not defined*
>>>
>>>  The myfunc function is in a different module. How do I make it aware
>>> of broadcastVar?
>>>
>>> On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy <
>>> vadim.bichuts...@gmail.com> wrote:
>>>
>>>> Great. Will try to modify the code. Always room to optimize!
>>>>
>>>> On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das
>>>> wrote:
>>>>
>>>>> Absolutely. The same code would work for local as well as distributed
>>>>> mode!
>>>>>
>>>>> On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy <
>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>
>>>>>> Can I use broadcast vars in local mode?
>>>>>>
>>>>>> On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das
>>>>>> wrote:
>>>>>>
>>>>>>> Yep. Not efficient. Pretty bad actually. That's why broadcast
>>>>>>> variables were introduced right at the very beginning of Spark.
>>>>>>>
>>>>>>> On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy <
>>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks TD. I was looking into broadcast variables.
>>>>>>>>
>>>>>>>> Right now I am running it locally...and I plan to move it to
>>>>>>>> "production" on EC2.
>>>>>>>>
>>>>>>>> The way I fixed it is by doing myrdd.map(lambda x: (x,
>>>>>>>> mylist)).map(myfunc) but I don't think it's efficient?
>>>>>>>>
>>>>>>>> mylist is filled only once at the start and never changes.
>>>>>>>>
>>>>>>>> Vadim
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das wrote:
>>>>>>>>
>>>>>>>>> Is the mylist present on every executor? If not, then you have
>>>>>>>>> to pass it on. And broadcasts are the best way to pass them on. But note
>>>>>>>>> that once broadcasted it will be immutable at the executors, and if you update
>>>>>>>>> the list at the driver, you will have to broadcast it again.
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>> On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy <
>>>>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am using Spark Streaming with Python. For each RDD, I call a
>>>>>>>>>> map, i.e., myrdd.map(myfunc), myf

Slower performance when bigger memory?

2015-04-23 Thread Shuai Zheng
Hi All,

I am running some benchmarks on an r3.8xlarge instance. I have a cluster with
one master (no executor on it) and one slave (r3.8xlarge).

My job has 1000 tasks in stage 0.

An r3.8xlarge has 244G memory and 32 cores.

If I create 4 executors, each with 8 cores and 50G memory, each task takes
around 320s-380s. If I use only one big executor with 32 cores and 200G
memory, each task takes 760s-900s.

I checked the log, and it looks like the minor GC takes much longer when using
200G memory:

285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)]
38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95
sys=120.65, real=11.25 secs]

With 50G memory, the minor GC takes less than 1s.

I am trying to find the best way to configure Spark. For some special
reason, I would prefer to use more memory on a single executor if there is no
significant performance penalty. But it now looks like there is one?

Does anyone have any idea?

Regards,

Shuai
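
One detail in the GC log line above that may be worth pulling out is the [Times: ...] section: user and sys are CPU seconds summed over the GC threads, and real is wall-clock time. The sketch below extracts those three numbers and computes how much of the CPU time was sys time. The helper name gc_times is made up for illustration, and the log line is the one from the message joined onto a single line.

```python
import re

# Matches the "[Times: user=... sys=..., real=... secs]" part of a HotSpot GC log line.
TIMES_RE = re.compile(
    r"user=(?P<user>[\d.]+)\s+sys=(?P<sys>[\d.]+),\s+real=(?P<real>[\d.]+)\s+secs"
)


def gc_times(log_line):
    """Return (user, sys, real) in seconds, or None if the line has no Times section."""
    match = TIMES_RE.search(log_line)
    if match is None:
        return None
    return tuple(float(match.group(name)) for name in ("user", "sys", "real"))


line = (
    "285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] "
    "38810417K->19703013K(135977472K), 11.2509770 secs] "
    "[Times: user=38.95 sys=120.65, real=11.25 secs]"
)

user, sys_time, real = gc_times(line)
sys_share = sys_time / (user + sys_time)  # fraction of GC CPU time spent in the kernel
```

For this line roughly three quarters of the CPU time is sys time. As a general rule of thumb, a high sys share tends to point at the operating system (memory paging, for example) rather than at the collector's own work, but that is an assumption here and not something established in this thread.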



[Spark Streaming] Help with updateStateByKey()

2015-04-23 Thread allonsy
Hi everybody,

I think I could use some help with the /updateStateByKey()/ JAVA method in
Spark Streaming.

*Context:*

I have a /JavaReceiverInputDStream<DataUpdate> du/ DStream, where object
/DataUpdate/ mainly has 2 fields of interest (in my case), namely
du.personId (an Integer) and du.cell.hashCode() (Integer, again). Obviously,
I am processing several /DataUpdate/ objects (coming from a log file read in
microbatches), and every /personId/ will be 'associated' with several
/du.cell.hashCode()/s.

What I need to do is, for every /personId/, statefully count how many
times it appears with a particular /du.cell.hashCode()/, possibly
partitioning by the /personId/ key.

(Long story short: an area is split into cells and I wonder how many times
every person appears in every cell.)

In a very naive way, I guess everything should look like a
/HashMap/, but I am not quite
sure how to partition by /personId/ and increase the count. 

It looks like method /updateStateByKey()/ should do the trick (I am new to
Spark Streaming), yet I can't figure out in which way.

Any suggestions?

Feel free to ask anything in case I was unclear or more information is
needed. :)


Thank you!
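
A minimal sketch of the counting logic described above, written in Python rather than Java and without Spark so that it runs on its own. It assumes each DataUpdate is reduced to a (personId, cellHash) pair that serves as the key, with a value of 1 per occurrence. run_batch is a hypothetical local stand-in for what updateStateByKey does on each micro-batch; only update_count has the shape an update function is expected to have (the new values for a key, plus the previous state or None).

```python
def update_count(new_values, previous_count):
    """State update: add this batch's occurrences to the running count."""
    return sum(new_values) + (previous_count or 0)


def run_batch(state, batch):
    """Local stand-in for one micro-batch of updateStateByKey (assumption, not Spark)."""
    grouped = {}
    for person_id, cell_hash in batch:
        # Key by the (personId, cellHash) pair and emit 1 per occurrence.
        grouped.setdefault((person_id, cell_hash), []).append(1)
    # Every key with new values or existing state gets its update function called.
    keys = set(state) | set(grouped)
    return {key: update_count(grouped.get(key, []), state.get(key)) for key in keys}


counts = {}
counts = run_batch(counts, [(1, 42), (1, 42), (2, 7)])  # first micro-batch
counts = run_batch(counts, [(1, 42), (1, 99)])          # second micro-batch
```

Keying by the pair avoids a nested map per person; the per-person view can be rebuilt afterwards by grouping on the first element of the key. In a real streaming job, updateStateByKey also needs checkpointing enabled on the streaming context.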



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Help-with-updateStateByKey-tp22637.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the 
driver. 

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?



Re: A Spark Group by is running forever

2015-04-23 Thread ๏̯͡๏
I have seen multiple blogs stating to use reduceByKey instead of
groupByKey. Could someone please help me in converting below code to use
reduceByKey


Code

some spark processing
...

Below
val viEventsWithListingsJoinSpsLevelMetric:
 
org.apache.spark.rdd.RDD[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
 com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary,
Long)]


val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
  case (viDetail, vi, itemId) =>
    (viDetail.get(0), viDetail.get(1).asInstanceOf[Long],
      viDetail.get(2), viDetail.get(8).asInstanceOf[Int])
}

We group by the above key so that we get an Iterable (list); with that list we
can compute .max values for powersellers and sellerstdlevel.

val powerSellerLevel = sellerSegments.map {
  case (k, v) =>
    val viGrouped = v.toList
    val viPowerSellers = viGrouped.map { viTuple =>
      Option(viTuple._2.powerSellerLevel).getOrElse("") }
    val viSellerStandardLevels = viGrouped.map { viTuple =>
      Option(viTuple._2.sellerStdLevel).getOrElse("") }
    val powerSellerLevel = viPowerSellers.max
    val sellerStandardLevel = viSellerStandardLevels.max
    val viEventDetail = viGrouped.head._1
    val viSummary = viGrouped.head._2
    viSummary.powerSellerLevel = powerSellerLevel
    viSummary.sellerStdLevel = sellerStandardLevel
    viSummary.itemId = viGrouped.head._3
    (viEventDetail, viSummary)
}


The above groupBy query ran for 6 hours and did not seem to finish. Hence I
started thinking of reduceByKey. Now reduceByKey() needs pairs, and hence I
modified viEventsWithListingsJoinSpsLevelMetric from (x, y, z) to (A, B).

I moved the key generated by the groupBy query into the processing of
viEventsWithListingsJoinSpsLevelMetric, so that
viEventsWithListingsJoinSpsLevelMetric is of type (A, B). Hence it is modified
as

(((viEventDetail.get(0), viEventDetail.get(1).asInstanceOf[Long],
viEventDetail.get(2),
viEventDetail.get(8).asInstanceOf[Int])),(viEventDetail, viSummary,
itemId)).

Now I want to compute max values, and I do the next processing using
reduceByKey

val powerSellerLevel = viEventsWithListingsJoinSpsLevelMetric.reduceByKey {

  case (k, v) =>

val viGrouped = v.toList

 // Some code to compute max needs to go here.

}


But i get a compiler error that v.toList is not supported.

[ERROR]
/Users/dvasthimal/ebay/projects/ep-spark/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/detail/viewitem/provider/VISummaryDataProvider.scala:115:
error: value toList is not a member of
(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary,
Long)

[INFO] val viGrouped = v.toList

[INFO]   ^

[ERROR] one error found


Now if you think about it, groupBy was generating (k, Iterable), and hence the
next map() could get a list and run through that list to compute the max. How
is that possible with reduceByKey, given that it never generates max?
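
One way to read the compiler error above: reduceByKey does not hand the
function a (key, Iterable) pair. It hands it two values for the same key and
expects one value of the same type back, so the max has to be computed
pairwise. Below is a minimal Python sketch of that idea, assuming string
levels and a simplified summary record. The names Summary, merge and
reduce_by_key are hypothetical, and reduce_by_key is only a local stand-in
for the RDD method so the sketch runs without Spark.

```python
from collections import namedtuple

# Simplified, hypothetical stand-in for the VISummary fields used in the thread.
Summary = namedtuple("Summary", ["power_seller_level", "seller_std_level"])


def merge(left, right):
    """Combine two (detail, summary, item_id) values that share a key.

    Keeps the detail and item id of the left value and the pairwise max
    of both levels, treating a missing level as the empty string.
    """
    detail, left_summary, item_id = left
    _, right_summary, _ = right
    merged = Summary(
        max(left_summary.power_seller_level or "",
            right_summary.power_seller_level or ""),
        max(left_summary.seller_std_level or "",
            right_summary.seller_std_level or ""),
    )
    return (detail, merged, item_id)


def reduce_by_key(pairs, func):
    """Local illustration of reduceByKey semantics (not Spark code)."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


records = [
    (("a", 1), ("d1", Summary("Gold", None), 10)),
    (("a", 1), ("d2", Summary("Silver", "Top"), 11)),
    (("b", 2), ("d3", Summary(None, "Std"), 12)),
]
result = reduce_by_key(records, merge)
```

With a pair RDD of (key, (detail, summary, itemId)) the same kind of
two-argument function would be what reduceByKey expects. Note that a key with
a single record never passes through the merge function, so any default for
missing levels would need its own map step.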


Suggestions are appreciated.


-Deepak















On Thu, Apr 23, 2015 at 1:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> I have a groupBy query after a map-side join & leftOuterJoin. And this
> query is running for more than 2 hours.
>
>
> Tasks
>
> Index: 0
> ID: 36
> Attempt: 0
> Status: RUNNING
> Locality Level: PROCESS_LOCAL
> Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com
> Launch Time: 2015/04/22 23:27:00
> Duration: 1.4 h
> GC Time: 29 s
> Shuffle Read Size / Records: 61.8 MB / 63144909
> Write Time: (empty)
> Shuffle Write Size / Records: 0.0 B / 0
> Errors: (empty)
>
>
>
> The input looks to be only 60 MB.
> *Command*
> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>  *--num-executors 36 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=8G" --executor-memory 12g* *--executor-cores 6* --queue
> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
> startDate=2015-04-6 endDate=2015-04-7
> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
> maxbuffersize=1068 maxResultSize=2G
>
> Queries
>
> 1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi)
> }
> 2.  Broadcast Map - Join
>
> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
> .collectAsMap
> val broadCastMap = sc.broadcast(lstgItemMap)
>
> val viEventsWithListings: RDD[(Long, (DetailInp

RE: Map Question

2015-04-23 Thread Ganelin, Ilya
You need to expose that variable the same way you'd expose any other variable 
in Python that you wanted to see across modules. As long as you share a spark 
context all will work as expected.

http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable
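
An alternative to a cross-module global that is often used for this kind of
problem is to pass the broadcast handle to the function explicitly. The
sketch below is an assumption about how that could look, not the approach
from the link above. FakeBroadcast is a hypothetical stand-in exposing only
.value so the example runs without Spark, and the extra broadcast_var
parameter on myfunc is not in the original code.

```python
from functools import partial


class FakeBroadcast(object):
    """Hypothetical stand-in for a Spark broadcast handle; only has .value."""

    def __init__(self, value):
        self.value = value


# --- would live in process_stuff.py ---
def myfunc(x, broadcast_var):
    # The handle arrives as an argument, so no global lookup is needed.
    metadata = broadcast_var.value
    return (x, x in metadata)


# --- would live in main.py ---
broadcast_var = FakeBroadcast(["a", "b"])  # with Spark: sc.broadcast(mylist)
mapper = partial(myfunc, broadcast_var=broadcast_var)
result = list(map(mapper, ["a", "z"]))     # with Spark: rdd.map(mapper)
```

The design choice here is that the function's dependency is visible in its
signature, so it does not matter which module defines it.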



Sent with Good (www.good.com)


-Original Message-
From: Vadim Bichutskiy [vadim.bichuts...@gmail.com]
Sent: Thursday, April 23, 2015 12:00 PM Eastern Standard Time
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Map Question

Here it is. How do I access a broadcastVar in a function that's in another 
module (process_stuff.py below):

Thanks,
Vadim

main.py
---

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream("s3n://...")

distFile.foreachRDD(process)

mylist = get_metadata()

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):
    sqlContext = getSqlContextInstance(rdd.context)

    if rdd.take(1):
        jsondf = sqlContext.jsonRDD(rdd)
        #jsondf.printSchema()
        jsondf.registerTempTable('mytable')

        stuff = sqlContext.sql("SELECT ...")
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?

        ...

process_stuff.py
--

def myfunc(x):
    metadata = broadcastVar.value # NameError: broadcastVar not found -- HOW TO FIX?
    ...


metadata.py


def get_metadata():
    ...
    return mylist


On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote:
Can you give full code? especially the myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
Here's what I did:

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

The above works fine,

but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' 
is not defined

The myfunc function is in a different module. How do I make it aware of 
broadcastVar?

On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
Great. Will try to modify the code. Always room to optimize!

On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das <t...@databricks.com> wrote:
Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
Can I use broadcast vars in local mode?

On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das <t...@databricks.com> wrote:
Yep. Not efficient. Pretty bad actually. That's why broadcast variables were 
introduced right at the very beginning of Spark.



On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
Thanks TD. I was looking into broadcast variables.

Right now I am running it locally...and I plan to move it to "production" on 
EC2.

The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but 
I don't think it's efficient?

mylist is filled only once at the start and never changes.

Vadim

On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das <t...@databricks.com> wrote:
Is the mylist present on every executor? If not, then you have to pass it on. 
And broadcasts are the best way to pass them on. But note that once broadcast 
it will be immutable at the executors, and if you update the list at the driver, 
you will have to broadcast it again.

TD

On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
I am using Spark Streaming with Python. For each RDD, 

Re: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

2015-04-23 Thread Mohammed Omer
Hm, no I don't have that in my path.

However, someone on the spark-csv project advised that since I could not
get another package/example to work, that this might be a Spark / Yarn
issue: https://github.com/databricks/spark-csv/issues/54

Thoughts? I'll open a ticket later this afternoon if the discussion turns
that way.

Thank you, by the way, for the work on this project.

Mo

On Thu, Apr 23, 2015 at 5:17 AM, Krishna Sankar  wrote:

> Do you have commons-csv-1.1-bin.jar in your path somewhere ? I had to
> download and add this.
> Cheers
> 
>
> On Wed, Apr 22, 2015 at 11:01 AM, Mohammed Omer 
> wrote:
>
>> Afternoon all,
>>
>> I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via:
>>
>> `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package`
>>
>> The error is encountered when running spark shell via:
>>
>> `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3`
>>
>> The full trace of the commands can be found at
>> https://gist.github.com/momer/9d1ca583f9978ec9739d
>>
>> Not sure if I've done something wrong, or if the documentation is
>> outdated, or...?
>>
>> Would appreciate any input or push in the right direction!
>>
>> Thank you,
>>
>> Mo
>>
>
>


Re: Map Question

2015-04-23 Thread Vadim Bichutskiy
Here it is. How do I access a broadcastVar in a function that's in another
module (process_stuff.py below):

Thanks,
Vadim

main.py
---

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream("s3n://...")

distFile.foreachRDD(process)

mylist = get_metadata()

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):
    sqlContext = getSqlContextInstance(rdd.context)

    if rdd.take(1):
        jsondf = sqlContext.jsonRDD(rdd)
        #jsondf.printSchema()
        jsondf.registerTempTable('mytable')

        stuff = sqlContext.sql("SELECT ...")
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?

        ...

process_stuff.py
--

def myfunc(x):
    metadata = broadcastVar.value # NameError: broadcastVar not found -- HOW TO FIX?
    ...


metadata.py


def get_metadata():
    ...
    return mylist

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das  wrote:

> Can you give full code? especially the myfunc?
>
> On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> Here's what I did:
>>
>> print 'BROADCASTING...'
>> broadcastVar = sc.broadcast(mylist)
>> print broadcastVar
>> print broadcastVar.value
>> print 'FINISHED BROADCASTING...'
>>
>> The above works fine,
>>
>> but when I call myrdd.map(myfunc) I get *NameError: global name
>> 'broadcastVar' is not defined*
>>
>> The myfunc function is in a different module. How do I make it aware of
>> broadcastVar?
>>
>> On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy <
>> vadim.bichuts...@gmail.com> wrote:
>>
>>> Great. Will try to modify the code. Always room to optimize!
>>>
>>> On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das 
>>> wrote:
>>>
>>>> Absolutely. The same code would work for local as well as distributed
>>>> mode!
>>>>
>>>> On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy <
>>>> vadim.bichuts...@gmail.com> wrote:
>>>>
>>>>> Can I use broadcast vars in local mode?
>>>>>
>>>>> On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das
>>>>> wrote:
>>>>>
>>>>>> Yep. Not efficient. Pretty bad actually. That's why broadcast
>>>>>> variables were introduced right at the very beginning of Spark.
>>>>>>
>>>>>> On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy <
>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks TD. I was looking into broadcast variables.
>>>>>>>
>>>>>>> Right now I am running it locally...and I plan to move it to
>>>>>>> "production" on EC2.
>>>>>>>
>>>>>>> The way I fixed it is by doing myrdd.map(lambda x: (x,
>>>>>>> mylist)).map(myfunc) but I don't think it's efficient?
>>>>>>>
>>>>>>> mylist is filled only once at the start and never changes.
>>>>>>>
>>>>>>> Vadim
>>>>>>>
>>>>>>> On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Is the mylist present on every executor? If not, then you have to
>>>>>>>> pass it on. And broadcasts are the best way to pass them on. But note
>>>>>>>> that once broadcast it will be immutable at the executors, and if you
>>>>>>>> update the list at the driver, you will have to broadcast it again.
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy <
>>>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am using Spark Streaming with Python. For each RDD, I call a
>>>>>>>>> map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module.
>>>>>>>>> In yet another separate Python module I have a global list, i.e.
>>>>>>>>> mylist, that's populated with metadata. I can't get myfunc to see
>>>>>>>>> mylist...it's always empty. Alternatively, I guess I could pass
>>>>>>>>> mylist to map.
>>>>>>>>>
>>>>>>>>> Any suggestions?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Vadim


Re: Instantiating/starting Spark jobs programmatically

2015-04-23 Thread Anshul Singhle
Hi firemonk9,

What you're doing looks interesting. Can you share some more details?
Are you running the same Spark context for each job, or are you running a
separate Spark context for each job?
Does your system need to share RDDs across multiple jobs? If yes, how do
you implement that?
Also, what prompted you to run YARN instead of standalone? Does this give
some performance benefit? Have you evaluated YARN vs. Mesos?
Also, have you looked at spark-jobserver by Ooyala? It makes doing some of
the stuff I mentioned easier. IIRC it also works with YARN. It definitely
works with Mesos. Here's the link:
https://github.com/spark-jobserver/spark-jobserver

Thanks
Anshul
On 23 Apr 2015 20:32, "Dean Wampler"  wrote:

> I strongly recommend spawning a new process for the Spark jobs. Much
> cleaner separation. Your driver program won't be clobbered if the Spark job
> dies, etc. It can even watch for failures and restart.
>
> In the Scala standard library, the sys.process package has classes for
> constructing and interoperating with external processes. Perhaps Java has
> something similar these days?
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran 
> wrote:
>
>>
>>  On 21 Apr 2015, at 17:34, Richard Marscher 
>> wrote:
>>
>> - There are System.exit calls built into Spark as of now that could kill
>> your running JVM. We have shadowed some of the most offensive bits within
>> our own application to work around this. You'd likely want to do that or to
>> do your own Spark fork. For example, if the SparkContext can't connect to
>> your cluster master node when it is created, it will System.exit.
>>
>>
>> people can block "errant" System.exit calls by running under a
>> SecurityManager. Less than ideal (and there's a small performance hit) -but
>> possible
>>
>
>


Spark + Hue

2015-04-23 Thread MrAsanjar .
Hi all
Is there any good documentation on how to integrate Spark with Hue 3.7.x?
Is installing Spark Job Server the only way?
Thanks in advance for your help


dynamicAllocation & spark-shell

2015-04-23 Thread Michael Stone
If I enable dynamicAllocation and then use spark-shell or pyspark, 
things start out working as expected: running simple commands causes new 
executors to start and complete tasks. If the shell is left idle for a 
while, executors start getting killed off:


15/04/23 10:52:43 INFO cluster.YarnClientSchedulerBackend: Requesting to kill 
executor(s) 368
15/04/23 10:52:43 INFO spark.ExecutorAllocationManager: Removing executor 368 
because it has been idle for 600 seconds (new desired total will be 665)

That makes sense. But the action also results in error messages:

15/04/23 10:52:47 ERROR cluster.YarnScheduler: Lost executor 368 on hostname: 
remote Akka client disassociated
15/04/23 10:52:47 INFO scheduler.DAGScheduler: Executor lost: 368 (epoch 0)
15/04/23 10:52:47 INFO spark.ExecutorAllocationManager: Existing executor 368 
has been removed (new total is 665)
15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Trying to remove 
executor 368 from BlockManagerMaster.
15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Removing block manager 
BlockManagerId(368, hostname, 35877)
15/04/23 10:52:47 INFO storage.BlockManagerMaster: Removed 368 successfully in 
removeExecutor

After that, trying to run a simple command results in:

15/04/23 10:13:30 ERROR util.Utils: Uncaught exception in thread 
spark-dynamic-executor-allocation-0
java.lang.IllegalArgumentException: Attempted to request a negative number of 
executor(s) -663 from the cluster manager. Please specify a positive number!

And then only the single remaining executor attempts to complete the new 
tasks. Am I missing some kind of simple configuration item, are other 
people seeing the same behavior as a bug, or is this actually expected?


Mike Stone




Re: Instantiating/starting Spark jobs programmatically

2015-04-23 Thread Dean Wampler
I strongly recommend spawning a new process for the Spark jobs. Much
cleaner separation. Your driver program won't be clobbered if the Spark job
dies, etc. It can even watch for failures and restart.

In the Scala standard library, the sys.process package has classes for
constructing and interoperating with external processes. Perhaps Java has
something similar these days?
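
For readers working in Python rather than Scala, the standard subprocess
module plays a similar role to sys.process. The sketch below shows the
"watch for failures and restart" idea under stated assumptions:
run_with_restart is a hypothetical helper, and the demo command is a trivial
Python one-liner standing in for a real job launcher command.

```python
import subprocess
import sys


def run_with_restart(cmd, max_attempts=3):
    """Run cmd in a child process and retry while it exits non-zero.

    Returns the number of attempts used; raises if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        exit_code = subprocess.call(cmd)
        if exit_code == 0:
            return attempt
    raise RuntimeError("command failed %d times: %r" % (max_attempts, cmd))


# Stand-in for a real launcher command line; any argument list works here.
demo_cmd = [sys.executable, "-c", "print('job finished')"]
attempts_used = run_with_restart(demo_cmd)
```

Because the job runs in its own process, a crash or a System.exit inside it
only shows up as a non-zero exit code in the parent.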

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran 
wrote:

>
>  On 21 Apr 2015, at 17:34, Richard Marscher 
> wrote:
>
> - There are System.exit calls built into Spark as of now that could kill
> your running JVM. We have shadowed some of the most offensive bits within
> our own application to work around this. You'd likely want to do that or to
> do your own Spark fork. For example, if the SparkContext can't connect to
> your cluster master node when it is created, it will System.exit.
>
>
> people can block "errant" System.exit calls by running under a
> SecurityManager. Less than ideal (and there's a small performance hit) -but
> possible
>


Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-23 Thread Ted Yu
NativeS3FileSystem class is in hadoop-aws jar.
Looks like it was not on classpath.
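
A quick way to check whether a particular jar actually contains the class is
to look inside it, since a jar is a zip archive. This is a minimal sketch:
jar_contains is a hypothetical helper name, and the jar path you pass in
depends on your installation.

```python
import zipfile

# Archive entry that corresponds to the class named in the stack trace.
CLASS_ENTRY = "org/apache/hadoop/fs/s3native/NativeS3FileSystem.class"


def jar_contains(jar_path, entry=CLASS_ENTRY):
    """Return True if the jar (a zip archive) holds the given entry."""
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Calling jar_contains on each jar on the driver and executor classpath would
show which of them, if any, provides the s3n implementation.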

Cheers

On Thu, Apr 23, 2015 at 7:30 AM, Sujee Maniyam  wrote:

> Thanks all...
>
> btw, s3n load works without any issues with  spark-1.3.1-built-for-hadoop
> 2.4
>
> I tried this on 1.3.1-hadoop26
> >  sc.hadoopConfiguration.set("fs.s3n.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> > val f = sc.textFile("s3n://bucket/file")
> > f.count
>
> Now it can't find the implementation class.  Looks like some jar is missing?
>
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>
> On Wednesday, April 22, 2015, Shuai Zheng  wrote:
>
>> Below is my code to access s3n without problem (only for 1.3.1. there is
>> a bug in 1.3.0).
>>
>>
>>
>>   Configuration hadoopConf = ctx.hadoopConfiguration();
>>
>>   hadoopConf.set("fs.s3n.impl",
>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
>>
>>   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
>>
>>   hadoopConf.set("fs.s3n.awsSecretAccessKey",
>> awsSecretAccessKey);
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>>
>>
>> *From:* Sujee Maniyam [mailto:su...@sujee.net]
>> *Sent:* Wednesday, April 22, 2015 12:45 PM
>> *To:* Spark User List
>> *Subject:* spark 1.3.1 : unable to access s3n:// urls (no file system
>> for scheme s3n:)
>>
>>
>>
>> Hi all
>>
>> I am unable to access s3n://  urls using   sc.textFile().. getting 'no
>> file system for scheme s3n://'  error.
>>
>>
>>
>> a bug or some conf settings missing?
>>
>>
>>
>> See below for details:
>>
>>
>>
>> env variables :
>>
>> AWS_SECRET_ACCESS_KEY=set
>>
>> AWS_ACCESS_KEY_ID=set
>>
>>
>>
>> spark/RELEASE :
>>
>> Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
>>
>> Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive
>> -Phive-thriftserver -Pyarn -DzincPort=3034
>>
>>
>>
>>
>>
>> ./bin/spark-shell
>>
>> > val f = sc.textFile("s3n://bucket/file")
>>
>> > f.count
>>
>>
>>
>> error==>
>>
>> java.io.IOException: No FileSystem for scheme: s3n
>>
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>>
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>>
>> at
>> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
>>
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>>
>> at
>> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
>>
>> at
>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
>>
>> at
>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
>>
>> at
>> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>
>> at scala.Option.getOrElse(Option.scala:120)
>>
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>
>> at scala.Option.getOrElse(Option.scala:120)
>>
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>
>> at
>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
>>
>> at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
>>
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)
>>
>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:29)
>>
>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31)
>>
>> at $iwC$$iwC$$iwC$$iwC$$iwC.(:33)
>>
>> at $iwC$$iwC$$iwC$$iwC.(:35)
>>
>> at $iwC$$iwC$$iwC.(:37)
>>
>> at $iwC$$iwC.(:39)
>>
>> at $iwC.(:41)
>>
>> at (:43)
>>
>> at .(:47)
>>
>> at .()
>>
>> at .(:7)
>>
>> at .()
>>
>> at $print()
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.refl

Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-23 Thread Sujee Maniyam
Thanks all...

btw, s3n load works without any issues with  spark-1.3.1-built-for-hadoop
2.4

I tried this on 1.3.1-hadoop26
>  sc.hadoopConfiguration.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> val f = sc.textFile("s3n://bucket/file")
> f.count

Now it can't find the implementation class.  Looks like some jar is missing?

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

On Wednesday, April 22, 2015, Shuai Zheng  wrote:

> Below is my code to access s3n without problem (only for 1.3.1. there is a
> bug in 1.3.0).
>
>
>
>   Configuration hadoopConf = ctx.hadoopConfiguration();
>
>   hadoopConf.set("fs.s3n.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
>
>   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
>
>   hadoopConf.set("fs.s3n.awsSecretAccessKey",
> awsSecretAccessKey);
>
>
>
> Regards,
>
>
>
> Shuai
>
>
>
> *From:* Sujee Maniyam [mailto:su...@sujee.net]
> *Sent:* Wednesday, April 22, 2015 12:45 PM
> *To:* Spark User List
> *Subject:* spark 1.3.1 : unable to access s3n:// urls (no file system for
> scheme s3n:)
>
>
>
> Hi all
>
> I am unable to access s3n://  urls using   sc.textFile().. getting 'no
> file system for scheme s3n://'  error.
>
>
>
> a bug or some conf settings missing?
>
>
>
> See below for details:
>
>
>
> env variables :
>
> AWS_SECRET_ACCESS_KEY=set
>
> AWS_ACCESS_KEY_ID=set
>
>
>
> spark/RELEASE :
>
> Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
>
> Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive
> -Phive-thriftserver -Pyarn -DzincPort=3034
>
>
>
>
>
> ./bin/spark-shell
>
> > val f = sc.textFile("s3n://bucket/file")
>
> > f.count
>
>
>
> error==>
>
> java.io.IOException: No FileSystem for scheme: s3n
>
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>
> at
> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
>
> at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
>
> at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
>
> at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
>
> at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:29)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:33)
>
> at $iwC$$iwC$$iwC$$iwC.(:35)
>
> at $iwC$$iwC$$iwC.(:37)
>
> at $iwC$$iwC.(:39)
>
> at $iwC.(:41)
>
> at (:43)
>
> at .(:47)
>
> at .()
>
> at .(:7)
>
> at .()
>
> at $print()
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>
>

How to start Thrift JDBC server as part of standalone spark application?

2015-04-23 Thread Vladimir Grigor
Hello,

I would like to export RDDs/DataFrames via a JDBC SQL interface from a
standalone application, for the currently stable Spark v1.3.1.

I found one way of doing it, but it requires the use of the @DeveloperApi method
HiveThriftServer2.startWithContext(sqlContext)

Is there a better, production level approach to do that?

Full code snippet is below:
// you can run it via:
// ../spark/bin/spark-submit --master local[*] --class "SimpleApp"
// target/scala-2.10/simple-project_2.10-1.0.jar src/test/resources/1.json
// tableFromJson


import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SimpleApp {

  def main(args: Array[String]) {

    if (args.length != 2) {
      Console.err.println("Usage: app  ")
      System.exit(1)
    }
    val sourceFile = args(0)
    val tableName = args(1)

    val sparkConf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new HiveContext(sc)

    val df = sqlContext.jsonFile(sourceFile)
    df.registerTempTable(tableName)

    println("Registered temp table %s for data source %s".format(tableName, sourceFile))

    HiveThriftServer2.startWithContext(sqlContext)

  }
}





Best, Vladimir Grigor


Streaming Kmeans usage in java

2015-04-23 Thread Jeetendra Gangele
Hi everyone, do we have a sample example of how to use streaming k-means
clustering with Java? I have seen some example usage in Scala. Can anybody
point me to a Java example?

regards
jeetendra


Re: How to debug Spark on Yarn?

2015-04-23 Thread Ted Yu
For step 2, you can pipe application log to a file instead of copy-pasting. 
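
As a sketch of that suggestion: the aggregated logs can be redirected with
something like `yarn logs -applicationId <application id> > app.log`, and the
file can then be searched instead of scrolling the console. The helper below
is a hypothetical example of the search step; the file name app.log and the
function name find_errors are assumptions, and the keywords are the two
mentioned in the question.

```python
KEYWORDS = ("Exception", "Job aborted due to")


def find_errors(lines, keywords=KEYWORDS):
    """Yield (line_number, line) for every line containing a keyword."""
    for number, line in enumerate(lines, start=1):
        if any(keyword in line for keyword in keywords):
            yield number, line.rstrip("\n")


# Typical use, assuming the logs were saved to app.log:
#
#     with open("app.log") as log_file:
#         for number, line in find_errors(log_file):
#             print("%d: %s" % (number, line))
```

Because the function accepts any iterable of lines, it reads the file lazily
and does not need the whole log in memory.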

Cheers



> On Apr 22, 2015, at 10:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:
> 
> I submit a spark app to YARN and i get these messages
> 
> 
> 
> 15/04/22 22:45:04 INFO yarn.Client: Application report for 
> application_1429087638744_101363 (state: RUNNING)
> 
> 
> 15/04/22 22:45:04 INFO yarn.Client: Application report for 
> application_1429087638744_101363 (state: RUNNING).
> 
> ...
> 
> 
> 
> 1) I can go to Spark UI and see the status of the APP but cannot see the logs 
> as the job progresses. How can i see logs of executors as they progress ?
> 
> 2) In case the App fails/completes then Spark UI vanishes and i get a YARN 
> Job page that says job failed, there are no link to Spark UI again. Now i 
> take the job ID and run yarn application logs appid and my console fills up 
> (with huge scrolling) with logs of all executors. Then i copy and paste into 
> a text editor and search for keywords "Exception" , "Job aborted due to ". Is 
> this the right way to view logs ?
> 
> 
> -- 
> Deepak
> 


Is there a way to get the list of all jobs?

2015-04-23 Thread mkestemont
Hello,

I am currently trying to monitor the progression of jobs. I created a class
extending SparkListener, added a jobProgressListener to my sparkContext, and
overrided the methods OnTaskStart, OnTaskEnd, OnJobStart and OnJobEnd, which
leads to good results.

Then, I would also like to monitor the progression of one job in comparison
to the global progression of all jobs. I guess this is not directly
possible, so I would like to retrieve the list of all jobs (or at least, the
number of jobs), so that I can approximate the global progression by
dividing the progression of one job by the total number of jobs.

However, I do not find how to do this. I searched through the
JobProgressListener API, but I only found methods to get the list of active
jobs, or the list of already completed jobs. Is there a way to get the
number or the list of jobs in the current version of Spark ?
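
To make the arithmetic in the question concrete, here is a small sketch under one loud assumption: the application itself knows the total number of jobs in advance (which is exactly what the question asks how to obtain, so total_jobs is a hypothetical input here). The function names are illustrative only and are not part of the SparkListener API. Completed jobs count as 1 each, and the job currently running contributes its own fraction.

```python
def job_progress(completed_tasks: int, total_tasks: int) -> float:
    """Fraction of one job that is done, based on finished tasks."""
    if total_tasks <= 0:
        return 0.0
    return min(completed_tasks / total_tasks, 1.0)


def global_progress(completed_jobs: int, active_job_progress: float, total_jobs: int) -> float:
    """Approximate overall progress: finished jobs plus the running job's fraction."""
    if total_jobs <= 0:
        return 0.0
    return min((completed_jobs + active_job_progress) / total_jobs, 1.0)
```

For example, with 2 of 4 jobs finished and the third job at 5 of 20 tasks, global_progress(2, job_progress(5, 20), 4) gives 0.5625. This treats all jobs as equally heavy, which is only a rough approximation.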



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-get-the-list-of-all-jobs-tp22635.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StandardScaler failing with OOM errors in PySpark

2015-04-23 Thread Rok Roskar
ok yes, I think I have narrowed it down to being a problem with driver
memory settings. It looks like the application master/driver is not being
launched with the settings specified:

For the driver process on the main node I see "-XX:MaxPermSize=128m
-Xms512m -Xmx512m" as options used to start the JVM, even though I
specified

'spark.yarn.am.memory', '5g'
'spark.yarn.am.memoryOverhead', '2000'

The info shows that these options were read:

15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120
MB memory including 2000 MB overhead

Is there some reason why these options are being ignored and the driver is
instead started with just 512 MB of heap?

On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar  wrote:

> the feature dimension is 800k.
>
> yes, I believe the driver memory is likely the problem since it doesn't
> crash until the very last part of the tree aggregation.
>
> I'm running it via pyspark through YARN -- I have to run in client mode so
> I can't set spark.driver.memory -- I've tried setting the
> spark.yarn.am.memory and overhead parameters but it doesn't seem to have an
> effect.
>
> Thanks,
>
> Rok
>
> On Apr 23, 2015, at 7:47 AM, Xiangrui Meng  wrote:
>
> > What is the feature dimension? Did you set the driver memory? -Xiangrui
> >
> > On Tue, Apr 21, 2015 at 6:59 AM, rok  wrote:
> >> I'm trying to use the StandardScaler in pyspark on a relatively small
> (a few
> >> hundred Mb) dataset of sparse vectors with 800k features. The fit
> method of
> >> StandardScaler crashes with Java heap space or Direct buffer memory
> errors.
> >> There should be plenty of memory around -- 10 executors with 2 cores
> each
> >> and 8 Gb per core. I'm giving the executors 9g of memory and have also
> tried
> >> lots of overhead (3g), thinking it might be the array creation in the
> >> aggregators that's causing issues.
> >>
> >> The bizarre thing is that this isn't always reproducible -- sometimes it
> >> actually works without problems. Should I be setting up executors
> >> differently?
> >>
> >> Thanks,
> >>
> >> Rok
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
>
>


Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-23 Thread Christian S. Perone
All these warnings come from the ALS iterations, from flatMap and also from
aggregate. For instance, here is the origin of the stage where the flatMap
shows these warnings (with Spark 1.3.0; they are also shown in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

And from the aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)



On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng  wrote:

> This is the size of the serialized task closure. Is stage 246 part of
> ALS iterations, or something before or after it? -Xiangrui
>
> On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
>  wrote:
> > Hi Sean, thanks for the answer. I tried to call repartition() on the
> input
> > with many different sizes and it still continues to show that warning
> > message.
> >
> > On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen  wrote:
> >>
> >> I think maybe you need more partitions in your input, which might make
> >> for smaller tasks?
> >>
> >> On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
> >>  wrote:
> >> > I keep seeing these warnings when using trainImplicit:
> >> >
> >> > WARN TaskSetManager: Stage 246 contains a task of very large size (208
> >> > KB).
> >> > The maximum recommended task size is 100 KB.
> >> >
> >> > And then the task size starts to increase. Is this a known issue ?
> >> >
> >> > Thanks !
> >> >
> >> > --
> >> > Blog | Github | Twitter
> >> > "Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
> >> > big
> >> > joke on me."
> >
> >
> >
> >
> > --
> > Blog | Github | Twitter
> > "Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
> > joke on me."
>



-- 
Blog | Github | Twitter
"Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
joke on me."


Contributors, read me! Updated Contributing to Spark wiki

2015-04-23 Thread Sean Owen
Following several discussions about how to improve the contribution
process in Spark, I've overhauled the guide to contributing. Anyone
who is going to contribute needs to read it, as it has more formal
guidance about the process:

https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark

We may push back harder now on pull requests and JIRAs that don't
follow this guidance. It will help everyone spend less time getting
changes in, and less time on duplicated effort or on changes that
won't get in.

A summary of key points is found in CONTRIBUTING.md, a prompt
presented before opening pull requests
(https://github.com/apache/spark/blob/master/CONTRIBUTING.md):

- Is the change important and ready enough to ask the community to
spend time reviewing?
- Have you searched for existing, related JIRAs and pull requests?
- Is this a new feature that can stand alone as a package on
http://spark-packages.org ?
- Is the change being proposed clearly explained and motivated?




ML regression - spark context dies without error

2015-04-23 Thread jamborta
Hi all,

I have been testing Spark ML algorithms with a bigger dataset, and ran into
some problems with linear regression:

It seems the executors stop without any apparent reason:

15/04/22 20:15:05 INFO BlockManagerInfo: Added rdd_12492_80 in memory on
backend-node:48037 (size: 28.5 MB, free: 2.8 GB)
15/04/22 20:15:05 INFO BlockManagerInfo: Added rdd_12493_80 in memory on
backend-node:48037 (size: 37.6 MB, free: 2.7 GB)
15/04/22 20:15:08 INFO BlockManagerInfo: Added rdd_12489_81 in memory on
backend-node:48037 (size: 8.4 MB, free: 2.7 GB)
[E 150422 20:15:12 java_gateway:483] Error while sending or receiving.
Traceback (most recent call last):
  File
"/home/azureuser/spark-1.3.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 479, in send_command
raise Py4JError("Answer from Java side is empty")
Py4JError: Answer from Java side is empty

Then the SparkContext stops, too:

[E 150422 20:15:12 java_gateway:431] An error occurred while trying to
connect to the Java server

The problem is that it does not happen every time; it fails maybe
once in every five attempts.

Any suggestions on where I can get more detailed logs?

Thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ML-regression-spark-context-dies-without-error-tp22633.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Building Spark : Adding new DataType in Catalyst

2015-04-23 Thread zia_kayani
I've already tried UDT in Spark 1.2 and 1.3, but I encountered a Kryo
serialization exception on joining, as tracked here. I talked to Michael
Armbrust about the exception, and he said "I'll caution you that this is not
a stable public API." So I moved to adding a custom DataType into Spark. I got
the answer to my question from Iulian Dragoș: "One way is to use export
SPARK_PREPEND_CLASSES=true. This will instruct the launcher to prepend the
target directories for each project to the spark assembly". This solved my
problem. Thanks...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-Adding-new-DataType-in-Catalyst-tp22604p22632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is a higher-res or vector version of Spark logo available?

2015-04-23 Thread Enno Shioji
My employer (adform.com) would like to use the Spark logo in a recruitment
event (to indicate that we are using Spark in our company). I looked in the
Spark repo (https://github.com/apache/spark/tree/master/docs/img) but
couldn't find a vector format.

Is a higher-res or vector format version available anywhere?

Enno


Re: Re: HiveContext setConf seems not stable

2015-04-23 Thread guoqing0...@yahoo.com.hk
Hi all,
My understanding of this problem is that SQLConf gets overwritten by the Hive
configuration during initialization, when setConf(key: String, value: String)
is called for the first time (see the code snippets below), so it behaves
correctly on later calls. I'm not sure whether this is right; any comments are
welcome. Thanks.
@transient protected[hive] lazy val hiveconf: HiveConf = {
  setConf(sessionState.getConf.getAllProperties)
  sessionState.getConf
}

protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = synchronized {
  try {
    val cmd_trimmed: String = cmd.trim()
    val tokens: Array[String] = cmd_trimmed.split("\\s+")
    val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
    val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
    ...
}

protected[sql] def runSqlHive(sql: String): Seq[String] = {
  val maxResults = 10
  val results = runHive(sql, maxResults)
  // It is very confusing when you only get back some of the results...
  if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
  results
}

override def setConf(key: String, value: String): Unit = {
  super.setConf(key, value)
  runSqlHive(s"SET $key=$value")
}
 
From: madhu phatak
Date: 2015-04-23 02:17
To: Michael Armbrust
CC: Ophir Cohen; Hao Ren; user
Subject: Re: HiveContext setConf seems not stable
Hi,
Calling getConf doesn't solve the issue. Even many Hive-specific queries are 
broken. It seems like no Hive configurations are getting passed properly. 




Regards,
Madhukara Phatak
http://datamantra.io/

On Wed, Apr 22, 2015 at 2:19 AM, Michael Armbrust  
wrote:
As a workaround, can you call getConf first before any setConf?

On Tue, Apr 21, 2015 at 1:58 AM, Ophir Cohen  wrote:
I think I've encountered the same problem. I'm trying to turn on compression in 
Hive.
I have the following lines:
def initHiveContext(sc: SparkContext): HiveContext = {
  val hc: HiveContext = new HiveContext(sc)
  hc.setConf("hive.exec.compress.output", "true")
  hc.setConf("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

  logger.info(hc.getConf("hive.exec.compress.output"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.codec"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.type"))

  hc
}
And the log for calling it twice:
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: false
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: 
org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: true
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: 
org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK

BTW
It worked on 1.2.1...


On Thu, Apr 2, 2015 at 11:47 AM, Hao Ren  wrote:
Hi,

Jira created: https://issues.apache.org/jira/browse/SPARK-6675

Thank you.


On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust  wrote:
Can you open a JIRA please?

On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren  wrote:
Hi,

I find HiveContext.setConf does not work correctly. Here are some code snippets 
showing the problem:

snippet 1:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {

  val conf = new SparkConf()
.setAppName("context-test")
.setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)

  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter(_._1.contains("shuffle.partitions")) foreach println
}


Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)

snippet 2:

...
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.getAllConfs filter(_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter(_._1.contains("shuffle.partitions")) foreach println
...


Results:
(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)

You can see that I just swapped the two setConf calls, and that leads to two 
different Hive configurations.
It seems that HiveContext can not set a new value on 
"hive.metastore.warehouse.dir" key in one or the first "s

Re: Error in creating spark RDD

2015-04-23 Thread madhvi

On Thursday 23 April 2015 12:22 PM, Akhil Das wrote:
Here's a complete scala example 
https://github.com/bbux-proteus/spark-accumulo-examples/blob/1dace96a115f29c44325903195c8135edf828c86/src/main/scala/org/bbux/spark/AccumuloMetadataCount.scala


Thanks
Best Regards

On Thu, Apr 23, 2015 at 12:19 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:


Change your import from mapred to mapreduce. like :

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;

Thanks
Best Regards

On Wed, Apr 22, 2015 at 2:42 PM, madhvi <madhvi.gu...@orkash.com> wrote:

Hi,

I am creating a Spark RDD from Accumulo like this:

JavaPairRDD<Key, Value> accumuloRDD =
    sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class,
        Key.class, Value.class);

But I am getting the following error and the code does not
compile:

Bound mismatch: The generic method
newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>)
of type JavaSparkContext is not applicable for the arguments
(Configuration, Class<AccumuloInputFormat>, Class<Key>,
Class<Value>). The inferred type AccumuloInputFormat is not a
valid substitute for the bounded parameter <F extends InputFormat<K,V>>

I am using the following import statements:

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

I am not getting what is the problem in this.

Thanks
Madhvi







Hi,

Thanks. I got that solved :)

madhvi


Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
HI TD,

Some observations:

1. If I submit the application using the spark-submit tool with *client as
deploy mode*, it works fine with a single master and worker (driver, master
and worker are running on the same machine).
2. If I submit the application using the spark-submit tool with client as
deploy mode, it *crashes after some time with StackOverflowError* when using a
*single master and 2 workers* (driver, master and 1 worker are running on the
same machine; the other worker is on a different machine):
15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
java.lang.StackOverflowError
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
    at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)


3. If I submit the application

Re: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

2015-04-23 Thread Krishna Sankar
Do you have commons-csv-1.1-bin.jar in your path somewhere ? I had to
download and add this.
Cheers


On Wed, Apr 22, 2015 at 11:01 AM, Mohammed Omer 
wrote:

> Afternoon all,
>
> I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via:
>
> `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package`
>
> The error is encountered when running spark shell via:
>
> `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3`
>
> The full trace of the commands can be found at
> https://gist.github.com/momer/9d1ca583f9978ec9739d
>
> Not sure if I've done something wrong, or if the documentation is
> outdated, or...?
>
> Would appreciate any input or push in the right direction!
>
> Thank you,
>
> Mo
>


Re: problem with spark thrift server

2015-04-23 Thread Arush Kharbanda
Hi

What do you mean by disabling the driver? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk <
guoqing0...@yahoo.com.hk> wrote:

> Hi ,
> I have a question about the Spark Thrift server. I deployed Spark on YARN
> and found that if the Spark driver is disabled, the Spark application
> crashes on YARN. I'd appreciate any suggestions and ideas.
>
> Thank you!
>



-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Spark SQL performance issue.

2015-04-23 Thread Arush Kharbanda
Hi

Can you share your Web UI, showing the task-level breakdown? I can see many
things that can be improved.

1. JavaRDD<Person> rdds = ...; rdds.cache(); -> this caching is not needed, as
you are not reading the RDD for any action.

2. Instead of collecting as a list, it would be better to save as a text file,
as that would avoid moving results to the driver.

Thanks
Arush

On Thu, Apr 23, 2015 at 2:47 PM, Nikolay Tikhonov  wrote:

> > why are you cache both rdd and table?
> I try to cache all the data to avoid the bad performance for the first
> query. Is it right?
>
> > Which stage of job is slow?
> The query is run many times on one sqlContext and each query execution
> takes 1 second.
>
> 2015-04-23 11:33 GMT+03:00 ayan guha :
>
>> Quick questions: why are you cache both rdd and table?
>> Which stage of job is slow?
>> On 23 Apr 2015 17:12, "Nikolay Tikhonov" 
>> wrote:
>>
>>> Hi,
>>> I have Spark SQL performance issue. My code contains a simple JavaBean:
>>>
>>> public class Person implements Externalizable {
>>> private int id;
>>> private String name;
>>> private double salary;
>>> 
>>> }
>>>
>>>
>>> Apply a schema to an RDD and register table.
>>>
>>> JavaRDD<Person> rdds = ...
>>> rdds.cache();
>>>
>>> DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
>>> dataFrame.registerTempTable("person");
>>>
>>> sqlContext.cacheTable("person");
>>>
>>>
>>> Run sql query.
>>>
>>> sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >=
>>> YYY
>>> AND salary <= XXX").collectAsList()
>>>
>>>
>>> I launch standalone cluster which contains 4 workers. Each node runs on
>>> machine with 8 CPU and 15 Gb memory. When I run the query on the
>>> environment
>>> over RDD which contains 1 million persons it takes 1 minute. Somebody can
>>> tell me how to tuning the performance?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


RE: Custom paritioning of DSTream

2015-04-23 Thread Evo Eftimov
You can use "transform", which yields the RDDs from the DStream; on each of the
RDDs you can then apply partitionBy. transform also returns another DStream,
while foreach doesn't.

 

Btw, what do you mean by "foreach killing the performance by not distributing
the workload"? Every function (provided it is not an action) applied to an
RDD within foreach is distributed across the cluster, since it gets applied
to an RDD.

 

From: davidkl [via Apache Spark User List]
[mailto:ml-node+s1001560n22630...@n3.nabble.com] 
Sent: Thursday, April 23, 2015 10:13 AM
To: Evo Eftimov
Subject: Re: Custom paritioning of DSTream

 

Hello Evo, Ranjitiyer, 

I am also looking for the same thing. Using foreach is not useful for me as
processing the RDD as a whole won't be distributed across workers and that
would kill performance in my application :-/ 

Let me know if you find a solution for this. 

Regards 






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark SQL performance issue.

2015-04-23 Thread Nikolay Tikhonov
> why are you cache both rdd and table?
I am trying to cache all the data to avoid bad performance on the first
query. Is that right?

> Which stage of job is slow?
The query is run many times on one sqlContext and each query execution
takes 1 second.

2015-04-23 11:33 GMT+03:00 ayan guha :

> Quick questions: why are you cache both rdd and table?
> Which stage of job is slow?
> On 23 Apr 2015 17:12, "Nikolay Tikhonov" 
> wrote:
>
>> Hi,
>> I have Spark SQL performance issue. My code contains a simple JavaBean:
>>
>> public class Person implements Externalizable {
>> private int id;
>> private String name;
>> private double salary;
>> 
>> }
>>
>>
>> Apply a schema to an RDD and register table.
>>
>> JavaRDD<Person> rdds = ...
>> rdds.cache();
>>
>> DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
>> dataFrame.registerTempTable("person");
>>
>> sqlContext.cacheTable("person");
>>
>>
>> Run sql query.
>>
>> sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >=
>> YYY
>> AND salary <= XXX").collectAsList()
>>
>>
>> I launch standalone cluster which contains 4 workers. Each node runs on
>> machine with 8 CPU and 15 Gb memory. When I run the query on the
>> environment
>> over RDD which contains 1 million persons it takes 1 minute. Somebody can
>> tell me how to tuning the performance?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Custom paritioning of DSTream

2015-04-23 Thread davidkl
Hello Evo, Ranjitiyer,

I am also looking for the same thing. Using foreach is not useful for me as
processing the RDD as a whole won't be distributed across workers and that
would kill performance in my application :-/

Let me know if you find a solution for this. 

Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22630.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark SQL performance issue.

2015-04-23 Thread ayan guha
Quick questions: why are you caching both the RDD and the table?
Which stage of the job is slow?
On 23 Apr 2015 17:12, "Nikolay Tikhonov"  wrote:

> Hi,
> I have Spark SQL performance issue. My code contains a simple JavaBean:
>
> public class Person implements Externalizable {
> private int id;
> private String name;
> private double salary;
> 
> }
>
>
> Apply a schema to an RDD and register table.
>
> JavaRDD<Person> rdds = ...
> rdds.cache();
>
> DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
> dataFrame.registerTempTable("person");
>
> sqlContext.cacheTable("person");
>
>
> Run sql query.
>
> sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >= YYY
> AND salary <= XXX").collectAsList()
>
>
> I launch standalone cluster which contains 4 workers. Each node runs on
> machine with 8 CPU and 15 Gb memory. When I run the query on the
> environment
> over RDD which contains 1 million persons it takes 1 minute. Somebody can
> tell me how to tuning the performance?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Convert DStream to DataFrame

2015-04-23 Thread Sergio Jiménez Barrio
Thank you very much, Tathagata!

On Wednesday, April 22, 2015, Tathagata Das
wrote:

> Aaah, that. That is probably a limitation of the SQLContext (cc'ing Yin
> for more information).
>
>
> On Wed, Apr 22, 2015 at 7:07 AM, Sergio Jiménez Barrio <
> drarse.a...@gmail.com
> > wrote:
>
>> Sorry, this is the error:
>>
>> [error] /home/sergio/Escritorio/hello/streaming.scala:77: Implementation
>> restriction: case classes cannot have more than 22 parameters.
>>
>>
>>
>> 2015-04-22 16:06 GMT+02:00 Sergio Jiménez Barrio:
>>
>>> I tried the solution from the guide, but I exceeded the size limit of the
>>> case class Row:
>>>
>>>
>>> 2015-04-22 15:22 GMT+02:00 Tathagata Das:
>>>
>>>> Did you check out the latest streaming programming guide?
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>>>
>>>> You also need to be aware that, to convert JSON RDDs to a DataFrame,
>>>> sqlContext has to make a pass over the data to learn the schema. This will
>>>> fail if a batch has no data. You have to safeguard against that.
>>>>
>>>> On Wed, Apr 22, 2015 at 6:19 AM, ayan guha wrote:
>>>>
>>>>> What about sqlcontext.createDataframe(rdd)?
>>>>> On 22 Apr 2015 23:04, "Sergio Jiménez Barrio" wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using Kafka with Apache Stream to send JSON to Apache Spark:
>>>>>>
>>>>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>>>
>>>>>> Now I want to parse the resulting DStream into a DataFrame, but I don't know
>>>>>> whether Spark 1.3 has an easy way to do this. Any suggestions? I can get the
>>>>>> message with:
>>>>>>
>>>>>> val lines = messages.map(_._2)
>>>>>>
>>>>>> Thank you all. Sergio J.
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>

-- 
Atte. Sergio Jiménez
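
The caution quoted in this thread (schema inference over JSON fails when a batch has no data) can be illustrated with a plain-Python sketch. This is not the Spark API: infer_fields is a hypothetical stand-in for the schema-learning pass, and it assumes every record is a JSON object. The point is only the guard: check for an empty batch first and skip it instead of attempting inference.

```python
import json
from typing import Iterable, List, Optional


def infer_fields(json_lines: Iterable[str]) -> Optional[List[str]]:
    """Return the sorted union of top-level keys, or None for an empty batch."""
    # Ignore blank lines so a batch of only whitespace counts as empty.
    records = [json.loads(line) for line in json_lines if line.strip()]
    if not records:
        # Nothing to learn a schema from: signal the caller to skip this batch.
        return None
    fields = set()
    for record in records:
        fields.update(record.keys())
    return sorted(fields)
```

In a streaming job the same check would sit at the top of the per-batch function, before any conversion to a DataFrame is attempted.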


Re: Hive table creation - possible bug in Spark 1.3?

2015-04-23 Thread madhu phatak
Hi Michael,
Here is the JIRA issue and the PR for the same. Please
have a look.




Regards,
Madhukara Phatak
http://datamantra.io/

On Thu, Apr 23, 2015 at 1:22 PM, madhu phatak  wrote:

> Hi,
>  Hive table creation needs an extra step as of 1.3. You can follow this
> template:
>
>  df.registerTempTable(tableName)
>
>  hc.sql(s"create table $tableName as select * from $tableName")
>
> This will save the table in Hive under the given tableName.
>
>
>
>
>
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust 
> wrote:
>
>> Sorry for the confusion.  We should be more clear about the semantics in
>> the documentation. (PRs welcome :) )
>>
>> .saveAsTable does not create a Hive table, but instead creates a Spark
>> Data Source table.  Here the metadata is persisted into Hive, but Hive
>> cannot read the tables (as this API supports MLlib vectors, schema
>> discovery, and other things that Hive does not).  If you want to create a
>> Hive table, use HiveQL and run a CREATE TABLE AS SELECT ...
>>
>> On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen  wrote:
>>
>>> I wrote a few emails here regarding this issue.
>>> After further investigation, I think there is a bug in Spark 1.3 in
>>> saving Hive tables.
>>>
>>> (hc is HiveContext)
>>>
>>> 1. Verify the needed configuration exists:
>>> scala> hc.sql("set hive.exec.compress.output").collect
>>> res4: Array[org.apache.spark.sql.Row] =
>>> Array([hive.exec.compress.output=true])
>>> scala> hc.sql("set
>>> mapreduce.output.fileoutputformat.compress.codec").collect
>>> res5: Array[org.apache.spark.sql.Row] =
>>> Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
>>> scala> hc.sql("set
>>> mapreduce.output.fileoutputformat.compress.type").collect
>>> res6: Array[org.apache.spark.sql.Row] =
>>> Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
>>> 2. Load a DataFrame and save it as a table (path points to an existing file):
>>> val saDF = hc.parquetFile(path)
>>> saDF.count
>>>
>>> (count yields 229764, i.e. the RDD exists)
>>> saDF.saveAsTable("test_hive_ms")
>>>
>>> Now for a few interesting outputs:
>>> 1. When querying from the Hive CLI, the table exists but has the wrong
>>> output format:
>>> Failed with exception java.io.IOException:java.io.IOException:
>>> hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
>>> not a SequenceFile
>>> 2. Looking at the output files, I found that they are '.parquet' and not
>>> '.snappy'.
>>> 3. The saveAsTable output shows that it actually stores the table with
>>> both the wrong output format and no compression:
>>> 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
>>> Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
>>> createTime:1429687014, lastAccessTime:0, retention:0,
>>> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
>>> comment:from deserializer)], location:null,
>>> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
>>> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
>>> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
>>> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
>>> parameters:{serialization.format=1, path=hdfs://
>>> 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
>>> ),
>>> bucketCols:[], sortCols:[], parameters:{},
>>> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
>>> skewedColValueLocationMaps:{})), partitionKeys:[],
>>> parameters:{spark.sql.sources.schema.part.0={"type":"struct","fields":[{"name":"ADJDATE","type":"long","nullable":true,"metadata":{}},{"name":"sid","type":"integer","nullable":true,"metadata":{}},{"name":"ADJTYPE","type":"integer","nullable":true,"metadata":{}},{"name":"ENDADJDATE","type":"long","nullable":true,"metadata":{}},{"name":"ADJFACTOR","type":"double","nullable":true,"metadata":{}},{"name":"CUMADJFACTOR","type":"double","nullable":true,"metadata":{}}]},
>>> EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
>>> spark.sql.sources.provider=org.apache.spark.sql.parquet},
>>> viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
>>>
>>> So, the question is: am I missing some configuration here, or should I
>>> open a bug?
>>>
>>> Thanks,
>>> Ophir
>>>
>>>
>>
>


A Spark Group by is running forever

2015-04-23 Thread ๏̯͡๏
I have a groupBy query after a map-side join and a leftOuterJoin, and this
query has been running for more than 2 hours.


Tasks

Index: 0
ID: 36
Attempt: 0
Status: RUNNING
Locality Level: PROCESS_LOCAL
Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com
Launch Time: 2015/04/22 23:27:00
Duration: 1.4 h
GC Time: 29 s
Shuffle Read Size / Records: 61.8 MB / 63144909
Write Time: (empty)
Shuffle Write Size / Records: 0.0 B / 0
Errors: (empty)



The input looks to be only 60 MB.
*Command*
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
 *--num-executors 36 --driver-memory 12g --driver-java-options
"-XX:MaxPermSize=8G" --executor-memory 12g* *--executor-cores 6* --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G

Queries

1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

2. Broadcast map join

val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
  .collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({
    // business logic
  })

3. Left outer join

val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)

val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong, sps) }

val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings
  .leftOuterJoin(spsLvlMetric).map {
    // business logic
  }

Any thoughts?

4. Group by:

val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
  case (viDetail, vi, itemId) =>
    (viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2),
      viDetail.get(8).asInstanceOf[Int])
}


Step 4 is very slow.

-- 



Deepak


Re: Hive table creation - possible bug in Spark 1.3?

2015-04-23 Thread madhu phatak
Hi,
 Hive table creation needs an extra step as of 1.3. You can follow this
template:

 df.registerTempTable(tableName)

 hc.sql(s"create table $tableName as select * from $tableName")

This will save the table in Hive under the given tableName.
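
A minimal sketch of the template above, written in plain Java so that it runs without a cluster. It only builds the CREATE TABLE AS SELECT statement. The class name HiveCtas, the method createTableAsSelect, and the identifier check are all hypothetical and not part of Spark or of this post. The sketch takes separate names for the Hive table and the temp table, which is a readability choice made here; the template above reuses one name for both.

```java
import java.util.regex.Pattern;

final class HiveCtas {
    // Hypothetical rule: accept plain unquoted identifiers only, so that
    // string concatenation cannot smuggle extra SQL into the statement.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private static String requireIdentifier(String name) {
        if (name == null || !IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("not a simple table name: " + name);
        }
        return name;
    }

    // Builds the CTAS statement that copies a registered temp table into Hive.
    static String createTableAsSelect(String hiveTable, String tempTable) {
        return "CREATE TABLE " + requireIdentifier(hiveTable)
                + " AS SELECT * FROM " + requireIdentifier(tempTable);
    }
}
```

The returned string would be handed to hc.sql(...) after registering the temp table, as in the template; for example, createTableAsSelect("sales_hive", "sales_tmp") with made-up table names.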









Regards,
Madhukara Phatak
http://datamantra.io/

On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust 
wrote:

> Sorry for the confusion.  We should be more clear about the semantics in
> the documentation. (PRs welcome :) )
>
> .saveAsTable does not create a Hive table, but instead creates a Spark
> Data Source table.  Here the metadata is persisted into Hive, but Hive
> cannot read the tables (as this API supports MLlib vectors, schema
> discovery, and other things that Hive does not).  If you want to create a
> Hive table, use HiveQL and run a CREATE TABLE AS SELECT ...
>
> On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen  wrote:
>
>> I wrote a few emails here regarding this issue.
>> After further investigation, I think there is a bug in Spark 1.3 in saving
>> Hive tables.
>>
>> (hc is HiveContext)
>>
>> 1. Verify the needed configuration exists:
>> scala> hc.sql("set hive.exec.compress.output").collect
>> res4: Array[org.apache.spark.sql.Row] =
>> Array([hive.exec.compress.output=true])
>> scala> hc.sql("set
>> mapreduce.output.fileoutputformat.compress.codec").collect
>> res5: Array[org.apache.spark.sql.Row] =
>> Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
>> scala> hc.sql("set
>> mapreduce.output.fileoutputformat.compress.type").collect
>> res6: Array[org.apache.spark.sql.Row] =
>> Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
>> 2. Load a DataFrame and save it as a table (path points to an existing file):
>> val saDF = hc.parquetFile(path)
>> saDF.count
>>
>> (count yields 229764, i.e. the RDD exists)
>> saDF.saveAsTable("test_hive_ms")
>>
>> Now for a few interesting outputs:
>> 1. When querying from the Hive CLI, the table exists but has the wrong
>> output format:
>> Failed with exception java.io.IOException:java.io.IOException:
>> hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
>> not a SequenceFile
>> 2. Looking at the output files, I found that they are '.parquet' and not
>> '.snappy'.
>> 3. The saveAsTable output shows that it actually stores the table with
>> both the wrong output format and no compression:
>> 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
>> Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
>> createTime:1429687014, lastAccessTime:0, retention:0,
>> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
>> comment:from deserializer)], location:null,
>> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
>> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
>> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
>> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
>> parameters:{serialization.format=1, path=hdfs://
>> 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
>> ),
>> bucketCols:[], sortCols:[], parameters:{},
>> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
>> skewedColValueLocationMaps:{})), partitionKeys:[],
>> parameters:{spark.sql.sources.schema.part.0={"type":"struct","fields":[{"name":"ADJDATE","type":"long","nullable":true,"metadata":{}},{"name":"sid","type":"integer","nullable":true,"metadata":{}},{"name":"ADJTYPE","type":"integer","nullable":true,"metadata":{}},{"name":"ENDADJDATE","type":"long","nullable":true,"metadata":{}},{"name":"ADJFACTOR","type":"double","nullable":true,"metadata":{}},{"name":"CUMADJFACTOR","type":"double","nullable":true,"metadata":{}}]},
>> EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
>> spark.sql.sources.provider=org.apache.spark.sql.parquet},
>> viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
>>
>> So, the question is: am I missing some configuration here, or should I
>> open a bug?
>>
>> Thanks,
>> Ophir
>>
>>
>


Re: Pipeline in pyspark

2015-04-23 Thread ayan guha
I do not think you can share data across Spark contexts. So as long as you
can pass the same context around, you should be good.
On 23 Apr 2015 17:12, "Suraj Shetiya"  wrote:

> Hi,
>
> I have come across ways of building pipelines of input, transform, and
> output stages with Java (Google Dataflow, Spark, etc.). I also understand
> that Spark itself provides a way to create a pipeline within MLlib for ML
> transforms (primarily fit). Both of the above are available in the
> Java/Scala environment, and the latter is supported in Python as well.
>
> However, if my understanding is correct, pipelines within ML transforms do
> not create a complete dataflow transform for non-ML scenarios (e.g. I/O
> transforms, DataFrame/graph transforms). Correct me if I am wrong. I would
> like to know the best way to create a Spark dataflow pipeline in a generic
> way. I have a use case where my input files are in different formats, and I
> would like to convert them to RDDs, build the DataFrame transforms on top,
> and finally stream or store the results. I hope to avoid disk I/O between
> pipeline tasks.
>
> I also came across luigi (http://luigi.readthedocs.org/en/latest/) for
> Python, but I found that it stores the contents on disk and reloads them
> for the next phase of the pipeline.
>
> I would appreciate it if you could share your thoughts.
>
>
> --
> Regards,
> Suraj
>


Re: Problem with using Spark ML

2015-04-23 Thread Staffan
I got the tip to try reducing the step size, and that finally gave more
decent results. I had hoped the default params would give at least OK
results, so I thought the problem must be somewhere else in the code.
Problem solved!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591p22628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pipeline in pyspark

2015-04-23 Thread Suraj Shetiya
Hi,

I have come across ways of building pipelines of input, transform, and
output stages with Java (Google Dataflow, Spark, etc.). I also understand
that Spark itself provides a way to create a pipeline within MLlib for ML
transforms (primarily fit). Both of the above are available in the
Java/Scala environment, and the latter is supported in Python as well.

However, if my understanding is correct, pipelines within ML transforms do
not create a complete dataflow transform for non-ML scenarios (e.g. I/O
transforms, DataFrame/graph transforms). Correct me if I am wrong. I would
like to know the best way to create a Spark dataflow pipeline in a generic
way. I have a use case where my input files are in different formats, and I
would like to convert them to RDDs, build the DataFrame transforms on top,
and finally stream or store the results. I hope to avoid disk I/O between
pipeline tasks.

I also came across luigi (http://luigi.readthedocs.org/en/latest/) for
Python, but I found that it stores the contents on disk and reloads them
for the next phase of the pipeline.

I would appreciate it if you could share your thoughts.


-- 
Regards,
Suraj


Spark SQL performance issue.

2015-04-23 Thread Nikolay Tikhonov
Hi,
I have a Spark SQL performance issue. My code contains a simple JavaBean:

public class Person implements Externalizable {
    private int id;
    private String name;
    private double salary;

}
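
Spark's bean-based createDataFrame reads the schema from the bean's properties via reflection (BeanInfo), and Externalizable needs a public no-arg constructor plus writeExternal/readExternal, none of which are shown in the post. A sketch of what the complete bean might look like under those assumptions. The accessor bodies, the wrapper class PersonBeanSketch, and the helpers propertyNames and roundTrip are illustrative only and are not taken from the original code:

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Set;
import java.util.TreeSet;

final class PersonBeanSketch {

    // In a real job this would be a public top-level class, as in the post.
    public static class Person implements Externalizable {
        private int id;
        private String name;
        private double salary;

        // Externalizable requires a public no-arg constructor.
        public Person() {}

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public double getSalary() { return salary; }
        public void setSalary(double salary) { this.salary = salary; }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(id);
            out.writeObject(name); // writeObject tolerates a null name
            out.writeDouble(salary);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            // Fields must be read in the same order they were written.
            id = in.readInt();
            name = (String) in.readObject();
            salary = in.readDouble();
        }
    }

    // Names of the bean properties that getter-based introspection can see.
    static Set<String> propertyNames() {
        try {
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor pd :
                    Introspector.getBeanInfo(Person.class, Object.class).getPropertyDescriptors()) {
                names.add(pd.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serializes and deserializes a Person to check that the two
    // Externalizable methods agree with each other.
    static Person roundTrip(Person p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(p);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Person) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without the getters, introspection would find no properties on the bean, so checking propertyNames() against the expected columns (id, name, salary) is a cheap sanity test before involving Spark.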


Apply a schema to an RDD and register it as a table.

JavaRDD<Person> rdds = ...
rdds.cache();

DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
dataFrame.registerTempTable("person");

sqlContext.cacheTable("person");


Run the SQL query.

sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >= YYY
AND salary <= XXX").collectAsList()


I launched a standalone cluster with 4 workers. Each node runs on a
machine with 8 CPUs and 15 GB of memory. When I run the query in this
environment over an RDD that contains 1 million persons, it takes 1 minute.
Can somebody tell me how to tune the performance?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




spark yarn-cluster job failing in batch processing

2015-04-23 Thread sachin Singh
Hi All,
I am trying to run batch processing in yarn-cluster mode. I have many SQL
insert queries; based on the arguments provided, the job fetches the
queries, creates the context and a SchemaRDD, and inserts into Hive tables.

Please note: it works in standalone mode, and in cluster mode it works if I
configure only one query. I have also configured
yarn.nodemanager.delete.debug-delay-sec = 600

I am using the command below:

spark-submit --jars
./analiticlibs/utils-common-1.0.0.jar,./analiticlibs/mysql-connector-java-5.1.17.jar,./analiticlibs/log4j-1.2.17.jar
--files datasource.properties,log4j.properties,hive-site.xml --deploy-mode
cluster --master yarn --num-executors 1 --driver-memory 2g
--driver-java-options "-XX:MaxPermSize=1G" --executor-memory 1g
--executor-cores 1 --class com.java.analitics.jobs.StandaloneAggregationJob
sparkanalitics-1.0.0.jar daily_agg 2015-04-21


Exception from the container log:

Exception in thread "Driver" java.lang.ArrayIndexOutOfBoundsException: 2
at
com.java.analitics.jobs.StandaloneAggregationJob.main(StandaloneAggregationJob.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
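
One possible reading of this trace, offered as an assumption since the source of StandaloneAggregationJob is not shown: line 62 reads index 2 of an array, and the spark-submit command above passes only two application arguments (daily_agg and 2015-04-21). If that array is the main method's args, checking the argument count up front turns the failure into a clear message. A sketch, where the class JobArgs and the field names jobName and runDate are hypothetical:

```java
final class JobArgs {
    final String jobName;
    final String runDate;

    private JobArgs(String jobName, String runDate) {
        this.jobName = jobName;
        this.runDate = runDate;
    }

    // Validates the argument count before any index is read, so a short
    // argument list fails with a readable message instead of
    // ArrayIndexOutOfBoundsException.
    static JobArgs parse(String[] args) {
        if (args == null || args.length < 2) {
            int got = (args == null) ? 0 : args.length;
            throw new IllegalArgumentException(
                    "expected arguments <jobName> <runDate>, got " + got);
        }
        return new JobArgs(args[0], args[1]);
    }
}
```

If the job reads a third argument in standalone mode (for example a master URL or a properties path), that value would need to be passed explicitly in cluster mode too; whether this applies here cannot be told from the post.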

Exception in our exception log file:

 diagnostics: Application application_1429800386537_0001 failed 2 times due
to AM Container for appattempt_1429800386537_0001_02 exited with 
exitCode: 15 due to: Exception from container-launch.
Container id: container_1429800386537_0001_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 15
.Failing this attempt.. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: root.hdfs
 start time: 1429800525569
 final status: FAILED
 tracking URL:
http://tejas.alcatel.com:8088/cluster/app/application_1429800386537_0001
 user: hdfs
2015-04-23 20:19:27 DEBUG Client - stopping client from cache:
org.apache.hadoop.ipc.Client@12f5f40b
2015-04-23 20:19:27 DEBUG Utils - Shutdown hook called

I need urgent support.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-yarn-cluster-job-failing-in-batch-processing-tp22626.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark RDD Lifecycle: whether RDD will be reclaimed out of scope

2015-04-23 Thread Prannoy
Hi,

Yes, Spark automatically evicts old cached RDD partitions, least recently
used first, when it needs room for new ones. Calling unpersist() forces it
to remove them right away.
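
The Spark programming guide describes this eviction as least-recently-used (LRU): old cached partitions are dropped when space is needed, and unpersist() removes an RDD's blocks immediately. The toy model below illustrates only that policy, using java.util.LinkedHashMap. It is not Spark's implementation, and the class LruCacheSketch and the method evictNow are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int capacity;

    LruCacheSketch(int capacity) {
        // accessOrder = true keeps entries ordered from least to most
        // recently used, which is what LRU eviction needs.
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    // Called by LinkedHashMap after each insertion: drop the least
    // recently used entry once the cache is over capacity.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }

    // Explicit removal, loosely analogous to calling unpersist().
    V evictNow(K key) {
        return remove(key);
    }
}
```

With a capacity of 2, putting "a" and "b", reading "a", and then putting "c" evicts "b", because "a" was used more recently.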

On Thu, Apr 23, 2015 at 9:28 AM, Jeffery [via Apache Spark User List] <
ml-node+s1001560n22618...@n3.nabble.com> wrote:

> Hi, Dear Spark Users/Devs:
>
> In a method, I create a new RDD and cache it. Will Spark unpersist the RDD
> automatically after it goes out of scope?
>
> I think so, but want to confirm with you, the experts :)
>
> Thanks,
> Jeffery Yuan
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-Lifecycle-whether-RDD-will-be-reclaimed-out-of-scope-tp22618p22625.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

problem with spark thrift server

2015-04-23 Thread guoqing0...@yahoo.com.hk
Hi,
I have a question about the Spark Thrift server. I deployed Spark on YARN and
found that if the Spark driver becomes unavailable, the Spark application
crashes on YARN. I would appreciate any suggestions and ideas.

Thank you!