Filesystem closed Exception

2015-03-20 Thread Sea
Hi, all:




When I exit the console of spark-sql, the following exception throwed..


Exception in thread "Thread-3" java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1677)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at 
org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:196)
at 
org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at 
org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1388)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)‍

Filesystem closed Exception

2015-03-20 Thread Sea
Hi, all:




When I exit the console of spark-sql, the following exception throwed..


My spark version is 1.3.0, hadoop version is 2.2.0


Exception in thread "Thread-3" java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1677)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at 
org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:196)
at 
org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at 
org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1388)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)‍

Re: About the env of Spark1.2

2015-03-20 Thread Ted Yu
bq. Caused by: java.net.UnknownHostException: dhcp-10-35-14-100: Name or
service not known

Can you check your DNS ?

Cheers

On Fri, Mar 20, 2015 at 8:54 PM, tangzilu  wrote:

> Hi All:
> I recently started to deploy Spark1.2 in my VisualBox Linux.
> But when I run the command "./spark-shell" in the path of
> "/opt/spark-1.2.1/bin", I got the result like this:
>
> [root@dhcp-10-35-14-100 bin]# ./spark-shell
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/03/20 13:56:06 INFO SecurityManager: Changing view acls to: root
> 15/03/20 13:56:06 INFO SecurityManager: Changing modify acls to: root
> 15/03/20 13:56:06 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(root); users
> with modify permissions: Set(root)
> 15/03/20 13:56:06 INFO HttpServer: Starting HTTP Server
> 15/03/20 13:56:06 INFO Utils: Successfully started service 'HTTP class
> server' on port 47691.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
>   /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
> Type in expressions to have them evaluated.
> Type :help for more information.
> java.net.UnknownHostException: dhcp-10-35-14-100: dhcp-10-35-14-100: Name
> or service not known
> at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
> at org.apache.spark.util.Utils$.findLocalIpAddress(Utils.scala:710)
> at
> org.apache.spark.util.Utils$.localIpAddress$lzycompute(Utils.scala:702)
> at org.apache.spark.util.Utils$.localIpAddress(Utils.scala:702)
> at org.apache.spark.HttpServer.uri(HttpServer.scala:158)
> at
> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:982)
> at $iwC$$iwC.(:9)
> at $iwC.(:18)
> at (:20)
> at .(:24)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
> at
> org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
> at
> org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
> at
> org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
> at
> org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
> at
> org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
> at
> org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
> at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
> at
> org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
> at
> org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.jav

About the env of Spark1.2

2015-03-20 Thread tangzilu
Hi All:
I recently started to deploy Spark1.2 in my VisualBox Linux.
But when I run the command "./spark-shell" in the path of 
"/opt/spark-1.2.1/bin", I got the result like this:

[root@dhcp-10-35-14-100 bin]# ./spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/20 13:56:06 INFO SecurityManager: Changing view acls to: root
15/03/20 13:56:06 INFO SecurityManager: Changing modify acls to: root
15/03/20 13:56:06 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/03/20 13:56:06 INFO HttpServer: Starting HTTP Server
15/03/20 13:56:06 INFO Utils: Successfully started service 'HTTP class server' 
on port 47691.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
  /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
java.net.UnknownHostException: dhcp-10-35-14-100: dhcp-10-35-14-100: Name or 
service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
at org.apache.spark.util.Utils$.findLocalIpAddress(Utils.scala:710)
at 
org.apache.spark.util.Utils$.localIpAddress$lzycompute(Utils.scala:702)
at org.apache.spark.util.Utils$.localIpAddress(Utils.scala:702)
at org.apache.spark.HttpServer.uri(HttpServer.scala:158)
at 
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:982)
at $iwC$$iwC.(:9)
at $iwC.(:18)
at (:20)
at .(:24)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at 
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
at 
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
at 
org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
at 
org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
at 
org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at 
org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: dhcp-10-35-14-100: Name or service 
not known
at java.net.Inet6Addr

Registring UDF from a different package fails

2015-03-20 Thread Ravindra
Hi All,

I have all my UDFs defined in the classes residing in a different package
than where I am instantiating my HiveContext.

I have a register function in my UDF class. I pass HiveContext to this
function. and in this function I call
"hiveContext.registerFunction("myudf", myudf _)"

All goes well but at the runtime when I execute query "val sqlresult =
hiveContext.sql(query)"
It doesn't work. sqlresult comes as null. There is no exception thrown by
spark and there is no proper logs indicating the error.

But all goes well if I bring my UDF class into the same package where I am
instantiating hiveContext.

I dig more into the spark code and found that (may be I am wrong here)

./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

has a class named as SimpleFunctionRegistry wherein lookupFunction is not
throwing error if it doesn't find a function.


Kindly help


Appreciate

Ravi.


Spark Streaming Not Reading Messages From Multiple Kafka Topics

2015-03-20 Thread EH
Hi all,

I'm building a Spark Streaming application that will continuously read
multiple kafka topics at the same time.  However, I found a weird issue that
it reads only hundreds of messages then it stopped reading any more.  If I
changed the three topic to only one topic, then it is fine and it will
continue to consume.  Below is the code I have.

val consumerThreadsPerInputDstream = 1
val topics = Map("raw_0" -> consumerThreadsPerInputDstream)
 "raw_1" -> consumerThreadsPerInputDstream,
 "raw_2" -> consumerThreadsPerInputDstream)

val msgs = KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka",
"group01", topics).map(_._2)
...

How come it will no longer consume after hundreds of messages for three
topic reading?  How to resolve this issue?

Thank you for your help,
Eason



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Not-Reading-Messages-From-Multiple-Kafka-Topics-tp22170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: WebUI on yarn through ssh tunnel affected by AmIpfilter

2015-03-20 Thread Marcelo Vanzin
Instead of opening a tunnel to the Spark web ui port, could you open a
tunnel to the YARN RM web ui instead? That should allow you to
navigate to the Spark application's web ui through the RM proxy, and
hopefully that will work better.

On Fri, Feb 6, 2015 at 9:08 PM, yangqch  wrote:
> Hi folks,
>
> I am new to spark. I just get spark 1.2 to run on emr ami 3.3.1 (hadoop
> 2.4).
> I ssh to emr master node and submit the job or start the shell. Everything
> runs well except the webUI.
>
> In order to see the UI, I used ssh tunnel which forward my dev machine port
> to emr master node webUI port.
>
> When I open the webUI, at the very beginning of the application (during the
> spark launch time), the webUI is as nice as shown in many spark docs.
> However, once the YARN AmIpfilter started to work, the webUI becomes very
> ugly. No pictures can be displayed, only text can be shown (just like you
> view it in lynx). Meanwhile, in spark shell, it pops up "amfilter.AmIpFilter
> (AmIpFilter.java:doFilter(157)) - Could not find proxy-user cookie, so user
> will not be set”.
>
> Can anyone give me some help? Thank you!
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: WebUI on yarn through ssh tunnel affected by AmIpfilter

2015-03-20 Thread benbongalon

I ran into a similar issue. What's happening is that when Spark is running
in YARN client mode, YARN automatically launches a  Web Application Proxy

  
to reduce hacking attempts. In doing so, it adds the AmIpFilter to the
proxy. You can see this is the example log snippet below:

15/03/20 21:33:14 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster
registered as
Actor[akka.tcp://sparkyar...@ip-172-31-44-228.us-west-2.compute.internal:53028/user/YarnAM#-1897510590]
15/03/20 21:33:14 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter.
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS
-> 172.31.36.22, PROXY_URI_BASES ->
http://172.31.36.22:9046/proxy/application_1426881405719_0009),
/proxy/application_1426881405719_0009
15/03/20 21:33:14 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/03/20 21:33:15 INFO yarn.Client: Application report for
application_1426881405719_0009 (state: RUNNING)
15/03/20 21:33:15 INFO yarn.Client: 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: ip-172-31-44-228.us-west-2.compute.internal
 ApplicationMaster RPC port: 0
 queue: default
 start time: 1426887190001
 final status: UNDEFINED
 *tracking URL:
http://172.31.36.22:9046/proxy/application_1426881405719_0009/*

While I haven't found a way to disable it (the  Spark doc
   may help), you can view
the Web UI by forwarding the proxy's port (9046 by default). Then point your
browser the the tracking URL with the host IP replaced with localhost, eg:

http://localhost:9046/proxy/application_1426881405719_0009

Hope that helps.
Ben





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540p22169.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: IPyhon notebook command for spark need to be updated?

2015-03-20 Thread cong yue
Let me do it now. I appreciate the perfect easy understandable
documentation of spark!

The updated command will be like

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook"
./bin/pyspark

When IPython notebook server is launched, you can create a new "Python
2" notebook from "Files" tab. Inside the notebook, you can input the
'%pylab inline' command as part of your notebook before you start to
try spark from IPython notebook.


Cheers.
Cong

2015-03-20 16:14 GMT-07:00 Matei Zaharia :
> Feel free to send a pull request to fix the doc (or say which versions it's
> needed in).
>
> Matei
>
> On Mar 20, 2015, at 6:49 PM, Krishna Sankar  wrote:
>
> Yep the command-option is gone. No big deal, just add the '%pylab inline'
> command as part of your notebook.
> Cheers
> 
>
> On Fri, Mar 20, 2015 at 3:45 PM, cong yue  wrote:
>>
>> Hello :
>>
>> I tried ipython notebook with the following command in my enviroment.
>>
>> PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
>> --pylab inline" ./bin/pyspark
>>
>> But it shows " --pylab inline" support is removed from ipython newest
>> version.
>> the log is as :
>> ---
>> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
>> --pylab inline" ./bin/pyspark
>> [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
>> command line has been removed.
>> [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
>> `%matplotlib inline` in the notebook itself.
>> --
>> I am using IPython 3.0.0. and only IPython works in my enviroment.
>> --
>> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
>> --pylab inline" ./bin/pyspark
>> --
>>
>> Does somebody have the same issue as mine? How do you solve it?
>>
>> Thanks,
>> Cong
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-20 Thread Manoj Samel
Forgot to add - the cluster is idle otherwise so there should be no
resource issues. Also the configuration works when not using Dynamic
allocation.

On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel 
wrote:

> Hi,
>
> Running Spark 1.3 with secured Hadoop.
>
> Spark-shell with Yarn client mode runs without issue when not using
> Dynamic Allocation.
>
> When Dynamic allocation is turned on, the shell comes up but same SQL etc.
> causes it to loop.
>
> spark.dynamicAllocation.enabled=true
> spark.dynamicAllocation.initialExecutors=1
> spark.dynamicAllocation.maxExecutors=10
> # Set IdleTime low for testing
> spark.dynamicAllocation.executorIdleTimeout=60
> spark.shuffle.service.enabled=true
>
> Following is the start of the messages and then it keeps looping with
> "Requesting 0 new executors"
>
> 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_1_piece0
> 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
> broadcast at DAGScheduler.scala:839
> 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
> from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
> 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
> tasks
> 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient resources
> 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient resources
> 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
> 15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
> executor(s) because tasks are backlogged (new desired total will be 1)
>


Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-20 Thread Manoj Samel
Hi,

Running Spark 1.3 with secured Hadoop.

Spark-shell with Yarn client mode runs without issue when not using Dynamic
Allocation.

When Dynamic allocation is turned on, the shell comes up but same SQL etc.
causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true

Following is the start of the messages and then it keeps looping with
"Requesting 0 new executors"

15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:839
15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
tasks
15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient resources
15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient resources
15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)


Re: IPyhon notebook command for spark need to be updated?

2015-03-20 Thread Matei Zaharia
Feel free to send a pull request to fix the doc (or say which versions it's 
needed in).

Matei

> On Mar 20, 2015, at 6:49 PM, Krishna Sankar  wrote:
> 
> Yep the command-option is gone. No big deal, just add the '%pylab inline' 
> command as part of your notebook.
> Cheers
> 
> 
> On Fri, Mar 20, 2015 at 3:45 PM, cong yue  > wrote:
> Hello :
> 
> I tried ipython notebook with the following command in my enviroment.
> 
> PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
> --pylab inline" ./bin/pyspark
> 
> But it shows " --pylab inline" support is removed from ipython newest version.
> the log is as :
> ---
> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
> --pylab inline" ./bin/pyspark
> [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
> command line has been removed.
> [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
> `%matplotlib inline` in the notebook itself.
> --
> I am using IPython 3.0.0. and only IPython works in my enviroment.
> --
> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
> --pylab inline" ./bin/pyspark
> --
> 
> Does somebody have the same issue as mine? How do you solve it?
> 
> Thanks,
> Cong
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: MLlib Spam example gets stuck in Stage X

2015-03-20 Thread Su She
Hello Xiangrui,

I use spark 1.2.0 on cdh 5.3. Thanks!

-Su


On Fri, Mar 20, 2015 at 2:27 PM Xiangrui Meng  wrote:

> Su, which Spark version did you use? -Xiangrui
>
> On Thu, Mar 19, 2015 at 3:49 AM, Akhil Das 
> wrote:
> > To get these metrics out, you need to open the driver ui running on port
> > 4040. And in there you will see Stages information and for each stage you
> > can see how much time it is spending on GC etc. In your case, the
> > parallelism seems 4, the more # of parallelism the more # of tasks you
> will
> > see.
> >
> > Thanks
> > Best Regards
> >
> > On Thu, Mar 19, 2015 at 1:15 PM, Su She  wrote:
> >>
> >> Hi Akhil,
> >>
> >> 1) How could I see how much time it is spending on stage 1? Or what if,
> >> like above, it doesn't get past stage 1?
> >>
> >> 2) How could I check if its a GC time? and where would I increase the
> >> parallelism for the model? I have a Spark Master and 2 Workers running
> on
> >> CDH 5.3...what would the default spark-shell level of parallelism be...I
> >> thought it would be 3?
> >>
> >> Thank you for the help!
> >>
> >> -Su
> >>
> >>
> >> On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das  >
> >> wrote:
> >>>
> >>> Can you see where exactly it is spending time? Like you said it goes to
> >>> Stage 2, then you will be able to see how much time it spend on Stage
> 1. See
> >>> if its a GC time, then try increasing the level of parallelism or
> >>> repartition it like sc.getDefaultParallelism*3.
> >>>
> >>> Thanks
> >>> Best Regards
> >>>
> >>> On Thu, Mar 19, 2015 at 12:15 PM, Su She 
> wrote:
> 
>  Hello Everyone,
> 
>  I am trying to run this MLlib example from Learning Spark:
> 
>  https://github.com/databricks/learning-spark/blob/master/src
> /main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48
> 
>  Things I'm doing differently:
> 
>  1) Using spark shell instead of an application
> 
>  2) instead of their spam.txt and normal.txt I have text files with
> 3700
>  and 2700 words...nothing huge at all and just plain text
> 
>  3) I've used numFeatures = 100, 1000 and 10,000
> 
>  Error: I keep getting stuck when I try to run the model:
> 
>  val model = new LogisticRegressionWithSGD().run(trainingData)
> 
>  It will freeze on something like this:
> 
>  [Stage 1:==>
> (1 +
>  0) / 4]
> 
>  Sometimes its Stage 1, 2 or 3.
> 
>  I am not sure what I am doing wrong...any help is much appreciated,
>  thank you!
> 
>  -Su
> 
> 
> >>>
> >>
> >
>


Re: IPyhon notebook command for spark need to be updated?

2015-03-20 Thread Krishna Sankar
Yep the command-option is gone. No big deal, just add the '%pylab
inline' command
as part of your notebook.
Cheers


On Fri, Mar 20, 2015 at 3:45 PM, cong yue  wrote:

> Hello :
>
> I tried ipython notebook with the following command in my enviroment.
>
> PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
> --pylab inline" ./bin/pyspark
>
> But it shows " --pylab inline" support is removed from ipython newest
> version.
> the log is as :
> ---
> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
> --pylab inline" ./bin/pyspark
> [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
> command line has been removed.
> [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
> `%matplotlib inline` in the notebook itself.
> --
> I am using IPython 3.0.0. and only IPython works in my enviroment.
> --
> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
> --pylab inline" ./bin/pyspark
> --
>
> Does somebody have the same issue as mine? How do you solve it?
>
> Thanks,
> Cong
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


IPyhon notebook command for spark need to be updated?

2015-03-20 Thread cong yue
Hello :

I tried ipython notebook with the following command in my enviroment.

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
--pylab inline" ./bin/pyspark

But it shows " --pylab inline" support is removed from ipython newest version.
the log is as :
---
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
--pylab inline" ./bin/pyspark
[E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
command line has been removed.
[E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
`%matplotlib inline` in the notebook itself.
--
I am using IPython 3.0.0. and only IPython works in my enviroment.
--
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
--pylab inline" ./bin/pyspark
--

Does somebody have the same issue as mine? How do you solve it?

Thanks,
Cong

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to check that a dataset is sorted after it has been written out?

2015-03-20 Thread Michael Albert
Greetings!
I sorted a dataset in Spark and then wrote it out in avro/parquet.
Then I wanted to check that it was sorted.
It looks like each partition has been sorted, but when reading in, the first 
"partition" (i.e., as seen in the partition index of mapPartitionsWithIndex) is 
not the same  as implied by the names of the parquet files (even when the 
number of partitions is the same in therdd which was read as on disk).
If I "take()" a few hundred values, they are sorted, but they are *not* the 
same as if I explicitly open "part-r-0.parquet" and take values from that.
It seems that when opening the rdd, the "partitions" of the rdd are not in the 
sameorder as implied by the data on disk (i.e., "part-r-0.parquet, 
part-r-1.parquet, etc).
So, how might one read the data so that one maintains the sort order?
And while on the subject, after the "terasort", how did they check that the 
data was actually sorted correctly? (or did they :-) ? ).
Is there any way to read the data back in so as to preserve the sort, or do I 
need to "zipWithIndex" before writing it out, and write the index at that time? 
(I haven't tried the latter yet).
Thanks!-Mike


Re: Spark per app logging

2015-03-20 Thread Ted Yu
Are these jobs the same jobs, just run by different users or, different
jobs ?
If the latter, can each application use its own log4j.properties ?

Cheers

On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta  wrote:

> Hi,
>
> We have spark setup such that there are various users running multiple
> jobs at the same time. Currently all the logs go to 1 file specified in the
> log4j.properties.
> Is it possible to configure log4j in spark for per app/user logging
> instead of sending all logs to 1 file mentioned in the log4j.properties?
>
> Thanks
> Udit
>


Re: Create a Spark cluster with cloudera CDH 5.2 support

2015-03-20 Thread Sean Owen
I think you missed -Phadoop-2.4

On Fri, Mar 20, 2015 at 5:27 PM, morfious902002  wrote:
> Hi,
> I am trying to create a Spark cluster using the spark-ec2 script which will
> support 2.5.0-cdh5.3.2 for HDFS as well as Hive. I created a cluster by
> adding --hadoop-major-version=2.5.0 which solved some of the errors I was
> getting. But now when I run select query on hive I get the following error:-
>
> Caused by: com.google.protobuf.InvalidProtocolBufferException: Message
> missing required fields: callId, status
>
> Has anybody tried doing this? Is there a solution?
> I used this command to create my cluster:-
> ./spark-ec2 --key-pair=awskey --identity-file=awskey.pem
> --instance-type=m3.xlarge --spot-price=0.08 --region=us-west-2
> --zone=us-west-2c --hadoop-major-version=2.5.0-cdh5.3.2
> --spark-version=1.3.0 --slaves=1 launch spark-cluster
>
> Thank You for the help.
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Create-a-Spark-cluster-with-cloudera-CDH-5-2-support-tp22168.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Create a Spark cluster with cloudera CDH 5.2 support

2015-03-20 Thread morfious902002
Hi,
I am trying to create a Spark cluster using the spark-ec2 script which will
support 2.5.0-cdh5.3.2 for HDFS as well as Hive. I created a cluster by
adding --hadoop-major-version=2.5.0 which solved some of the errors I was
getting. But now when I run select query on hive I get the following error:-

Caused by: com.google.protobuf.InvalidProtocolBufferException: Message
missing required fields: callId, status

Has anybody tried doing this? Is there a solution?
I used this command to create my cluster:-
./spark-ec2 --key-pair=awskey --identity-file=awskey.pem
--instance-type=m3.xlarge --spot-price=0.08 --region=us-west-2
--zone=us-west-2c --hadoop-major-version=2.5.0-cdh5.3.2
--spark-version=1.3.0 --slaves=1 launch spark-cluster

Thank You for the help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-a-Spark-cluster-with-cloudera-CDH-5-2-support-tp22168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib Spam example gets stuck in Stage X

2015-03-20 Thread Xiangrui Meng
Su, which Spark version did you use? -Xiangrui

On Thu, Mar 19, 2015 at 3:49 AM, Akhil Das  wrote:
> To get these metrics out, you need to open the driver ui running on port
> 4040. And in there you will see Stages information and for each stage you
> can see how much time it is spending on GC etc. In your case, the
> parallelism seems 4, the more # of parallelism the more # of tasks you will
> see.
>
> Thanks
> Best Regards
>
> On Thu, Mar 19, 2015 at 1:15 PM, Su She  wrote:
>>
>> Hi Akhil,
>>
>> 1) How could I see how much time it is spending on stage 1? Or what if,
>> like above, it doesn't get past stage 1?
>>
>> 2) How could I check if its a GC time? and where would I increase the
>> parallelism for the model? I have a Spark Master and 2 Workers running on
>> CDH 5.3...what would the default spark-shell level of parallelism be...I
>> thought it would be 3?
>>
>> Thank you for the help!
>>
>> -Su
>>
>>
>> On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das 
>> wrote:
>>>
>>> Can you see where exactly it is spending time? Like you said it goes to
>>> Stage 2, then you will be able to see how much time it spend on Stage 1. See
>>> if its a GC time, then try increasing the level of parallelism or
>>> repartition it like sc.getDefaultParallelism*3.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Mar 19, 2015 at 12:15 PM, Su She  wrote:

 Hello Everyone,

 I am trying to run this MLlib example from Learning Spark:

 https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48

 Things I'm doing differently:

 1) Using spark shell instead of an application

 2) instead of their spam.txt and normal.txt I have text files with 3700
 and 2700 words...nothing huge at all and just plain text

 3) I've used numFeatures = 100, 1000 and 10,000

 Error: I keep getting stuck when I try to run the model:

 val model = new LogisticRegressionWithSGD().run(trainingData)

 It will freeze on something like this:

 [Stage 1:==>(1 +
 0) / 4]

 Sometimes its Stage 1, 2 or 3.

 I am not sure what I am doing wrong...any help is much appreciated,
 thank you!

 -Su


>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark per app logging

2015-03-20 Thread Udit Mehta
Hi,

We have spark setup such that there are various users running multiple jobs
at the same time. Currently all the logs go to 1 file specified in the
log4j.properties.
Is it possible to configure log4j in spark for per app/user logging instead
of sending all logs to 1 file mentioned in the log4j.properties?

Thanks
Udit


Re: Spark SQL UDT Kryo serialization, Unable to find class

2015-03-20 Thread Michael Armbrust
You probably don't cause a shuffle (which requires serialization) unless
there is a join or group by.

It's possible that we are need to pass the spark class loader to kryo when
creating a new instance (you can get it from Utils I believe).  We never
run Otto this problem since this API is not public yet.  I'd start by
looking in SparkSqlSerializer.
On Mar 18, 2015 1:13 AM, "Zia Ur Rehman Kayani" 
wrote:

> Thanks for your reply. I've tried this as well, by passing the JAR file
> path to *spark.executor.extraClassPath *but it doesn't help me out,
> actually I've figured it out that custom UDT works fine if I use only one
> RDD (table). the issue arises when we join two or more RDDs. According to
> this , its is a bug
> when we use custom ROW and use JOIN. But the solution proposed isn't
> working in my case.
>
> Any clue ?
>
>
> On Tue, Mar 17, 2015 at 10:19 PM, Michael Armbrust  > wrote:
>
>> I'll caution you that this is not a stable public API.
>>
>> That said, it seems that the issue is that you have not copied the jar
>> file containing your class to all of the executors.  You should not need to
>> do any special configuration of serialization (you can't for SQL, as we
>> hard code it for performance, since we generally know all the types that
>> are going to be shipped)
>>
>> On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani 
>> wrote:
>>
>>> Hi,
>>> I want to introduce custom type for SchemaRDD, I'm following  this
>>> <
>>> https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
>>> >
>>> example. But I'm having Kryo Serialization issues, here is stack trace:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in
>>> stage 6.0 failed 1 times, most recent failure:
>>> Lost task 0.0 in stage 6.0 (TID 22, localhost):
>>> *com.esotericsoftware.kryo.KryoException: Unable to find class:
>>> com.gis.io.GeometryWritable*
>>> Serialization trace:
>>> value (org.apache.spark.sql.catalyst.expressions.MutableAny)
>>> values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
>>>at
>>>
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>at
>>>
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>>>at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
>>>at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>at
>>>
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>at
>>>
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>at
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>at
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>at
>>>
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
>>>at
>>>
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>at
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>at
>>>
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>at
>>>
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>at
>>>
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>at
>>>
>>> org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
>>>at
>>>
>>> org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
>>>at
>>>
>>> org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
>>>at
>>>
>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartiti

RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Below is the output:

 

core file size  (blocks, -c) 0

data seg size   (kbytes, -d) unlimited

scheduling priority (-e) 0

file size   (blocks, -f) unlimited

pending signals (-i) 1967947

max locked memory   (kbytes, -l) 64

max memory size (kbytes, -m) unlimited

open files  (-n) 2024

pipe size(512 bytes, -p) 8

POSIX message queues (bytes, -q) 819200

real-time priority  (-r) 0

stack size  (kbytes, -s) 8192

cpu time   (seconds, -t) unlimited

max user processes  (-u) 1967947

virtual memory  (kbytes, -v) unlimited

file locks  (-x) unlimited

 

I have set the max open file to 2024 by ulimit -n 2024, but same issue

I am not sure whether it is a reasonable setting.

 

Actually I am doing a loop, each time try to sort only 3GB data, it runs
very quick in first loop, and slow down in second loop. At each time loop I
start and destroy the context (because I want to clean up the temp file
create under tmp folder, which take a lot of space). Just default setting.

 

My logic:

 

For loop:

Val sc = new sc

Sql = sc.loadParquet

Sortbykey

Sc.stop

End

 

And I run on the EC2 c3*8xlarge, Amazon Linux AMI 2014.09.2 (HVM).

 

From: java8964 [mailto:java8...@hotmail.com] 
Sent: Friday, March 20, 2015 3:54 PM
To: user@spark.apache.org
Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException:
File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

 

Do you think the ulimit for the user running Spark on your nodes?

 

Can you run "ulimit -a" under the user who is running spark on the executor
node? Does the result make sense for the data you are trying to process?

 

Yong

 

  _  

From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File
too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All,

 

I try to run a simple sort by on 1.2.1. And it always give me below two
errors:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6
b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19d
d-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b5326782
6 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")

.set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because Spark shuffle creates too many
local temp files (and I don't know the solution, because looks like my
solution also cause other issues), but I am not sure what means is the
second error. 

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



Re: saveAsTable broken in v1.3 DataFrames?

2015-03-20 Thread Christian Perez
Any other users interested in a feature
DataFrame.saveAsExternalTable() for making _useful_ external tables in
Hive, or am I the only one? Bueller? If I start a PR for this, will it
be taken seriously?

On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez  wrote:
> Hi Yin,
>
> Thanks for the clarification. My first reaction is that if this is the
> intended behavior, it is a wasted opportunity. Why create a managed
> table in Hive that cannot be read from inside Hive? I think I
> understand now that you are essentially piggybacking on Hive's
> metastore to persist table info between/across sessions, but I imagine
> others might expect more (as I have.)
>
> We find ourselves wanting to do work in Spark and persist the results
> where other users (e.g. analysts using Tableau connected to
> Hive/Impala) can explore it. I imagine this is very common. I can, of
> course, save it as parquet and create an external table in hive (which
> I will do now), but saveAsTable seems much less useful to me now.
>
> Any other opinions?
>
> Cheers,
>
> C
>
> On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai  wrote:
>> I meant table properties and serde properties are used to store metadata of
>> a Spark SQL data source table. We do not set other fields like SerDe lib.
>> For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
>> should not show unrelated stuff like Serde lib and InputFormat. I have
>> created https://issues.apache.org/jira/browse/SPARK-6413 to track the
>> improvement on the output of DESCRIBE statement.
>>
>> On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai  wrote:
>>>
>>> Hi Christian,
>>>
>>> Your table is stored correctly in Parquet format.
>>>
>>> For saveAsTable, the table created is not a Hive table, but a Spark SQL
>>> data source table
>>> (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
>>> We are only using Hive's metastore to store the metadata (to be specific,
>>> only table properties and serde properties). When you look at table
>>> property, there will be a field called "spark.sql.sources.provider" and the
>>> value will be "org.apache.spark.sql.parquet.DefaultSource". You can also
>>> look at your files in the file system. They are stored by Parquet.
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez 
>>> wrote:

 Hi all,

 DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
 CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
 schema _and_ storage format in the Hive metastore, so that the table
 cannot be read from inside Hive. Spark itself can read the table, but
 Hive throws a Serialization error because it doesn't know it is
 Parquet.

 val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education",
 "income")
 df.saveAsTable("spark_test_foo")

 Expected:

 COLUMNS(
   education BIGINT,
   income BIGINT
 )

 SerDe Library:
 org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
 InputFormat:
 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

 Actual:

 COLUMNS(
   col array COMMENT "from deserializer"
 )

 SerDe Library: org.apache.hadoop.hive.serd2.MetadataTypedColumnsetSerDe
 InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

 ---

 Manually changing schema and storage restores access in Hive and
 doesn't affect Spark. Note also that Hive's table property
 "spark.sql.sources.schema" is correct. At first glance, it looks like
 the schema data is serialized when sent to Hive but not deserialized
 properly on receive.

 I'm tracing execution through source code... but before I get any
 deeper, can anyone reproduce this behavior?

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

>>>
>>
>
>
>
> --
> Christian Perez
> Silicon Valley Data Science
> Data Analyst
> christ...@svds.com
> @cp_phd



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread java8964
Do you think the ulimit for the user running Spark on your nodes?
Can you run "ulimit -a" under the user who is running spark on the executor 
node? Does the result make sense for the data you are trying to process?
Yong
From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too 
large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All, I try to run a simple sort by on 1.2.1. And it always give me below two 
errors: 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 
(TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: 
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
 (Too many open files) And then I switch 
to:conf.set("spark.shuffle.consolidateFiles", 
"true").set("spark.shuffle.manager", "SORT") Then I get the error: Exception in 
thread "main" org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 
in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): 
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large
at com.esotericsoftware.kryo.io.Output.flush(Output.java:157) I roughly 
know the first issue is because Spark shuffle creates too many local temp files 
(and I don’t know the solution, because looks like my solution also cause other 
issues), but I am not sure what means is the second error.  Anyone knows the 
solution for both cases? Regards, Shuai 
  

EC2 cluster created by spark using old HDFS 1.0

2015-03-20 Thread morfious902002
Hi,
I created a cluster using spark-ec2 script. But it installs HDFS version
1.0. I would like to use this cluster to connect to HIVE installed on a
cloudera CDH 5.3 cluster. But I am getting the following error:- 

org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
communicate with client vers
 
ion 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at
org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:8

 
9)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at
org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.

 
java:40)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:83)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:23)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at $iwC$$iwC$$iwC$$iwC.(:34)
at $iwC$$iwC$$iwC.(:36)
at $iwC$$iwC.(:38)
at $iwC.(:40)
at (:42)
at .(:46)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl

Matching Spark application metrics data to App Id

2015-03-20 Thread Judy Nash
Hi,

I want to get telemetry metrics on spark apps activities, such as run time and 
jvm activities.

Using Spark Metrics I am able to get the following sample data point on the an 
app:
type=GAUGE, name=application.SparkSQL::headnode0.1426626495312.runtime_ms, 
value=414873

How can I match this datapoint to the AppId? (i.e. app-20150317210815-0001)
Spark App name is not an unique identifier.
1426626495312 appear to be unique, but I am unable to see how this is related 
to the AppId.

Thanks,
Judy


Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Charles Feduke
Assuming you are on Linux, what is your /etc/security/limits.conf set for
nofile/soft (number of open file handles)?

On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng  wrote:

> Hi All,
>
>
>
> I try to run a simple sort by on 1.2.1. And it always give me below two
> errors:
>
>
>
> 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
> 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
> /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
> (Too many open files)
>
>
>
> And then I switch to:
>
> conf.set("spark.shuffle.consolidateFiles", "true")
>
> .set("spark.shuffle.manager", "SORT")
>
>
>
> Then I get the error:
>
>
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent
> failure: Lost task 5.3 in stage 1.0 (TID 36,
> ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException:
> java.io.IOException: File too large
>
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)
>
>
>
> I roughly know the first issue is because Spark shuffle creates too many
> local temp files (and I don’t know the solution, because looks like my
> solution also cause other issues), but I am not sure what means is the
> second error.
>
>
>
> Anyone knows the solution for both cases?
>
>
>
> Regards,
>
>
>
> Shuai
>


Re: Mailing list schizophrenia?

2015-03-20 Thread Jim Kleckner
Yes, it did get delivered to the apache list shown here:

http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E

But the web site for spark community directs people to nabble for viewing
messages and it doesn't show up there.

Community page: http://spark.apache.org/community.html

Link in that page to the archive:
http://apache-spark-user-list.1001560.n3.nabble.com/

The reliable archive:
http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser



On Fri, Mar 20, 2015 at 12:34 PM, Ted Yu  wrote:

> Jim:
> I can find the example message here:
> http://search-hadoop.com/m/JW1q5zP54J1
>
> On Fri, Mar 20, 2015 at 12:29 PM, Jim Kleckner 
> wrote:
>
>> I notice that some people send messages directly to user@spark.apache.org
>> and some via nabble, either using email or the web client.
>>
>> There are two index sites, one directly at apache.org and one at
>> nabble.  But messages sent directly to user@spark.apache.org only show
>> up in the apache list.  Further, it appears that you can subscribe either
>> directly to user@spark.apache.org, in which you see all emails, or via
>> nabble and you see a subset.
>>
>> Is this correct and is it intentional?
>>
>> Apache site:
>>   http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser
>>
>> Nabble site:
>>   http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> An example of a message that only shows up in Apache:
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E
>>
>>
>> This message was sent both to Nabble and user@spark.apache.org to see
>> how that behaves.
>>
>> Jim
>>
>>
>


Re: Mailing list schizophrenia?

2015-03-20 Thread Ted Yu
Jim:
I can find the example message here:
http://search-hadoop.com/m/JW1q5zP54J1

On Fri, Mar 20, 2015 at 12:29 PM, Jim Kleckner  wrote:

> I notice that some people send messages directly to user@spark.apache.org
> and some via nabble, either using email or the web client.
>
> There are two index sites, one directly at apache.org and one at nabble.
> But messages sent directly to user@spark.apache.org only show up in the
> apache list.  Further, it appears that you can subscribe either directly to
> user@spark.apache.org, in which you see all emails, or via nabble and you
> see a subset.
>
> Is this correct and is it intentional?
>
> Apache site:
>   http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser
>
> Nabble site:
>   http://apache-spark-user-list.1001560.n3.nabble.com/
>
> An example of a message that only shows up in Apache:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E
>
>
> This message was sent both to Nabble and user@spark.apache.org to see how
> that behaves.
>
> Jim
>
>


Mailing list schizophrenia?

2015-03-20 Thread Jim Kleckner
I notice that some people send messages directly to user@spark.apache.org
and some via nabble, either using email or the web client.

There are two index sites, one directly at apache.org and one at nabble.
But messages sent directly to user@spark.apache.org only show up in the
apache list.  Further, it appears that you can subscribe either directly to
user@spark.apache.org, in which you see all emails, or via nabble and you
see a subset.

Is this correct and is it intentional?

Apache site:
  http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser

Nabble site:
  http://apache-spark-user-list.1001560.n3.nabble.com/

An example of a message that only shows up in Apache:

http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E


This message was sent both to Nabble and user@spark.apache.org to see how
that behaves.

Jim


com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Hi All,

 

I try to run a simple sort by on 1.2.1. And it always give me below two
errors:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6
b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19d
d-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b5326782
6 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")

.set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because Spark shuffle creates too many
local temp files (and I don't know the solution, because looks like my
solution also cause other issues), but I am not sure what means is the
second error. 

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yin Huai
spark.sql.shuffle.partitions only control the number of tasks in the second
stage (the number of reducers). For your case, I'd say that the number of
tasks in the first state (number of mappers) will be the number of files
you have.

Actually, have you changed "spark.executor.memory" (it controls the memory
for an executor of your application)? I did not see it in your original
email. The difference between worker memory and executor memory can be
found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html),

SPARK_WORKER_MEMORY
Total amount of memory to allow Spark applications to use on the machine,
e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
application's individual memory is configured using its
spark.executor.memory property.


On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas 
wrote:

> Actually I realized that the correct way is:
>
> sqlContext.sql("set spark.sql.shuffle.partitions=1000")
>
> but I am still experiencing the same behavior/error.
>
> On 20 March 2015 at 16:04, Yiannis Gkoufas  wrote:
>
>> Hi Yin,
>>
>> the way I set the configuration is:
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>>
>> it is the correct way right?
>> In the mapPartitions task (the first task which is launched), I get again
>> the same number of tasks and again the same error. :(
>>
>> Thanks a lot!
>>
>> On 19 March 2015 at 17:40, Yiannis Gkoufas  wrote:
>>
>>> Hi Yin,
>>>
>>> thanks a lot for that! Will give it a shot and let you know.
>>>
>>> On 19 March 2015 at 16:30, Yin Huai  wrote:
>>>
 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas 
 wrote:

> Hi Yin,
>
> Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
> The number of tasks launched is equal to the number of parquet files. Do
> you have any idea on how to deal with this situation?
>
> Thanks a lot
> On 18 Mar 2015 17:35, "Yin Huai"  wrote:
>
>> Seems there are too many distinct groups processed in a task, which
>> trigger the problem.
>>
>> How many files do your dataset have and how large is a file? Seems
>> your query will be executed with two stages, table scan and map-side
>> aggregation in the first stage and the final round of reduce-side
>> aggregation in the second stage. Can you take a look at the numbers of
>> tasks launched in these two stages?
>>
>> Thanks,
>>
>> Yin
>>
>> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
>> johngou...@gmail.com> wrote:
>>
>>> Hi there, I set the executor memory to 8g but it didn't help
>>>
>>> On 18 March 2015 at 13:59, Cheng Lian  wrote:
>>>
 You should probably increase executor memory by setting
 "spark.executor.memory".

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

> Hi there,
>
> I was trying the new DataFrame API with some basic operations on a
> parquet dataset.
> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
> a standalone cluster mode.
> The code is the following:
>
> val people = sqlContext.parquetFile("/data.parquet");
> val res = people.groupBy("name","date").
> agg(sum("power"),sum("supply")).take(10);
> System.out.println(res);
>
> The dataset consists of 16 billion entries.
> The error I get is java.lang.OutOfMemoryError: GC overhead limit
> exceeded
>
> My configuration is:
>
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.driver.memory6g
> spark.executor.extraJavaOptions -XX:+UseCompressedOops
> spark.shuffle.managersort
>
> Any idea how can I workaround this?
>
> Thanks a lot
>


>>>
>>

>>>
>>
>


Re: Spark-submit and multiple files

2015-03-20 Thread Davies Liu
You MUST put --py-files BEFORE main.py, as mentioned in another threads.

On Fri, Mar 20, 2015 at 1:47 AM, Guillaume Charhon
 wrote:
> Hi Davies,
>
> I am already using --py-files. The system does use the other file. The error
> I am getting is not trivial. Please check the error log.
>
>
>
> On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu  wrote:
>>
>> You could submit additional Python source via --py-files , for example:
>>
>> $ bin/spark-submit --py-files work.py main.py
>>
>> On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez 
>> wrote:
>> > Hello guys,
>> >
>> > I am having a hard time to understand how spark-submit behave with
>> > multiple
>> > files. I have created two code snippets. Each code snippet is composed
>> > of a
>> > main.py and work.py. The code works if I paste work.py then main.py in a
>> > pyspark shell. However both snippets do not work when using spark submit
>> > and
>> > generate different errors.
>> >
>> > Function add_1 definition outside
>> > http://www.codeshare.io/4ao8B
>> > https://justpaste.it/jzvj
>> >
>> > Embedded add_1 function definition
>> > http://www.codeshare.io/OQJxq
>> > https://justpaste.it/jzvn
>> >
>> > I am trying a way to make it work.
>> >
>> > Thank you for your support.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.2. loses often all executors

2015-03-20 Thread Davies Liu
Maybe this is related to a bug in 1.2 [1], it's fixed in 1.2.2 (not
released), could checkout the 1.2 branch and verify that?

[1] https://issues.apache.org/jira/browse/SPARK-5788

On Fri, Mar 20, 2015 at 3:21 AM, mrm  wrote:
> Hi,
>
> I recently changed from Spark 1.1. to Spark 1.2., and I noticed that it
> loses all executors whenever I have any Python code bug (like looking up a
> key in a dictionary that does not exist). In earlier versions, it would
> raise an exception but it would not lose all executors.
>
> Anybody with a similar problem?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Why I didn't see the benefits of using KryoSerializer

2015-03-20 Thread java8964
Hi, Imran:
Thanks for your information.
I found a benchmark online about serialization which compares Java vs Kryo vs 
gridgain at here: 
http://gridgain.blogspot.com/2012/12/java-serialization-good-fast-and-faster.html
>From my test result, in the above benchmark case for the SimpleObject, Kryo is 
>slightly faster than Java serialization, but only use half of the space vs 
>Java serialization.
So now I understand more about what kind of benefits I should expect from using 
KryoSerializer.
But I have some questions related to Spark SQL. If I use Spark SQL, should I 
expect less memory usage? I mean in Spark SQL, everything is controlled by 
Spark. If I pass in 
"-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" and save the 
table in Cache, so it will use much less memory? Do I also need to specify 
"StorageLevel.MEMORY_ONLY_SER" if I want to use less memory? Where I can set 
that in Spark SQL?
Thanks
Yong

From: iras...@cloudera.com
Date: Fri, 20 Mar 2015 11:54:38 -0500
Subject: Re: Why I didn't see the benefits of using KryoSerializer
To: java8...@hotmail.com
CC: user@spark.apache.org

Hi Yong,
yes I think your analysis is correct.  I'd imagine almost all serializers out 
there will just convert a string to its utf-8 representation.  You might be 
interested in adding compression on top of a serializer, which would probably 
bring the string size down in almost all cases, but then you also need to take 
the time for compression.  Kryo is generally more efficient than the java 
serializer on complicated object types.
I guess I'm still a little surprised that kryo is slower than java 
serialization for you.  You might try setting "spark.kryo.referenceTracking" to 
false if you are just serializing objects with no circular references.  I think 
that will improve the performance a little, though I dunno how much.
It might be worth running your experiments again with slightly more complicated 
objects and see what you observe.
Imran

On Thu, Mar 19, 2015 at 12:57 PM, java8964  wrote:



I read the Spark code a little bit, trying to understand my own question.
It looks like the different is really between 
org.apache.spark.serializer.JavaSerializer and 
org.apache.spark.serializer.KryoSerializer, both having the method named 
writeObject.
In my test case, for each line of my text file, it is about 140 bytes of 
String. When either JavaSerializer.writeObject(140 bytes of String) or 
KryoSerializer.writeObject(140 bytes of String), I didn't see difference in the 
underline OutputStream space usage.
Does this mean that KryoSerializer really doesn't give us any benefit for 
String type? I understand that for primitives types, it shouldn't have any 
benefits, but how about String type?
When we talk about lower the memory using KryoSerializer in spark, under what 
case it can bring significant benefits? It is my first experience with the 
KryoSerializer, so maybe I am total wrong about its usage.
Thanks
Yong 
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Why I didn't see the benefits of using KryoSerializer
Date: Tue, 17 Mar 2015 12:01:35 -0400




Hi, I am new to Spark. I tried to understand the memory benefits of using 
KryoSerializer.
I have this one box standalone test environment, which is 24 cores with 24G 
memory. I installed Hadoop 2.2 plus Spark 1.2.0.
I put one text file in the hdfs about 1.2G.  Here is the settings in the 
spark-env.sh
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"export 
SPARK_WORKER_MEMORY=32gexport SPARK_DRIVER_MEMORY=2gexport 
SPARK_EXECUTOR_MEMORY=4g
First test case:val 
log=sc.textFile("hdfs://namenode:9000/test_1g/")log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)log.count()log.count()
The data is about 3M rows. For the first test case, from the storage in the web 
UI, I can see "Size in Memory" is 1787M, and "Fraction Cached" is 70% with 7 
cached partitions.This matched with what I thought, and first count finished 
about 17s, and 2nd count finished about 6s.
2nd test case after restart the spark-shell:val 
log=sc.textFile("hdfs://namenode:9000/test_1g/")log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)log.count()log.count()
Now from the web UI, I can see "Size in Memory" is 1231M, and "Fraction Cached" 
is 100% with 10 cached partitions. It looks like caching the default "java 
serialized format" reduce the memory usage, but coming with a cost that first 
count finished around 39s and 2nd count finished around 9s. So the job runs 
slower, with less memory usage.
So far I can understand all what happened and the tradeoff.
Now the problem comes with when I tried to test with KryoSerializer
SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" 
/opt/spark/bin/spark-shellval 
log=sc.textFile("hdfs://namenode:9000/test_1g/")log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)log.count()log.count()
First, I saw that the new serializer setting passed in, as proven in the Spark

Re: ShuffleBlockFetcherIterator: Failed to get block(s)

2015-03-20 Thread Imran Rashid
I think you should see some other errors before that, from
NettyBlockTransferService, with a msg like "Exception while beginning
fetchBlocks".  There might be a bit more information there.  there are an
assortment of possible causes, but first lets just make sure you have all
the details from the original cause.

On Fri, Mar 20, 2015 at 8:49 AM, Eric Friedman 
wrote:

> My job crashes with a bunch of these messages in the YARN logs.
>
> What are the appropriate steps in troubleshooting?
>
> 15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 10 outstanding blocks (after 3 retries)
>
> 15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
> block(s) from :
>


RE: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-20 Thread Shuai Zheng
Thanks!

 

Let me update the status.

 

I have copied the DirectOutputCommitter to my local. And set:

 

Conf.set("spark.hadoop.mapred.output.committer.class", 
"org..DirectOutputCommitter")

 

It works perfectly.

 

Thanks  everyone J

 

Regards,

 

Shuai

 

From: Aaron Davidson [mailto:ilike...@gmail.com] 
Sent: Tuesday, March 17, 2015 3:06 PM
To: Imran Rashid
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: Spark will process _temporary folder on S3 is very slow and always 
cause failure

 

Actually, this is the more relevant JIRA (which is resolved):

https://issues.apache.org/jira/browse/SPARK-3595

 

6352 is about saveAsParquetFile, which is not in use here.

 

Here is a DirectOutputCommitter implementation:

https://gist.github.com/aarondav/c513916e72101bbe14ec

 

and it can be configured in Spark with:

sparkConf.set("spark.hadoop.mapred.output.committer.class", 
classOf[DirectOutputCommitter].getName)

 

On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid  wrote:

I'm not super familiar w/ S3, but I think the issue is that you want to use a 
different output committers with "object" stores, that don't have a simple move 
operation.  There have been a few other threads on S3 & outputcommitters.  I 
think the most relevant for you is most probably this open JIRA:

 

https://issues.apache.org/jira/browse/SPARK-6352

 

On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng  wrote:

Hi All,

 

I try to run a sorting on a r3.2xlarge instance on AWS. I just try to run it as 
a single node cluster for test. The data I use to sort is around 4GB and sit on 
S3, output will also on S3.

 

I just connect spark-shell to the local cluster and run the code in the script 
(because I just want a benchmark now).

 

My job is as simple as:

val parquetFile = 
sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,")

parquetFile.registerTempTable("Test")

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row 
=> { row.mkString("\t") } }

sortedResult.saveAsTextFile("s3n://myplace,");

 

The job takes around 6 mins to finish the sort when I am monitoring the 
process. After I notice the process stop at: 

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at 
:31, took 581.304992 s

 

At that time, the spark actually just write all the data to the _temporary 
folder first, after all sub-tasks finished, it will try to move all the ready 
result from _temporary folder to the final location. This process might be 
quick locally (because it will just be a cut/paste), but it looks like very 
slow on my S3, it takes a few second to move one file (usually there will be 
200 partitions). And then it raise exceptions after it move might be 40-50 
files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond

at 
org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)

at 
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)

at 
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)

at 
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)

at 
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I try several times, but never get the full job finished. I am not sure 
anything wrong here, but I use something very basic and I can see the job has 
finished and all result on the S3 under temporary folder, but then it raise the 
exception and fail. 

 

Any special setting I should do here when deal with S3?

 

I don’t know what is the issue here, I never see MapReduce has similar issue. 
So it could not be S3’s problem.

 

Regards,

 

Shuai

 

 



can distinct transform applied on DStream?

2015-03-20 Thread Darren Hoo
val aDstream = ...

val distinctStream = aDstream.transform(_.distinct())

but the elements in distinctStream  are not distinct.

Did I use it wrong?


Re: version conflict common-net

2015-03-20 Thread Sean Owen
It's not a crazy question, no. I'm having a bit of trouble figuring
out what's happening. Commons Net 2.2 is what's used by Spark. The
error appears to come from Spark. But the error is not finding a
method that did not exist in 2.2. I am not sure what ZipStream is, for
example. This could be a bizarre situation where classloader rules
mean that part of 2.2 and part of 3.3 are being used. For example,
let's say:

- your receiver uses 3.3 classes that are only in 3.3, so they are
found in your user classloader
- 3.3 classes call some class that also existed in 2.2, but those are
found in the Spark classloader.
- 2.2 class doesn't have methods that 3.3 expects

userClassPathFirst is often a remedy. There are several versions of
this flag though. For example you need a different one if on YARN to
have it take effect.

It's worth ruling that out first. If all else fails you can shade 3.3.

On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham  wrote:
> Anyone? or is this question nonsensical... and I am doing something
> fundamentally wrong?
>
>
>
> On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham  wrote:
>>
>> Hi Folks,
>>
>> I have a situation where I am getting a version conflict between java
>> libraries that is used by my application and ones used by spark.
>>
>> Following are the details -
>>
>> I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
>> 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net.
>>
>> In our spark application we use commons-net with version 3.3.
>>
>> However I found out that spark uses commons-net version 2.2.
>>
>> Hence when we try to submit our application using spark-submit, I end up
>> getting, a NoSuchMethodError()
>>
>> Error starting receiver 5 -
>>
>>
>> java.lang.NoSuchMethodError:
>> org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
>>
>>  at ZipStream.onStart(ZipStream.java:55)
>>  at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>  at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>  at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>>  at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>>
>>   .
>>
>>
>>
>>
>>
>>
>> Now, if I change the commons-net version to 2.2, the job runs fine (expect
>> for the fact that some of the features we use from the commons-net 3.3 are
>> not there).
>>
>>
>>
>> How does one resolve such an issue where sparks uses one set of libraries
>> and our user application requires the same set of libraries, but just a
>> different version of it (In my case commons-net 2.2 vs 3.3).
>>
>>
>> I see that there is a setting that I can supply -
>> "spark.files.userClassPathFirst", but the documentation says that it is
>> experimental and for us this did not work at all.
>>
>>
>> Thanks in advance.
>>
>>
>> Regards,
>>
>> -Jacob
>>
>>
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded

2015-03-20 Thread Imran Rashid
I think you are running into a combo of

https://issues.apache.org/jira/browse/SPARK-5928
and
https://issues.apache.org/jira/browse/SPARK-5945

The standard solution is to just increase the number of partitions you are
creating. textFile(), reduceByKey(), and sortByKey() all take an optional
second argument, where you can specify the number of partitions you use.
It looks its using spark.default.parallelism right now, which will be the
number of cores in your cluster usually (not sure what that is in your
case).  The exception you gave shows your about 6x over the limit in at
least this one case, so I'd start by with at least 10x the number of
partitions you have now, and increase until it works (or you run into some
other problem from too many partitions ...)

I'd also strongly suggest doing the filter before you do the sortByKey --
no reason to force all that data if you're going to through a lot of it
away.  Its not completely clear where you are hitting the error now -- that
alone. might even solve your problem.

hope this helps,
Imran


On Thu, Mar 19, 2015 at 5:28 PM, roni  wrote:

> I get 2 types of error -
> -org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0 and
> FetchFailedException: Adjusted frame length exceeds 2147483647:
> 12716268407 - discarded
>
> Spar keeps re-trying to submit the code and keeps getting this error.
>
> My file on which I am finding  the sliding window strings is 500 MB  and I
> am doing it with length = 150.
> It woks fine till length is 100.
>
> This is my code -
>  val hgfasta = sc.textFile(args(0)) // read the fasta file
> val kCount = hgfasta.flatMap(r => { r.sliding(args(2).toInt) })
> val kmerCount = kCount.map(x => (x, 1)).reduceByKey(_ + _).map { case
> (x, y) => (y, x) }.sortByKey(false).map { case (i, j) => (j, i) }
>
>   val filtered = kmerCount.filter(kv => kv._2 < 5)
>   filtered.map(kv => kv._1 + ", " +
> kv._2.toLong).saveAsTextFile(args(1))
>
>   }
> It gets stuck and flat map and save as Text file  Throws
> -org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0 and
>
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
> 2147483647: 12716268407 - discarded
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>
>
>


Re: Why I didn't see the benefits of using KryoSerializer

2015-03-20 Thread Imran Rashid
Hi Yong,

yes I think your analysis is correct.  I'd imagine almost all serializers
out there will just convert a string to its utf-8 representation.  You
might be interested in adding compression on top of a serializer, which
would probably bring the string size down in almost all cases, but then you
also need to take the time for compression.  Kryo is generally more
efficient than the java serializer on complicated object types.

I guess I'm still a little surprised that kryo is slower than java
serialization for you.  You might try setting
"spark.kryo.referenceTracking" to false if you are just serializing objects
with no circular references.  I think that will improve the performance a
little, though I dunno how much.

It might be worth running your experiments again with slightly more
complicated objects and see what you observe.

Imran


On Thu, Mar 19, 2015 at 12:57 PM, java8964  wrote:

> I read the Spark code a little bit, trying to understand my own question.
>
> It looks like the different is really between
> org.apache.spark.serializer.JavaSerializer and
> org.apache.spark.serializer.KryoSerializer, both having the method named
> writeObject.
>
> In my test case, for each line of my text file, it is about 140 bytes of
> String. When either JavaSerializer.writeObject(140 bytes of String) or
> KryoSerializer.writeObject(140 bytes of String), I didn't see difference in
> the underline OutputStream space usage.
>
> Does this mean that KryoSerializer really doesn't give us any benefit for
> String type? I understand that for primitives types, it shouldn't have any
> benefits, but how about String type?
>
> When we talk about lower the memory using KryoSerializer in spark, under
> what case it can bring significant benefits? It is my first experience with
> the KryoSerializer, so maybe I am total wrong about its usage.
>
> Thanks
>
> Yong
>
> --
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: Why I didn't see the benefits of using KryoSerializer
> Date: Tue, 17 Mar 2015 12:01:35 -0400
>
>
> Hi, I am new to Spark. I tried to understand the memory benefits of using
> KryoSerializer.
>
> I have this one box standalone test environment, which is 24 cores with
> 24G memory. I installed Hadoop 2.2 plus Spark 1.2.0.
>
> I put one text file in the hdfs about 1.2G.  Here is the settings in the
> spark-env.sh
>
> export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
> export SPARK_WORKER_MEMORY=32g
> export SPARK_DRIVER_MEMORY=2g
> export SPARK_EXECUTOR_MEMORY=4g
>
> First test case:
> val log=sc.textFile("hdfs://namenode:9000/test_1g/")
> log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
> log.count()
> log.count()
>
> The data is about 3M rows. For the first test case, from the storage in
> the web UI, I can see "Size in Memory" is 1787M, and "Fraction Cached" is
> 70% with 7 cached partitions.
> This matched with what I thought, and first count finished about 17s, and
> 2nd count finished about 6s.
>
> 2nd test case after restart the spark-shell:
> val log=sc.textFile("hdfs://namenode:9000/test_1g/")
> log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
> log.count()
> log.count()
>
> Now from the web UI, I can see "Size in Memory" is 1231M, and "Fraction
> Cached" is 100% with 10 cached partitions. It looks like caching the
> default "java serialized format" reduce the memory usage, but coming with a
> cost that first count finished around 39s and 2nd count finished around 9s.
> So the job runs slower, with less memory usage.
>
> So far I can understand all what happened and the tradeoff.
>
> Now the problem comes with when I tried to test with KryoSerializer
>
> SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
> /opt/spark/bin/spark-shell
> val log=sc.textFile("hdfs://namenode:9000/test_1g/")
> log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
> log.count()
> log.count()
>
> First, I saw that the new serializer setting passed in, as proven in the
> Spark Properties of "Environment" shows "
>
> spark.driver.extraJavaOptions
>
>   -Dspark.serializer=org.apache.spark.serializer.KryoSerializer
>   ". This is not there for first 2 test cases.
> But in the web UI of "Storage", the "Size in Memory" is 1234M, with 100%
> "Fraction Cached" and 10 cached partitions. The first count took 46s and
> 2nd count took 23s.
>
> I don't get much less memory size as I expected, but longer run time for
> both counts. Anything I did wrong? Why the memory foot print of 
> "MEMORY_ONLY_SER"
> for KryoSerializer still use the same size as default Java serializer, with
> worse duration?
>
> Thanks
>
> Yong
>


Re: RDD Blocks skewing to just few executors

2015-03-20 Thread Sean Owen
Hm is data locality a factor here? I don't know.

Just a side note: this doesn't cause OOM errors per se since the cache
won't exceed the % of heap it's allowed. However that will hasten OOM
problems due to tasks using too much memory, of course. The solution
is to get more memory to the tasks or reduce their working set size.

On Fri, Mar 20, 2015 at 12:32 PM, Alessandro Lulli  wrote:
> Hi All,
>
> I'm experiencing the same issue with Spark 120 (not verified with previous).
>
> Could you please help us on this?
>
> Thanks
> Alessandro
>
> On Tue, Nov 18, 2014 at 1:40 AM, mtimper  wrote:
>>
>> Hi I'm running a standalone cluster with 8 worker servers.
>> I'm developing a streaming app that is adding new lines of text to several
>> different RDDs each batch interval. Each line has a well randomized unique
>> identifier that I'm trying to use for partitioning, since the data stream
>> does contain duplicates lines. I'm doing partitioning with this:
>>
>> val eventsByKey =  streamRDD.map { event => (getUID(event), event)}
>> val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
>>.partitionBy(new HashPartitioner(numPartions)).map(e => e._2)
>>
>> I'm adding to the existing RDD like with this:
>>
>> val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
>> (currentIter,batchIter) =>
>> val uniqEvents = ListBuffer[String]()
>> val uids = Map[String,Boolean]()
>> Array(currentIter, batchIter).foreach { iter =>
>>   iter.foreach { event =>
>> val uid = getUID(event)
>> if (!uids.contains(uid)) {
>> uids(uid) = true
>> uniqEvents +=event
>> }
>>   }
>> }
>> uniqEvents.iterator
>> }
>>
>> val count = mergedRDD.count
>>
>> The reason I'm doing it this way is that when I was doing:
>>
>> val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
>> val count = mergedRDD.count
>>
>> It would start taking a long time and a lot of shuffles.
>>
>> The zipPartitions approach does perform better, though after running an
>> hour
>> or so I start seeing this
>> in the webUI.
>>
>>
>> 
>>
>> As you can see most of the data is skewing to just 2 executors, with 1
>> getting more than half the Blocks. These become a hotspot and eventually I
>> start seeing OOM errors. I've tried this a half a dozen times and the
>> 'hot'
>> executors changes, but not the skewing behavior.
>>
>> Any idea what is going on here?
>>
>> Thanks,
>>
>> Mike
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD Blocks skewing to just few executors

2015-03-20 Thread Alessandro Lulli
Hi All,

I'm experiencing the same issue with Spark 120 (not verified with previous).

Could you please help us on this?

Thanks
Alessandro

On Tue, Nov 18, 2014 at 1:40 AM, mtimper  wrote:

> Hi I'm running a standalone cluster with 8 worker servers.
> I'm developing a streaming app that is adding new lines of text to several
> different RDDs each batch interval. Each line has a well randomized unique
> identifier that I'm trying to use for partitioning, since the data stream
> does contain duplicates lines. I'm doing partitioning with this:
>
> val eventsByKey =  streamRDD.map { event => (getUID(event), event)}
> val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
>.partitionBy(new HashPartitioner(numPartions)).map(e => e._2)
>
> I'm adding to the existing RDD like with this:
>
> val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
> (currentIter,batchIter) =>
> val uniqEvents = ListBuffer[String]()
> val uids = Map[String,Boolean]()
> Array(currentIter, batchIter).foreach { iter =>
>   iter.foreach { event =>
> val uid = getUID(event)
> if (!uids.contains(uid)) {
> uids(uid) = true
> uniqEvents +=event
> }
>   }
> }
> uniqEvents.iterator
> }
>
> val count = mergedRDD.count
>
> The reason I'm doing it this way is that when I was doing:
>
> val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
> val count = mergedRDD.count
>
> It would start taking a long time and a lot of shuffles.
>
> The zipPartitions approach does perform better, though after running an
> hour
> or so I start seeing this
> in the webUI.
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png
> >
>
> As you can see most of the data is skewing to just 2 executors, with 1
> getting more than half the Blocks. These become a hotspot and eventually I
> start seeing OOM errors. I've tried this a half a dozen times and the 'hot'
> executors changes, but not the skewing behavior.
>
> Any idea what is going on here?
>
> Thanks,
>
> Mike
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Job History Server

2015-03-20 Thread Zhan Zhang
Hi Patcharee,

It is an alpha feature in HDP distribution, integrating ATS with Spark history 
server. If you are using upstream, you can configure spark as regular without 
these configuration. But other related configuration are still mandatory, such 
as hdp.version related.

Thanks.

Zhan Zhang
 
On Mar 18, 2015, at 3:30 AM, patcharee  wrote:

> Hi,
> 
> I am using spark 1.3. I would like to use Spark Job History Server. I added 
> the following line into conf/spark-defaults.conf
> 
> spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
> spark.history.provider 
> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
> spark.yarn.historyServer.address  sandbox.hortonworks.com:19888
> 
> But got Exception in thread "main" java.lang.ClassNotFoundException: 
> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
> 
> What class is really needed? How to fix it?
> 
> Br,
> Patcharee
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Visualizing Spark Streaming data

2015-03-20 Thread Roger Hoover
Hi Harut,

Jeff's right that Kibana + Elasticsearch can take you quite far out of the
box.  Depending on your volume of data, you may only be able to keep recent
data around though.

Another option that is custom-built for handling many dimensions at query
time (not as separate metrics) is Druid (http://druid.io/).  It supports
the Lambda architecture.  It does real-time indexing from Kafka and after a
configurable window, hands off shards to historical nodes.  The historical
shards can also be recomputed in batch mode to fixed up duplicates or late
data.

I wrote a plugin for Grafana that talks to Druid.  It doesn't support all
of Druid's rich query API but it can get you pretty far.

https://github.com/Quantiply/grafana-plugins/

Cheers,

Roger



On Fri, Mar 20, 2015 at 9:11 AM, Harut Martirosyan <
harut.martiros...@gmail.com> wrote:

> But it requires all possible combinations of your filters as separate
> metrics, moreover, it only can show time based information, you cannot
> group by say country.
>
> On 20 March 2015 at 19:09, Irfan Ahmad  wrote:
>
>> Grafana allows pretty slick interactive use patterns, especially with
>> graphite as the back-end. In a multi-user environment, why not have each
>> user just build their own independent dashboards and name them under some
>> simple naming convention?
>>
>>
>> *Irfan Ahmad*
>> CTO | Co-Founder | *CloudPhysics* 
>> Best of VMworld Finalist
>> Best Cloud Management Award
>> NetworkWorld 10 Startups to Watch
>> EMA Most Notable Vendor
>>
>> On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan <
>> harut.martiros...@gmail.com> wrote:
>>
>>> Hey Jeffrey.
>>> Thanks for reply.
>>>
>>> I already have something similar, I use Grafana and Graphite, and for
>>> simple metric streaming we've got all set-up right.
>>>
>>> My question is about interactive patterns. For instance, dynamically
>>> choose an event to monitor, dynamically choose group-by field or any sort
>>> of filter, then view results. This is easy when you have 1 user, but if you
>>> have team of analysts all specifying their own criteria, it becomes hard to
>>> manage them all.
>>>
>>> On 20 March 2015 at 12:02, Jeffrey Jedele 
>>> wrote:
>>>
 Hey Harut,
 I don't think there'll by any general practices as this part heavily
 depends on your environment, skills and what you want to achieve.

 If you don't have a general direction yet, I'd suggest you to have a
 look at Elasticsearch+Kibana. It's very easy to set up, powerful and
 therefore gets a lot of traction currently.

 Regards,
 Jeff

 2015-03-20 8:43 GMT+01:00 Harut :

> I'm trying to build a dashboard to visualize stream of events coming
> from
> mobile devices.
> For example, I have event called add_photo, from which I want to
> calculate
> trending tags for added photos for last x minutes. Then I'd like to
> aggregate that by country, etc. I've built the streaming part, which
> reads
> from Kafka, and calculates needed results and get appropriate RDDs, the
> question is now how to connect it to UI.
>
> Is there any general practices on how to pass parameters to spark from
> some
> custom built UI, how to organize data retrieval, what intermediate
> storages
> to use, etc.
>
> Thanks in advance.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>>
>>> --
>>> RGRDZ Harut
>>>
>>
>>
>
>
> --
> RGRDZ Harut
>


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Actually I realized that the correct way is:

sqlContext.sql("set spark.sql.shuffle.partitions=1000")

but I am still experiencing the same behavior/error.

On 20 March 2015 at 16:04, Yiannis Gkoufas  wrote:

> Hi Yin,
>
> the way I set the configuration is:
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> sqlContext.setConf("spark.sql.shuffle.partitions","1000");
>
> it is the correct way right?
> In the mapPartitions task (the first task which is launched), I get again
> the same number of tasks and again the same error. :(
>
> Thanks a lot!
>
> On 19 March 2015 at 17:40, Yiannis Gkoufas  wrote:
>
>> Hi Yin,
>>
>> thanks a lot for that! Will give it a shot and let you know.
>>
>> On 19 March 2015 at 16:30, Yin Huai  wrote:
>>
>>> Was the OOM thrown during the execution of first stage (map) or the
>>> second stage (reduce)? If it was the second stage, can you increase the
>>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>>
>>> This setting controls the number of reduces Spark SQL will use and the
>>> default is 200. Maybe there are too many distinct values and the memory
>>> pressure on every task (of those 200 reducers) is pretty high. You can
>>> start with 400 and increase it until the OOM disappears. Hopefully this
>>> will help.
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>>
>>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas 
>>> wrote:
>>>
 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, "Yin Huai"  wrote:

> Seems there are too many distinct groups processed in a task, which
> trigger the problem.
>
> How many files do your dataset have and how large is a file? Seems
> your query will be executed with two stages, table scan and map-side
> aggregation in the first stage and the final round of reduce-side
> aggregation in the second stage. Can you take a look at the numbers of
> tasks launched in these two stages?
>
> Thanks,
>
> Yin
>
> On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas <
> johngou...@gmail.com> wrote:
>
>> Hi there, I set the executor memory to 8g but it didn't help
>>
>> On 18 March 2015 at 13:59, Cheng Lian  wrote:
>>
>>> You should probably increase executor memory by setting
>>> "spark.executor.memory".
>>>
>>> Full list of available configurations can be found here
>>> http://spark.apache.org/docs/latest/configuration.html
>>>
>>> Cheng
>>>
>>>
>>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>>
 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
 a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet");
 val res = people.groupBy("name","date").
 agg(sum("power"),sum("supply")).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.managersort

 Any idea how can I workaround this?

 Thanks a lot

>>>
>>>
>>
>
>>>
>>
>


Re: Error communicating with MapOutputTracker

2015-03-20 Thread Imran Rashid
Hi Thomas,

sorry for such a late reply.  I don't have any super-useful advice, but
this seems like something that is important to follow up on.  to answer
your immediate question, No, there should not be any hard limit to the
number of tasks that MapOutputTracker can handle.  Though of course as
things get bigger, the overheads increase which is why you might hit
timeouts.

Two other minor suggestions:
(1) increase spark.akka.askTimeout -- thats the timeout you are running
into, it defaults to 30 seconds
(2) as you've noted, you've needed to play w/ other timeouts b/c of long GC
pauses -- its possible some GC tuning might help, though its a bit of a
black art so its hard to say what you can try.  You cold always try
Concurrent Mark Swee to avoid the long pauses, but of course that will
probably hurt overall performance.

can you share any more details of what you are trying to do?

Since you're fetching shuffle blocks in a shuffle map task, I guess you've
got two shuffles back-to-back, eg.
someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}.  Do you
expect to be doing a lot of GC in between the two shuffles?? -eg., in the
little example I have, if there were lots of objects being created in the
map & filter steps that will make it out of the eden space.  One possible
solution to this would be to force the first shuffle to complete, before
running any of the subsequent transformations, eg. by forcing
materialization to the cache first

val intermediateRDD = someRDD.reduceByKey{...}.persist(DISK)
intermediateRDD.count() // force the shuffle to complete, without trying to
do our complicated downstream logic at the same time

val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size?  Do you expect the shuffle to be
skewed, or do you think it will be well-balanced?  Not that I'll have any
suggestions for you based on the answer, but it may help us reproduce it
and try to fix whatever the root cause is.

thanks,
Imran



On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber 
wrote:

> I meant spark.default.parallelism of course.
>
> On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber 
> wrote:
>
>> Follow up:
>> We re-retried, this time after *decreasing* spark.parallelism. It was set
>> to 16000 before, (5 times the number of cores in our cluster). It is now
>> down to 6400 (2 times the number of cores).
>>
>> And it got past the point where it failed before.
>>
>> Does the MapOutputTracker have a limit on the number of tasks it can
>> track?
>>
>>
>> On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber 
>> wrote:
>>
>>> Hello,
>>>
>>> We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
>>> workers). We use spark-submit to start an application.
>>>
>>> We got the following error which leads to a failed stage:
>>>
>>> Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
>>> most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
>>> ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
>>> communicating with MapOutputTracker
>>>
>>>
>>> We tried the whole application again, and it failed on the same stage
>>> (but it got more tasks completed on that stage) with the same error.
>>>
>>> We then looked at executors stderr, and all show similar logs, on both
>>> runs (see below). As far as we can tell, executors and master have disk
>>> space left.
>>>
>>> *Any suggestion on where to look to understand why the communication
>>> with the MapOutputTracker fails?*
>>>
>>> Thanks
>>> Thomas
>>> 
>>> In case it matters, our akka settings:
>>> spark.akka.frameSize 50
>>> spark.akka.threads 8
>>> // those below are 10* the default, to cope with large GCs
>>> spark.akka.timeout 1000
>>> spark.akka.heartbeat.pauses 6
>>> spark.akka.failure-detector.threshold 3000.0
>>> spark.akka.heartbeat.interval 1
>>>
>>> Appendix: executor logs, where it starts going awry
>>>
>>> 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 
>>> 298525
>>> 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
>>> 298525)
>>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
>>> curMem=5543008799, maxMem=18127202549
>>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
>>> bytes in memory (estimated size 1473.0 B, free 11.7 GB)
>>> 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
>>> broadcast_339_piece0
>>> 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 
>>> took 224 ms
>>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
>>> curMem=5543010272, maxMem=18127202549
>>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
>>> memory (estimated size 2.5 KB, free 11.7 GB)
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>> shuffle 18, fetching them
>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
>>> actor =

Re: Visualizing Spark Streaming data

2015-03-20 Thread Harut Martirosyan
But it requires all possible combinations of your filters as separate
metrics, moreover, it only can show time based information, you cannot
group by say country.

On 20 March 2015 at 19:09, Irfan Ahmad  wrote:

> Grafana allows pretty slick interactive use patterns, especially with
> graphite as the back-end. In a multi-user environment, why not have each
> user just build their own independent dashboards and name them under some
> simple naming convention?
>
>
> *Irfan Ahmad*
> CTO | Co-Founder | *CloudPhysics* 
> Best of VMworld Finalist
> Best Cloud Management Award
> NetworkWorld 10 Startups to Watch
> EMA Most Notable Vendor
>
> On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan <
> harut.martiros...@gmail.com> wrote:
>
>> Hey Jeffrey.
>> Thanks for reply.
>>
>> I already have something similar, I use Grafana and Graphite, and for
>> simple metric streaming we've got all set-up right.
>>
>> My question is about interactive patterns. For instance, dynamically
>> choose an event to monitor, dynamically choose group-by field or any sort
>> of filter, then view results. This is easy when you have 1 user, but if you
>> have team of analysts all specifying their own criteria, it becomes hard to
>> manage them all.
>>
>> On 20 March 2015 at 12:02, Jeffrey Jedele 
>> wrote:
>>
>>> Hey Harut,
>>> I don't think there'll by any general practices as this part heavily
>>> depends on your environment, skills and what you want to achieve.
>>>
>>> If you don't have a general direction yet, I'd suggest you to have a
>>> look at Elasticsearch+Kibana. It's very easy to set up, powerful and
>>> therefore gets a lot of traction currently.
>>>
>>> Regards,
>>> Jeff
>>>
>>> 2015-03-20 8:43 GMT+01:00 Harut :
>>>
 I'm trying to build a dashboard to visualize stream of events coming
 from
 mobile devices.
 For example, I have event called add_photo, from which I want to
 calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which
 reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from
 some
 custom built UI, how to organize data retrieval, what intermediate
 storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>>
>> --
>> RGRDZ Harut
>>
>
>


-- 
RGRDZ Harut


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Hi Yin,

the way I set the configuration is:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions","1000");

it is the correct way right?
In the mapPartitions task (the first task which is launched), I get again
the same number of tasks and again the same error. :(

Thanks a lot!

On 19 March 2015 at 17:40, Yiannis Gkoufas  wrote:

> Hi Yin,
>
> thanks a lot for that! Will give it a shot and let you know.
>
> On 19 March 2015 at 16:30, Yin Huai  wrote:
>
>> Was the OOM thrown during the execution of first stage (map) or the
>> second stage (reduce)? If it was the second stage, can you increase the
>> value of spark.sql.shuffle.partitions and see if the OOM disappears?
>>
>> This setting controls the number of reduces Spark SQL will use and the
>> default is 200. Maybe there are too many distinct values and the memory
>> pressure on every task (of those 200 reducers) is pretty high. You can
>> start with 400 and increase it until the OOM disappears. Hopefully this
>> will help.
>>
>> Thanks,
>>
>> Yin
>>
>>
>> On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas 
>> wrote:
>>
>>> Hi Yin,
>>>
>>> Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
>>> The number of tasks launched is equal to the number of parquet files. Do
>>> you have any idea on how to deal with this situation?
>>>
>>> Thanks a lot
>>> On 18 Mar 2015 17:35, "Yin Huai"  wrote:
>>>
 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems your
 query will be executed with two stages, table scan and map-side aggregation
 in the first stage and the final round of reduce-side aggregation in the
 second stage. Can you take a look at the numbers of tasks launched in these
 two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas >>> > wrote:

> Hi there, I set the executor memory to 8g but it didn't help
>
> On 18 March 2015 at 13:59, Cheng Lian  wrote:
>
>> You should probably increase executor memory by setting
>> "spark.executor.memory".
>>
>> Full list of available configurations can be found here
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> Cheng
>>
>>
>> On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
>>
>>> Hi there,
>>>
>>> I was trying the new DataFrame API with some basic operations on a
>>> parquet dataset.
>>> I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
>>> standalone cluster mode.
>>> The code is the following:
>>>
>>> val people = sqlContext.parquetFile("/data.parquet");
>>> val res = people.groupBy("name","date").
>>> agg(sum("power"),sum("supply")).take(10);
>>> System.out.println(res);
>>>
>>> The dataset consists of 16 billion entries.
>>> The error I get is java.lang.OutOfMemoryError: GC overhead limit
>>> exceeded
>>>
>>> My configuration is:
>>>
>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>> spark.driver.memory6g
>>> spark.executor.extraJavaOptions -XX:+UseCompressedOops
>>> spark.shuffle.managersort
>>>
>>> Any idea how can I workaround this?
>>>
>>> Thanks a lot
>>>
>>
>>
>

>>
>


Re: version conflict common-net

2015-03-20 Thread Jacob Abraham
Anyone? or is this question nonsensical... and I am doing something
fundamentally wrong?



On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham  wrote:

> Hi Folks,
>
> I have a situation where I am getting a version conflict between java
> libraries that is used by my application and ones used by spark.
>
> Following are the details -
>
> I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
> 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net.
>
> In our spark application we use commons-net with version 3.3.
>
> However I found out that spark uses commons-net version 2.2.
>
> Hence when we try to submit our application using spark-submit, I end up
> getting, a NoSuchMethodError()
>
> ​
> Error starting receiver 5 -
>
> ​  ​
> java.lang.NoSuchMethodError: 
> org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
>
>   at ZipStream.onStart(ZipStream.java:55)
>   at 
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>   at 
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>   at 
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>   at 
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>
> ​  .
>
>   ​
>
>
>
>
> ​Now, if I change the commons-net version to 2.2, the job runs fine (expect 
> for the fact that some of the features we use from the commons-net 3.3 are 
> not there).
>
>
>
> ​How does one resolve such an issue where ​sparks uses one set of libraries 
> and our user application requires the same set of libraries, but just a 
> different version of it (In my case commons-net 2.2 vs 3.3).
>
>
> I see that there is a setting that I can supply - 
> "spark.files.userClassPathFirst", but the documentation says that it is 
> experimental and for us this did not work at all.
>
>
> ​Thanks in advance.​
>
>
> Regards,
>
> -Jacob
>
>
>
>
>


Buffering for Socket streams

2015-03-20 Thread jamborta
Hi all,

We are designing a workflow where we try to stream local files to a Socket
streamer, that would clean and process the files and write them to hdfs. We
have an issue with bigger files when the streamer cannot keep up with the
data, and runs out of memory. 

What would be the best way to implement an approach where the Socket stream
receiver would notify the stream not to send more data (stop reading from
disk too?), just before it might run out of memory?

thanks,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Buffering-for-Socket-streams-tp22164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-20 Thread Ralf Heyde
 Exception in thread "main" java.lang.IllegalArgumentException:
>> AWS Access Key ID and Secret Access Key must be specified as the username
>> or password (respectively) of a s3 URL, or by setting the
>> fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
>>
>>
>> Drilled down in the Job, I can see, that the RestStorageService
>> recognizes AWS4-HMAC-SHA256 ... but somehow it gets a ResponseCode 400 (log
>> below) -> i replaced the key / encoded secret with XXX_*_XXX:
>>
>> 15/03/20 11:25:31 WARN RestStorageService: Retrying request with
>> "AWS4-HMAC-SHA256" signing mechanism: GET
>> https://frankfurt.ingestion.batch.s3.amazonaws.com:443/?max-keys=1&prefix=EAN%2F2015-03-09-72640385%2Finput%2FHotelImageList.gz%2F&delimiter=%2F
>> HTTP/1.1
>> 15/03/20 11:25:31 WARN RestStorageService: Retrying request following
>> error response: GET
>> '/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/'
>> -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
>> Fri, 20 Mar 2015 11:25:31 GMT, Authorization: AWS
>> XXX_MY_KEY_XXX:XXX_I_GUESS_SECRET_XXX], Response Headers:
>> [x-amz-request-id: 7E6F85873D69D14E, x-amz-id-2:
>> rGFW+kRfURzz3DlY/m/M8h054MmHu8bxJAtKVHUmov/VY7pBXvtMvbQTXxA7bffpu4xxf4rGmL4=,
>> x-amz-region: eu-central-1, Content-Type: application/xml,
>> Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:31 GMT,
>> Connection: close, Server: AmazonS3]
>> 15/03/20 11:25:32 WARN RestStorageService: Retrying request after
>> automatic adjustment of Host endpoint from "
>> frankfurt.ingestion.batch.s3.amazonaws.com" to "
>> frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com" following
>> request signing error using AWS request signing version 4: GET
>> https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/
>> HTTP/1.1
>> 15/03/20 11:25:32 WARN RestStorageService: Retrying request following
>> error response: GET
>> '/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/'
>> -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
>> Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256:
>> e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host:
>> frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date:
>> 20150320T112531Z, Authorization: AWS4-HMAC-SHA256
>> Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4],
>> Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2:
>> V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=,
>> Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20
>> Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]
>> Exception in thread "main"
>> org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
>> s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz
>>
>>
>> Do you have any Ideas? Was somebody of you already able to access S3 in
>> Frankfurt, if so - how?
>>
>> Cheers Ralf
>>
>>
>>
>


Re: Visualizing Spark Streaming data

2015-03-20 Thread Irfan Ahmad
Grafana allows pretty slick interactive use patterns, especially with
graphite as the back-end. In a multi-user environment, why not have each
user just build their own independent dashboards and name them under some
simple naming convention?


*Irfan Ahmad*
CTO | Co-Founder | *CloudPhysics* 
Best of VMworld Finalist
Best Cloud Management Award
NetworkWorld 10 Startups to Watch
EMA Most Notable Vendor

On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan <
harut.martiros...@gmail.com> wrote:

> Hey Jeffrey.
> Thanks for reply.
>
> I already have something similar, I use Grafana and Graphite, and for
> simple metric streaming we've got all set-up right.
>
> My question is about interactive patterns. For instance, dynamically
> choose an event to monitor, dynamically choose group-by field or any sort
> of filter, then view results. This is easy when you have 1 user, but if you
> have team of analysts all specifying their own criteria, it becomes hard to
> manage them all.
>
> On 20 March 2015 at 12:02, Jeffrey Jedele 
> wrote:
>
>> Hey Harut,
>> I don't think there'll by any general practices as this part heavily
>> depends on your environment, skills and what you want to achieve.
>>
>> If you don't have a general direction yet, I'd suggest you to have a look
>> at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
>> gets a lot of traction currently.
>>
>> Regards,
>> Jeff
>>
>> 2015-03-20 8:43 GMT+01:00 Harut :
>>
>>> I'm trying to build a dashboard to visualize stream of events coming from
>>> mobile devices.
>>> For example, I have event called add_photo, from which I want to
>>> calculate
>>> trending tags for added photos for last x minutes. Then I'd like to
>>> aggregate that by country, etc. I've built the streaming part, which
>>> reads
>>> from Kafka, and calculates needed results and get appropriate RDDs, the
>>> question is now how to connect it to UI.
>>>
>>> Is there any general practices on how to pass parameters to spark from
>>> some
>>> custom built UI, how to organize data retrieval, what intermediate
>>> storages
>>> to use, etc.
>>>
>>> Thanks in advance.
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> RGRDZ Harut
>


What is the jvm size when start spark-submit through local mode

2015-03-20 Thread Shuai Zheng
Hi,

 

I am curious, when I start a spark program in local mode, which parameter
will be used to decide the jvm memory size for executor?

In theory should be:

--executor-memory 20G

 

But I remember local mode will only start spark executor in the same process
of driver, then should be:

--driver-memory 20G

 

Regards,

 

Shuai



How to handle under-performing nodes in the cluster

2015-03-20 Thread Yiannis Gkoufas
Hi all,

I have 6 nodes in the cluster and one of the nodes is clearly
under-performing:


​
I was wandering what is the impact of having such issues? Also what is the
recommended way to workaround it?

Thanks a lot,
Yiannis


Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-20 Thread Gourav Sengupta
e: Retrying request with
> "AWS4-HMAC-SHA256" signing mechanism: GET
> https://frankfurt.ingestion.batch.s3.amazonaws.com:443/?max-keys=1&prefix=EAN%2F2015-03-09-72640385%2Finput%2FHotelImageList.gz%2F&delimiter=%2F
> HTTP/1.1
> 15/03/20 11:25:31 WARN RestStorageService: Retrying request following
> error response: GET
> '/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/'
> -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
> Fri, 20 Mar 2015 11:25:31 GMT, Authorization: AWS
> XXX_MY_KEY_XXX:XXX_I_GUESS_SECRET_XXX], Response Headers:
> [x-amz-request-id: 7E6F85873D69D14E, x-amz-id-2:
> rGFW+kRfURzz3DlY/m/M8h054MmHu8bxJAtKVHUmov/VY7pBXvtMvbQTXxA7bffpu4xxf4rGmL4=,
> x-amz-region: eu-central-1, Content-Type: application/xml,
> Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:31 GMT,
> Connection: close, Server: AmazonS3]
> 15/03/20 11:25:32 WARN RestStorageService: Retrying request after
> automatic adjustment of Host endpoint from "
> frankfurt.ingestion.batch.s3.amazonaws.com" to "
> frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com" following
> request signing error using AWS request signing version 4: GET
> https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/
> HTTP/1.1
> 15/03/20 11:25:32 WARN RestStorageService: Retrying request following
> error response: GET
> '/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/'
> -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
> Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256:
> e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host:
> frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z,
> Authorization: AWS4-HMAC-SHA256
> Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4],
> Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2:
> V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=,
> Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20
> Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]
> Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException:
> Input path does not exist:
> s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz
>
>
> Do you have any Ideas? Was somebody of you already able to access S3 in
> Frankfurt, if so - how?
>
> Cheers Ralf
>
>
>


Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-20 Thread Ralf Heyde
quest signing version 4: GET
https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/
HTTP/1.1
15/03/20 11:25:32 WARN RestStorageService: Retrying request following error
response: GET
'/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/&delimiter=/'
-- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256:
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host:
frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z,
Authorization: AWS4-HMAC-SHA256
Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4],
Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2:
V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=,
Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20
Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException:
Input path does not exist:
s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz


Do you have any Ideas? Was somebody of you already able to access S3 in
Frankfurt, if so - how?

Cheers Ralf


ShuffleBlockFetcherIterator: Failed to get block(s)

2015-03-20 Thread Eric Friedman
My job crashes with a bunch of these messages in the YARN logs.

What are the appropriate steps in troubleshooting?

15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 10 outstanding blocks (after 3 retries)

15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
block(s) from :


about Partition Index

2015-03-20 Thread Long Cheng
Dear all,

About the index of each partition of an RDD, I am wondering whether we
can keep their numbering on each physical machine in a hash
partitioning process. For example, a cluster containing three physical
machines A,B,C (all are workers), for an RDD with six partitions,
assume that the two partitions with index 0 and 3 are in A, partitions
with index 1 and 4 are in B and the ones with index 2 and 5 are in C.
Then, if I hash partition the RDD using "partitionBy(new
HashPartitioner(6))", will the new created RDD still have the same
partition index on each machine? Is it possible that the partitions
with index 0 and 3 are now on B but not A? If it is, is there any
method that we can use to keep both the RDDs having the same numbering
on each physical machine?

Thanks in advance.

Long

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Job History Server

2015-03-20 Thread Sean Owen
Uh, does that mean HDP shipped Marcelo's uncommitted patch from
SPARK-1537 anyway? Given the discussion there, that seems kinda
aggressive.

On Wed, Mar 18, 2015 at 8:49 AM, Marcelo Vanzin  wrote:
> Those classes are not part of standard Spark. You may want to contact
> Hortonworks directly if they're suggesting you use those.
>
> On Wed, Mar 18, 2015 at 3:30 AM, patcharee  wrote:
>> Hi,
>>
>> I am using spark 1.3. I would like to use Spark Job History Server. I added
>> the following line into conf/spark-defaults.conf
>>
>> spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
>> spark.history.provider
>> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>> spark.yarn.historyServer.address  sandbox.hortonworks.com:19888
>>
>> But got Exception in thread "main" java.lang.ClassNotFoundException:
>> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>>
>> What class is really needed? How to fix it?
>>
>> Br,
>> Patcharee
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.2. loses often all executors

2015-03-20 Thread Akhil Das
Isn't that a feature? Other than running a buggy pipeline, just kills all
executors? You can always handle exceptions with proper try catch in your
code though.

Thanks
Best Regards

On Fri, Mar 20, 2015 at 3:51 PM, mrm  wrote:

> Hi,
>
> I recently changed from Spark 1.1. to Spark 1.2., and I noticed that it
> loses all executors whenever I have any Python code bug (like looking up a
> key in a dictionary that does not exist). In earlier versions, it would
> raise an exception but it would not lose all executors.
>
> Anybody with a similar problem?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark 1.2. loses often all executors

2015-03-20 Thread mrm
Hi,

I recently changed from Spark 1.1. to Spark 1.2., and I noticed that it
loses all executors whenever I have any Python code bug (like looking up a
key in a dictionary that does not exist). In earlier versions, it would
raise an exception but it would not lose all executors. 

Anybody with a similar problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-submit and multiple files

2015-03-20 Thread Petar Zecevic


I tried your program in yarn-client mode and it worked with no 
exception. This is the command I used:


spark-submit --master yarn-client --py-files work.py main.py

(Spark 1.2.1)

On 20.3.2015. 9:47, Guillaume Charhon wrote:

Hi Davies,

I am already using --py-files. The system does use the other file. The 
error I am getting is not trivial. Please check the error log.




On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu > wrote:


You could submit additional Python source via --py-files , for
example:

$ bin/spark-submit --py-files work.py main.py

On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez
mailto:guilla...@databerries.com>> wrote:
> Hello guys,
>
> I am having a hard time to understand how spark-submit behave
with multiple
> files. I have created two code snippets. Each code snippet is
composed of a
> main.py and work.py. The code works if I paste work.py then
main.py in a
> pyspark shell. However both snippets do not work when using
spark submit and
> generate different errors.
>
> Function add_1 definition outside
> http://www.codeshare.io/4ao8B
> https://justpaste.it/jzvj
>
> Embedded add_1 function definition
> http://www.codeshare.io/OQJxq
> https://justpaste.it/jzvn
>
> I am trying a way to make it work.
>
> Thank you for your support.
>
>
>
> --
> View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html
> Sent from the Apache Spark User List mailing list archive at
Nabble.com.
>
>
-
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

> For additional commands, e-mail: user-h...@spark.apache.org

>






Clean the shuffle data during iteration

2015-03-20 Thread James
Hello,

Is that possible to delete shuffle data of previous iteration as it is not
necessary?

Alcaid


Re: Error when using multiple python files spark-submit

2015-03-20 Thread Guillaume Charhon
I see. I will try the other way around.

On Thu, Mar 19, 2015 at 8:06 PM, Davies Liu  wrote:

> the options of spark-submit should come before main.py, or they will
> become the options of main.py, so it should be:
>
>  ../hadoop/spark-install/bin/spark-submit --py-files
>
>  
> /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py
>  --master spark://spark-m:7077 main.py
>
> On Mon, Mar 16, 2015 at 4:11 AM, poiuytrez 
> wrote:
> > I have a spark app which is composed of multiple files.
> >
> > When I launch Spark using:
> >
> > ../hadoop/spark-install/bin/spark-submit main.py --py-files
> >
> /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py
> > --master spark://spark-m:7077
> >
> > I am getting an error:
> >
> > 15/03/13 15:54:24 INFO TaskSetManager: Lost task 6.3 in stage 413.0
> (TID
> > 5817) on executor spark-w-3.c.databerries.internal:
> > org.apache.spark.api.python.PythonException (Traceback (most recent call
> > last):   File "/home/hadoop/spark-install/python/pyspark/worker.py", line
> > 90, in main
> > command = pickleSer._read_with_length(infile)   File
> > "/home/hadoop/spark-install/python/pyspark/serializers.py", line 151, in
> > _read_with_length
> > return self.loads(obj)   File
> > "/home/hadoop/spark-install/python/pyspark/serializers.py", line 396, in
> > loads
> > return cPickle.loads(obj) ImportError: No module named naive
> >
> > It is weird because I do not serialize anything. naive.py is also
> available
> > on every machine at the same path.
> >
> > Any insight on what could be going on? The issue does not happen on my
> > laptop.
> >
> > PS : I am using Spark 1.2.0.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-using-multiple-python-files-spark-submit-tp22080.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Spark-submit and multiple files

2015-03-20 Thread Guillaume Charhon
Hi Davies,

I am already using --py-files. The system does use the other file. The
error I am getting is not trivial. Please check the error log.



On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu  wrote:

> You could submit additional Python source via --py-files , for example:
>
> $ bin/spark-submit --py-files work.py main.py
>
> On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez 
> wrote:
> > Hello guys,
> >
> > I am having a hard time to understand how spark-submit behave with
> multiple
> > files. I have created two code snippets. Each code snippet is composed
> of a
> > main.py and work.py. The code works if I paste work.py then main.py in a
> > pyspark shell. However both snippets do not work when using spark submit
> and
> > generate different errors.
> >
> > Function add_1 definition outside
> > http://www.codeshare.io/4ao8B
> > https://justpaste.it/jzvj
> >
> > Embedded add_1 function definition
> > http://www.codeshare.io/OQJxq
> > https://justpaste.it/jzvn
> >
> > I am trying a way to make it work.
> >
> > Thank you for your support.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Load balancing

2015-03-20 Thread Jeffrey Jedele
Hi Mohit,
it also depends on what the source for your streaming application is.

If you use Kafka, you can easily partition topics and have multiple
receivers on different machines.

If you have sth like a HTTP, socket, etc stream, you probably can't do
that. The Spark RDDs generated by your receiver will be partitioned and
processed in a distributed manner like usual Spark RDDs however. There are
parameters to control that behavior (e.g. defaultParallelism and
blockInterval).

See here for more details:
http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning

Regards,
Jeff

2015-03-20 8:02 GMT+01:00 Akhil Das :

> 1. If you are consuming data from Kafka or any other receiver based
> sources, then you can start 1-2 receivers per worker (assuming you'll have
> min 4 core per worker)
>
> 2. If you are having single receiver or is a fileStream then what you can
> do to distribute the data across machines is to do a repartition.
>
> Thanks
> Best Regards
>
> On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia 
> wrote:
>
>> I am trying to understand how to load balance the incoming data to
>> multiple spark streaming workers. Could somebody help me understand how I
>> can distribute my incoming data from various sources such that incoming
>> data is going to multiple spark streaming nodes? Is it done by spark client
>> with help of spark master similar to hadoop client asking namenodes for the
>> list of datanodes?
>>
>
>


Re: Visualizing Spark Streaming data

2015-03-20 Thread Jeffrey Jedele
I'll stay with my recommendation - that's exactly what Kibana is made for ;)

2015-03-20 9:06 GMT+01:00 Harut Martirosyan :

> Hey Jeffrey.
> Thanks for reply.
>
> I already have something similar, I use Grafana and Graphite, and for
> simple metric streaming we've got all set-up right.
>
> My question is about interactive patterns. For instance, dynamically
> choose an event to monitor, dynamically choose group-by field or any sort
> of filter, then view results. This is easy when you have 1 user, but if you
> have team of analysts all specifying their own criteria, it becomes hard to
> manage them all.
>
> On 20 March 2015 at 12:02, Jeffrey Jedele 
> wrote:
>
>> Hey Harut,
>> I don't think there'll by any general practices as this part heavily
>> depends on your environment, skills and what you want to achieve.
>>
>> If you don't have a general direction yet, I'd suggest you to have a look
>> at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
>> gets a lot of traction currently.
>>
>> Regards,
>> Jeff
>>
>> 2015-03-20 8:43 GMT+01:00 Harut :
>>
>>> I'm trying to build a dashboard to visualize stream of events coming from
>>> mobile devices.
>>> For example, I have event called add_photo, from which I want to
>>> calculate
>>> trending tags for added photos for last x minutes. Then I'd like to
>>> aggregate that by country, etc. I've built the streaming part, which
>>> reads
>>> from Kafka, and calculates needed results and get appropriate RDDs, the
>>> question is now how to connect it to UI.
>>>
>>> Is there any general practices on how to pass parameters to spark from
>>> some
>>> custom built UI, how to organize data retrieval, what intermediate
>>> storages
>>> to use, etc.
>>>
>>> Thanks in advance.
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> RGRDZ Harut
>


Re: Powered by Spark addition

2015-03-20 Thread Ricardo Almeida
Hello Matei,

Could you please also add our company to the "Powered By" list?
Details are as follows:

Name: Act Now
URL: www.actnowib.com

Description:
Sparks powers NOW APPS, a big data, real-time, predictive analytics
platform.
Using Spark SQL, MLlib and GraphX components for both batch ETL and
analytics applied to telecommunication data, providing faster and more
meaningful insights.and actionable data to the operators.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Powered-by-Spark-addition-tp7422p22161.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Visualizing Spark Streaming data

2015-03-20 Thread Harut Martirosyan
Hey Jeffrey.
Thanks for reply.

I already have something similar, I use Grafana and Graphite, and for
simple metric streaming we've got all set-up right.

My question is about interactive patterns. For instance, dynamically choose
an event to monitor, dynamically choose group-by field or any sort of
filter, then view results. This is easy when you have 1 user, but if you
have team of analysts all specifying their own criteria, it becomes hard to
manage them all.

On 20 March 2015 at 12:02, Jeffrey Jedele  wrote:

> Hey Harut,
> I don't think there'll by any general practices as this part heavily
> depends on your environment, skills and what you want to achieve.
>
> If you don't have a general direction yet, I'd suggest you to have a look
> at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
> gets a lot of traction currently.
>
> Regards,
> Jeff
>
> 2015-03-20 8:43 GMT+01:00 Harut :
>
>> I'm trying to build a dashboard to visualize stream of events coming from
>> mobile devices.
>> For example, I have event called add_photo, from which I want to calculate
>> trending tags for added photos for last x minutes. Then I'd like to
>> aggregate that by country, etc. I've built the streaming part, which reads
>> from Kafka, and calculates needed results and get appropriate RDDs, the
>> question is now how to connect it to UI.
>>
>> Is there any general practices on how to pass parameters to spark from
>> some
>> custom built UI, how to organize data retrieval, what intermediate
>> storages
>> to use, etc.
>>
>> Thanks in advance.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
RGRDZ Harut


Re: Visualizing Spark Streaming data

2015-03-20 Thread Jeffrey Jedele
Hey Harut,
I don't think there'll by any general practices as this part heavily
depends on your environment, skills and what you want to achieve.

If you don't have a general direction yet, I'd suggest you to have a look
at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
gets a lot of traction currently.

Regards,
Jeff

2015-03-20 8:43 GMT+01:00 Harut :

> I'm trying to build a dashboard to visualize stream of events coming from
> mobile devices.
> For example, I have event called add_photo, from which I want to calculate
> trending tags for added photos for last x minutes. Then I'd like to
> aggregate that by country, etc. I've built the streaming part, which reads
> from Kafka, and calculates needed results and get appropriate RDDs, the
> question is now how to connect it to UI.
>
> Is there any general practices on how to pass parameters to spark from some
> custom built UI, how to organize data retrieval, what intermediate storages
> to use, etc.
>
> Thanks in advance.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Visualizing Spark Streaming data

2015-03-20 Thread Harut
I'm trying to build a dashboard to visualize stream of events coming from
mobile devices. 
For example, I have event called add_photo, from which I want to calculate
trending tags for added photos for last x minutes. Then I'd like to
aggregate that by country, etc. I've built the streaming part, which reads
from Kafka, and calculates needed results and get appropriate RDDs, the
question is now how to connect it to UI. 

Is there any general practices on how to pass parameters to spark from some
custom built UI, how to organize data retrieval, what intermediate storages
to use, etc.

Thanks in advance.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Load balancing

2015-03-20 Thread Akhil Das
1. If you are consuming data from Kafka or any other receiver based
sources, then you can start 1-2 receivers per worker (assuming you'll have
min 4 core per worker)

2. If you are having single receiver or is a fileStream then what you can
do to distribute the data across machines is to do a repartition.

Thanks
Best Regards

On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia 
wrote:

> I am trying to understand how to load balance the incoming data to
> multiple spark streaming workers. Could somebody help me understand how I
> can distribute my incoming data from various sources such that incoming
> data is going to multiple spark streaming nodes? Is it done by spark client
> with help of spark master similar to hadoop client asking namenodes for the
> list of datanodes?
>