Re: How to close resources shared in executor?
Thanks, Ted. Util.Connection.close() should be called only once, so it can NOT be in a map function:

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
  Util.Connection.close()
})

As you mentioned:

Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released.

Yes, we should call table.close(), but it won't remove the HConnection in HConnectionManager, which is an HConnection pool. As I look into the HConnectionManager Javadoc, it seems I have to implement a shutdown hook:

* Cleanup used to be done inside in a shutdown hook. On startup we'd
* register a shutdown hook that called {@link #deleteAllConnections()}
* on its way out but the order in which shutdown hooks run is not defined so
* were problematic for clients of HConnection that wanted to register their
* own shutdown hooks so we removed ours though this shifts the onus for
* cleanup to the client.

2014-10-15 22:31 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Pardon me - there was a typo in my previous email. Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released.

Cheers

On Wed, Oct 15, 2014 at 7:13 AM, Ted Yu yuzhih...@gmail.com wrote:

Have you tried the following?

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
  Util.Connection.close()
})

On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO raofeng...@gmail.com wrote:

In order to share an HBase connection pool, we create an object:

object Util {
  val HBaseConf = HBaseConfiguration.create
  val Connection = HConnectionManager.createConnection(HBaseConf)
}

which would be shared among tasks on the same executor, e.g.:

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
})

However, we don't know how to close the Util.Connection. If we write Util.Connection.close() in the main function, it'll only run on the driver, not the executors. So, how do we make sure every Connection is closed before exit?
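[Editorial note] For reference, a minimal sketch (mine, not from the thread) of the per-record pattern Ted describes: close only the table, in a try/finally, and let HConnectionManager's reference counting keep the shared HConnection pooled. Util, getTable and user reuse the thread's names; the loop body is a placeholder.

val result = rdd.map { line =>
  // borrow a table handle backed by the shared, pooled connection
  val table = Util.Connection.getTable(user)
  try {
    // ... per-record work against the table goes here ...
    line
  } finally {
    table.close()  // drops this reference only; the pooled HConnection stays open
  }
}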
Problems with ZooKeeper and key canceled
I have a Spark cluster on Mesos, and when I run long-running GraphX processing I receive a lot of the following two errors; one by one my slaves stop doing any work for the process until it's idle. Any idea what is happening?

First type of error message:

INFO SendingConnection: Initiating connection
INFO SendingConnection: Connected
INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@e2e30ea
INFO ConnectionManager: Removing SendingConnection
INFO ConnectionManager: Removing ReceivingConnection
INFO ConnectionManager: Removing SendingConnection
INFO ConnectionManager: Removing ReceivingConnection
ERROR ConnectionManager: Corresponding SendingConnection
ERROR ConnectionManager: Corresponding SendingConnection
INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@e2e30ea
java.nio.channels.CancelledKeyException
	at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
	at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@1968265a
INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@1968265a
java.nio.channels.CancelledKeyException
	at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
	at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
INFO BlockManager: Removing broadcast 95
INFO BlockManager: Removing broadcast 96
INFO BlockManager: Removing broadcast 98
INFO BlockManager: Removing broadcast 101

Second error message:

group.cpp:418] Lost connection to ZooKeeper, attempting to reconnect ...
slave.cpp:508] Slave asked to shut down by master@10...:5050 because 'health check timed out'
slave.cpp:1406] Asked to shut down framework by master@10...:5050
slave.cpp:1431] Shutting down framework
slave.cpp:2878] Shutting down executor
slave.cpp:3053] Current usage 35.12%. Max allowed age: 3.841638564773842days
group.cpp:472] ZooKeeper session expired
detector.cpp:138] Detected a new leader: None
slave.cpp:582] Lost leading master
slave.cpp:636] Detecting new master
group.cpp:313] Group process (group(1)@10...:5051) connected to ZooKeeper
group.cpp:787] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
group.cpp:385] Trying to create path '/mesos' in ZooKeeper
detector.cpp:138] Detected a new leader: (id='16')
slave.cpp:2948] Killing executor
containerizer.cpp:882] Destroying container
group.cpp:658] Trying to get '/mesos/info_16' in ZooKeeper
detector.cpp:426] A new leading master (UPID=master@10...:5050) is detected
slave.cpp:589] New master detected at master@10...:5050
slave.cpp:596] Skipping registration because slave is terminating

I'm on Spark 1.1 with Mesos 0.20.1
Re: How to close resources shared in executor?
I may have misunderstood your point.

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
  table.close()
})

Did you mean this is enough, and there's no need to call Util.Connection.close(), or HConnectionManager.deleteAllConnections()?

Where is the documentation that states HConnectionManager would release the underlying connection automatically? If that's true, maybe the Javadoc which recommends a shutdown hook needs an update.

2014-10-16 14:20 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

Thanks, Ted. Util.Connection.close() should be called only once, so it can NOT be in a map function:

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
  Util.Connection.close()
})

As you mentioned:

Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released.

Yes, we should call table.close(), but it won't remove the HConnection in HConnectionManager, which is an HConnection pool. As I look into the HConnectionManager Javadoc, it seems I have to implement a shutdown hook:

* Cleanup used to be done inside in a shutdown hook. On startup we'd
* register a shutdown hook that called {@link #deleteAllConnections()}
* on its way out but the order in which shutdown hooks run is not defined so
* were problematic for clients of HConnection that wanted to register their
* own shutdown hooks so we removed ours though this shifts the onus for
* cleanup to the client.

2014-10-15 22:31 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Pardon me - there was a typo in my previous email. Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released.

Cheers

On Wed, Oct 15, 2014 at 7:13 AM, Ted Yu yuzhih...@gmail.com wrote:

Have you tried the following?

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
  Util.Connection.close()
})

On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO raofeng...@gmail.com wrote:

In order to share an HBase connection pool, we create an object:

object Util {
  val HBaseConf = HBaseConfiguration.create
  val Connection = HConnectionManager.createConnection(HBaseConf)
}

which would be shared among tasks on the same executor, e.g.:

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
})

However, we don't know how to close the Util.Connection. If we write Util.Connection.close() in the main function, it'll only run on the driver, not the executors. So, how do we make sure every Connection is closed before exit?
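[Editorial note] If the Javadoc's advice still applies, one option (my own sketch, not confirmed in this thread) is to register the hook inside the shared object itself, so it runs in every executor JVM that initialized the object:

object Util {
  val HBaseConf = HBaseConfiguration.create
  val Connection = HConnectionManager.createConnection(HBaseConf)

  // Runs once per JVM that touches this object (i.e. each executor),
  // closing the shared connection when that JVM exits.
  sys.addShutdownHook {
    Connection.close()
  }
}

As the Javadoc warns, shutdown-hook ordering is undefined, so nothing else in your own hooks should assume the connection is still open.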
RE: Problem executing Spark via JBoss application
Indeed it was a problem on the executor side… I have to figure out how to fix it now ;-) Thanks! Mehdi

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, October 15, 2014 18:32
To: Mehdi Singer
Cc: user@spark.apache.org
Subject: Re: Problem executing Spark via JBoss application

From this line:

Removing executor app-20141015142644-0125/0 because it is EXITED

I would guess that you need to examine the executor log to see why the executor actually exited. My guess would be that the executor cannot connect back to your driver. But check the log from the executor. It should be in SPARK_HOME/work/app-id/executor_id/stderr on the worker box, I believe.

On Wed, Oct 15, 2014 at 8:56 AM, Mehdi Singer mehdi.sin...@lampiris.be wrote:

Hi,

I have a Spark standalone example application which is working fine. I'm now trying to integrate this application into a J2EE application, deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is installed on my local machine (Windows 7) and the Spark master is remote (Linux). The example simply executes a count on my RDD. When I call the web service I'm getting the following error on the JBoss side when executing the count:

11:48:10,232 ERROR [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]] (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré une exception: java.lang.RuntimeException: org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was shut down
	at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116) [cxf-api-2.6.9.jar:2.6.9]
	at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322) [cxf-api-2.6.9.jar:2.4.3]
	at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) [cxf-api-2.6.9.jar:2.6.9]
	at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146) [cxf-bundle-2.6.2.jar:2.6.2]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329) [jbossweb-7.0.13.Final.jar:]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:]
	at org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180) [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:]
	at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:186) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE]
	at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE]
	at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:259) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248)
RE: Problem executing Spark via JBoss application
Do you create the application in the context of the web service call? Then the application may be killed after you return from the web service call. However, we would need to see what you do during the web service call and how you invoke the Spark application.

On 16 Oct 2014 08:50, Mehdi Singer mehdi.sin...@lampiris.be wrote:

Indeed it was a problem on the executor side… I have to figure out how to fix it now ;-) Thanks! Mehdi

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, October 15, 2014 18:32
To: Mehdi Singer
Cc: user@spark.apache.org
Subject: Re: Problem executing Spark via JBoss application

From this line:

Removing executor app-20141015142644-0125/0 because it is EXITED

I would guess that you need to examine the executor log to see why the executor actually exited. My guess would be that the executor cannot connect back to your driver. But check the log from the executor. It should be in SPARK_HOME/work/app-id/executor_id/stderr on the worker box, I believe.

On Wed, Oct 15, 2014 at 8:56 AM, Mehdi Singer mehdi.sin...@lampiris.be wrote:

Hi,

I have a Spark standalone example application which is working fine. I'm now trying to integrate this application into a J2EE application, deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is installed on my local machine (Windows 7) and the Spark master is remote (Linux). The example simply executes a count on my RDD. When I call the web service I'm getting the following error on the JBoss side when executing the count:

11:48:10,232 ERROR [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]] (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré une exception: java.lang.RuntimeException: org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was shut down
	at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116) [cxf-api-2.6.9.jar:2.6.9]
	at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322) [cxf-api-2.6.9.jar:2.4.3]
	at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) [cxf-api-2.6.9.jar:2.6.9]
	at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146) [cxf-bundle-2.6.2.jar:2.6.2]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329) [jbossweb-7.0.13.Final.jar:]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:]
	at org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180) [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:]
	at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:186) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE]
	at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160) [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE]
	at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]
Re: How to add HBase dependencies and conf with spark-submit?
Thanks, Soumitra Kumar. I didn't know why you put hbase-protocol.jar in SPARK_CLASSPATH while adding hbase-protocol.jar, hbase-common.jar, hbase-client.jar, and htrace-core.jar to --jars, but it did work. Actually, I put all four jars in SPARK_CLASSPATH along with the HBase conf directory.

2014-10-15 22:39 GMT+08:00 Soumitra Kumar kumar.soumi...@gmail.com:

I am writing to HBase; the following are my options:

export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar

spark-submit \
  --jars /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \
  ...

----- Original Message -----
From: Fengyun RAO raofeng...@gmail.com
To: user@spark.apache.org, u...@hbase.apache.org
Sent: Wednesday, October 15, 2014 6:29:21 AM
Subject: Re: How to add HBase dependencies and conf with spark-submit?

+user@hbase

2014-10-15 20:48 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

We use Spark 1.1 and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage extra classpath entries, and even a deprecated SPARK_CLASSPATH. The problem is: which classpath entries or jars should we append? I can simply add the whole `hbase classpath`, which is huge, but this leads to dependency conflicts, e.g. HBase uses guava-12 while Spark uses guava-14.
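[Editorial note] For reference, a sketch of the equivalent setup using the non-deprecated extraClassPath properties in conf/spark-defaults.conf. The jar paths are the CDH locations from this thread; /etc/hbase/conf stands in for whatever your HBase conf directory actually is, and note that these entries are colon-separated, unlike the comma-separated --jars list:

spark.driver.extraClassPath   /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar:/etc/hbase/conf
spark.executor.extraClassPath /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar:/etc/hbase/conf

Unlike --jars, extraClassPath only prepends entries to the classpath; it does not ship the jars, so they must already exist at those paths on every node.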
RE: Problem executing Spark via JBoss application
I solved my problem. It was due to a library version used by Spark (snappy-java) that is apparently not compatible with JBoss... I updated the lib version and it's working now.

Jörn, this is what I'm doing in my web service call:
- Create the Spark context
- Create my JavaJdbcRDD
- Count the results
- Stop the context

Do you think it might be dangerous? Do you have recommendations for integrating Spark jobs with web services?

Regards,
Mehdi

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Thursday, October 16, 2014 09:22
To: Mehdi Singer
Cc: user@spark.apache.org; yana.kadiy...@gmail.com
Subject: RE: Problem executing Spark via JBoss application

Do you create the application in the context of the web service call? Then the application may be killed after you return from the web service call. However, we would need to see what you do during the web service call and how you invoke the Spark application.

On 16 Oct 2014 08:50, Mehdi Singer mehdi.sin...@lampiris.be wrote:

Indeed it was a problem on the executor side… I have to figure out how to fix it now ;-) Thanks! Mehdi

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, October 15, 2014 18:32
To: Mehdi Singer
Cc: user@spark.apache.org
Subject: Re: Problem executing Spark via JBoss application

From this line:

Removing executor app-20141015142644-0125/0 because it is EXITED

I would guess that you need to examine the executor log to see why the executor actually exited. My guess would be that the executor cannot connect back to your driver. But check the log from the executor. It should be in SPARK_HOME/work/app-id/executor_id/stderr on the worker box, I believe.

On Wed, Oct 15, 2014 at 8:56 AM, Mehdi Singer mehdi.sin...@lampiris.be wrote:

Hi,

I have a Spark standalone example application which is working fine. I'm now trying to integrate this application into a J2EE application, deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is installed on my local machine (Windows 7) and the Spark master is remote (Linux). The example simply executes a count on my RDD. When I call the web service I'm getting the following error on the JBoss side when executing the count:

11:48:10,232 ERROR [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]] (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré une exception: java.lang.RuntimeException: org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was shut down
	at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116) [cxf-api-2.6.9.jar:2.6.9]
	at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322) [cxf-api-2.6.9.jar:2.4.3]
	at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) [cxf-api-2.6.9.jar:2.6.9]
	at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146) [cxf-bundle-2.6.2.jar:2.6.2]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final]
	at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197) [cxf-bundle-2.6.2.jar:2.6.2]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329) [jbossweb-7.0.13.Final.jar:]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248) [jbossweb-7.0.13.Final.jar:]
	at org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180) [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280) [jbossweb-7.0.13.Final.jar:]
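[Editorial note] On the integration question: creating and stopping a SparkContext inside every web-service call is expensive, and as this thread shows it is easy to get wrong. A common alternative is to keep one long-lived context for the whole application and share it across calls. A minimal sketch (mine, not from the thread; the master URL and app name are placeholders):

object SparkHolder {
  // Built lazily on first use, then shared by every web-service request.
  lazy val sc = new SparkContext(
    new SparkConf()
      .setMaster("spark://your-master:7077")  // placeholder master URL
      .setAppName("jboss-spark-ws"))          // placeholder app name
}

The context would then be stopped once, from an application-shutdown hook (e.g. a ServletContextListener in a JBoss deployment), rather than at the end of each request.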
Re: distributing Scala Map datatypes to RDD
Wow, it really was that easy! The implicit joining works a treat. Many thanks, Jon

On 13 October 2014 22:58, Stephen Boesch java...@gmail.com wrote:

Is the following what you are looking for?

scala> sc.parallelize(myMap.map{ case (k,v) => (k,v) }.toSeq)
res2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:21

2014-10-13 14:02 GMT-07:00 jon.g.massey jon.g.mas...@gmail.com:

Hi guys,

Just starting out with Spark and following through a few tutorials, it seems the easiest way to get one's source data into an RDD is using the sc.parallelize function. Unfortunately, my local data is in multiple instances of Map[K,V] types, and the parallelize function only works on objects with the Seq trait, and produces an RDD which seemingly doesn't then have the notion of keys and values which I require for joins (amongst other functions). Is there a way of using a SparkContext to create a distributed RDD from a local Map, rather than from a Hadoop or text file source?

Thanks,
Jon
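[Editorial note] A slightly fuller sketch of the same idea (mine, with illustrative data): Map#toSeq already yields a Seq[(K, V)], so the intermediate map step isn't strictly needed, and in Spark 1.x importing SparkContext._ brings the key/value operations (join, reduceByKey, ...) into scope via implicits:

import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions (Spark 1.x)

val myMap = Map("a" -> 1, "b" -> 2)           // local data, illustrative
val pairs = sc.parallelize(myMap.toSeq)       // RDD[(String, Int)]
val other = sc.parallelize(Seq("a" -> "x"))   // another keyed RDD
val joined = pairs.join(other)                // RDD[(String, (Int, String))]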
Re: Spark can't find jars
Hi,

I have created a JIRA (SPARK-3967, https://issues.apache.org/jira/browse/SPARK-3967); can you please confirm that you are hit by the same issue?

Thanks,
Christophe.

On 15/10/2014 09:49, Christophe Préaud wrote:

Hi Jimmy,

Did you try my patch? The problem on my side was that the hadoop.tmp.dir (in hadoop core-site.xml) was not handled properly by Spark when it is set on multiple partitions/disks, i.e.:

<property>
  <name>hadoop.tmp.dir</name>
  <value>file:/d1/yarn/local,file:/d2/yarn/local,file:/d3/yarn/local,file:/d4/yarn/local,file:/d5/yarn/local,file:/d6/yarn/local,file:/d7/yarn/local</value>
</property>

Hence, you won't be hit by this bug if your hadoop.tmp.dir is set on one partition only. If your hadoop.tmp.dir is also set on several partitions, I agree that it looks like a bug in Spark.

Christophe.

On 14/10/2014 18:50, Jimmy McErlain wrote:

So the only way that I could make this work was to build a fat jar file as suggested earlier. To me (and I am no expert) it seems like this is a bug. Everything was working for me prior to our upgrade to Spark 1.1 on Hadoop 2.2, but now it seems not to... i.e. packaging my jars locally, then pushing them out to the cluster and pointing them at the corresponding dependent jars. Sorry I cannot be more help!
J

JIMMY MCERLAIN
DATA SCIENTIST (NERD)
E: ji...@sellpoints.com
M: 510.303.7751

On Tue, Oct 14, 2014 at 4:59 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote:

Hello,

I have already posted a message with the exact same problem, and proposed a patch (the subject is "Application failure in yarn-cluster mode"). Can you test it and see if it works for you? I would be glad too if someone can confirm that it is a bug in Spark 1.1.0.

Regards,
Christophe.

On 14/10/2014 03:15, Jimmy McErlain wrote:

BTW this has always worked for me before until we upgraded the cluster to Spark 1.1.1...
J

JIMMY MCERLAIN
DATA SCIENTIST (NERD)
E: ji...@sellpoints.com
M: 510.303.7751

On Mon, Oct 13, 2014 at 5:39 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

Hello,

Can you check if the jar file is available in the target/scala-2.10 folder? When you use sbt package to make the jar file, that is where the jar file would be located. The following command works well for me:

spark-submit --class Classname --master yarn-cluster <jarfile with complete path>

Can you try checking with this initially and later add other options?

On Mon, Oct 13, 2014 at 7:36 PM, Jimmy ji...@sellpoints.com wrote:

Having the exact same error with the exact same jar... Do you work for Altiscale? :)
J

Sent from my iPhone

On Oct 13, 2014, at 5:33 PM, Andy Srine andy.sr...@gmail.com wrote:

Hi Guys,

Spark rookie here. I am getting a file not found exception on the --jars. This is in yarn cluster mode, and I am running the following command on our recently upgraded Spark 1.1.1 environment:

./bin/spark-submit --verbose \
  --master yarn \
  --deploy-mode cluster \
  --class myEngine \
  --driver-memory 1g \
  --driver-library-path /hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-0.4.18-201406111750.jar \
  --executor-memory 5g \
  --executor-cores 5 \
  --jars /home/andy/spark/lib/joda-convert-1.2.jar \
  --queue default \
  --num-executors 4 \
  /home/andy/spark/lib/my-spark-lib_1.0.jar

This is the error I am hitting. Any tips would be much appreciated. The file permissions look fine on my local disk.

14/10/13 22:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED
14/10/13 22:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
Exception in thread "Driver" java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Re: Application failure in yarn-cluster mode
Hi,

I have been able to reproduce this problem on our dev environment, and I am fairly sure now that it is indeed a bug. As a consequence, I have created a JIRA (SPARK-3967, https://issues.apache.org/jira/browse/SPARK-3967) for this issue, which is triggered when yarn.nodemanager.local-dirs (not hadoop.tmp.dir, as I said below) is set to a comma-separated list of directories which are located on different disks/partitions.

Regards,
Christophe.

On 14/10/2014 09:37, Christophe Préaud wrote:

Hi,

Sorry to insist, but I really feel like the problem described below is a bug in Spark. Can anybody confirm if it is a bug, or a (configuration?) problem on my side?

Thanks,
Christophe.

On 10/10/2014 18:24, Christophe Préaud wrote:

Hi,

After updating from spark-1.0.0 to spark-1.1.0, my Spark applications fail most of the time (but not always) in yarn-cluster mode (but not in yarn-client mode).

Here is my configuration:
* spark-1.1.0
* hadoop-2.2.0

And the hadoop.tmp.dir definition in the hadoop core-site.xml file (each directory is on its own partition, on different disks):

<property>
  <name>hadoop.tmp.dir</name>
  <value>file:/d1/yarn/local,file:/d2/yarn/local,file:/d3/yarn/local,file:/d4/yarn/local,file:/d5/yarn/local,file:/d6/yarn/local,file:/d7/yarn/local</value>
</property>

After investigating, it turns out that the problem occurs when the executor fetches a jar file: the jar is downloaded into a temporary file, always in /d1/yarn/local (see the hadoop.tmp.dir definition above), and then moved into one of the temporary directories defined in hadoop.tmp.dir:

* if it is the same as the temporary file (i.e. /d1/yarn/local), then the application continues normally;
* if it is another one (i.e. /d2/yarn/local, /d3/yarn/local, ...), it fails with the following error:

14/10/10 14:33:51 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 0)
java.io.FileNotFoundException: ./logReader-1.0.10.jar (Permission denied)
	at java.io.FileOutputStream.open(Native Method)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
	at com.google.common.io.Files$FileByteSink.openStream(Files.java:223)
	at com.google.common.io.Files$FileByteSink.openStream(Files.java:211)
	at com.google.common.io.ByteSource.copyTo(ByteSource.java:203)
	at com.google.common.io.Files.copy(Files.java:436)
	at com.google.common.io.Files.move(Files.java:651)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:440)
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325)
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

I have no idea why the move fails when the source and target files are not on the same partition. For the moment I have worked around the problem with the attached patch (i.e. I ensure that the temp file and the moved file are always on the same partition).

Any thoughts about this problem?

Thanks!
Christophe.
Re: spark1.0 principal component analysis
Hi,

I don't think anybody answered this question...

fintis wrote: "How do I match the principal components to the actual features since there is some sorting?"

Would anybody be able to shed a little light on it, since I too am struggling with this?

Many thanks!!
spark-default.conf description
I'm running Spark 1.1.0 on YARN (Hadoop 2.4.1) and trying to use spark.yarn.appMasterEnv.* to execute some scripts. In spark-defaults.conf, I set environment variables like this, but this description is redundant:

spark.yarn.appMasterEnv.SCRIPT_DIR /home/kuromtsu/spark-1.1.0/scripts
spark.yarn.appMasterEnv.SCRIT1 /home/kuromtsu/spark-1.1.0/scripts/scritp1.sh
spark.yarn.appMasterEnv.SCRIT2 /home/kuromtsu/spark-1.1.0/scripts/scritp3.sh
spark.yarn.appMasterEnv.SCRIT3 /home/kuromtsu/spark-1.1.0/scripts/scritp4.sh

Would anybody know a simpler description?

Regards,
Kuromatsu
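[Editorial note] One possible simplification (my suggestion, not from a reply in this thread; spark-defaults.conf does not, as far as I know, expand variables in values): export only the directory, and build the individual paths where the scripts are actually invoked. In spark-defaults.conf:

spark.yarn.appMasterEnv.SCRIPT_DIR /home/kuromtsu/spark-1.1.0/scripts

and then on the application-master side, e.g. in a hypothetical wrapper script:

#!/bin/sh
# SCRIPT_DIR is injected into the AM environment by the property above
"$SCRIPT_DIR/scritp1.sh"
"$SCRIPT_DIR/scritp3.sh"
"$SCRIPT_DIR/scritp4.sh"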
Re: Getting the value from DStream[Int]
You can do score.print() to see the values, and if you want to do some operations with these values then you have to do a map on that DStream (score.map(myInt => myInt + 5)).

Thanks
Best Regards

On Thu, Oct 16, 2014 at 5:19 AM, SK skrishna...@gmail.com wrote:

Hi,

As a result of a reduction operation, the resultant value score is a DStream[Int]. How can I get the simple Int value? I tried score[0] and score._1, but neither worked, and I can't find a getValue() in the DStream API.

thanks
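[Editorial note] To expand on why there is no getValue(): a DStream is a sequence of RDDs, one per batch interval, so there is one Int per batch rather than a single Int overall. A minimal sketch (mine) of pulling each batch's value back to the driver with foreachRDD:

// score: DStream[Int], assumed to hold a single element per batch after the reduction
score.foreachRDD { rdd =>
  // collect() is safe here only because the reduced RDD is tiny
  rdd.collect().foreach(v => println(s"score for this batch: $v"))
}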
Re: submitted uber-jar not seeing spark-assembly.jar at worker
Hello Owen,

I used the Maven build to make use of the Guava collections package renaming; sbt keeps the old Guava package names intact... Finally it turned out that I had just upgraded to the latest version of spark-cassandra-connector, 1.1.0-alpha3, and when I stepped back to 1.1.0-alpha2 everything started to work again... though I still needed the manual build with the Guava conflict solved.

Thanks for your help.

On Wed, Oct 15, 2014 at 9:01 AM, Sean Owen so...@cloudera.com wrote:

How did you recompile and deploy Spark to your cluster? It sounds like a problem with not getting the assembly deployed correctly, rather than your app.

On Tue, Oct 14, 2014 at 10:35 PM, Tamas Sandor tsan...@gmail.com wrote:

Hi,

I'm a rookie in Spark, but hope someone can help me out. I'm writing an app that I'm submitting to my spark-master, which has a worker on a separate node. It uses spark-cassandra-connector, and since that depends on guava-v16, which conflicts with the default spark-1.1.0-assembly's guava-v14.1, I built the latest Spark from git master (it was fixed in late September), so now I have a working spark-assembly-1.2.0-SNAPSHOT-hadoop2.4.0 running.

I have my uber-jar with hadoop-client and spark-assembly as scope:provided, excluded from the deployed jar, and then it gets submitted to the spark-master from the node. From the logs I see TaskSetManager throws me an error coming from my worker node saying java.lang.NoClassDefFoundError: org/apache/spark/Partition. I guess that's valid since my jar has no Spark deps inlined (uber), but why can't it see the worker's classpath? Isn't that what a provided scope would mean here? How can I fix that? Am I missing something obvious?

Thank you for your help.

Regards,
Tamas
GraphX Performance
Hi,

I am writing to ask whether there is any performance data on GraphX. I run 4 workers in AWS (c3.xlarge), with 4g memory for each executor, on 85,331,846 edges from http://socialcomputing.asu.edu/pages/datasets. For the PageRank algorithm, the job cannot be completed within 1 hour. I am wondering whether my program has a problem or whether it's a performance issue. Please advise if you have such experience.

Thanks!
Jarred
Re: How to write data into Hive partitioned Parquet table?
Support for dynamic partitioning is available in master and will be part of Spark 1.2.

On Thu, Oct 16, 2014 at 1:08 AM, Banias H banias4sp...@gmail.com wrote:

I got tipped by an expert that the "Unsupported language features in query" error I had was due to the fact that Spark SQL does not support dynamic partitions, and that I can do saveAsParquetFile() for each partition.

My inefficient implementation is to:

// 1. run the query without INSERT INTO and without DISTRIBUTE BY field1 SORT BY field2
JavaSchemaRDD rawRdd = hiveCtx.sql("SELECT field1, field2, partition_field FROM source_table");
rawRdd.registerAsTempTable("temp");

// 2. get a list of unique partition_field values
JavaSchemaRDD partFieldsRdd = hiveCtx.sql("SELECT DISTINCT partition_field FROM temp");

// 3. iterate over each partition_field value, run a query to get a JavaSchemaRDD,
//    then save the result as a Parquet file
for (Row row : partFieldsRdd.toArray()) {
    String partitionVal = row.getString(0);
    hiveCtx.sql("SELECT * FROM temp WHERE partition_field=" + partitionVal)
           .saveAsParquetFile("partition_field=" + partitionVal);
}

It ran and produced the desired output. However, Hive runs orders of magnitude faster than the code above. Anyone who can shed some light on a more efficient implementation is much appreciated. Many thanks.

Regards,
BH

On Tue, Oct 14, 2014 at 8:44 PM, Banias H banias4sp...@gmail.com wrote:

Hi,

I am still new to Spark. Sorry if similar questions were asked here before. I am trying to read a Hive table, then run a query and save the result into a Hive partitioned Parquet table. For example, I was able to run the following in Hive:

INSERT INTO TABLE target_table PARTITION (partition_field)
SELECT field1, field2, partition_field FROM source_table
DISTRIBUTE BY field1 SORT BY field2

But when I tried running it in spark-sql, it gave me the following error:

java.lang.RuntimeException: Unsupported language features in query: INSERT INTO TABLE ...

I also tried the following Java code and I saw the same error:

SparkConf sparkConf = new SparkConf().setAppName("Example");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaHiveContext hiveCtx = new JavaHiveContext(ctx);
JavaSchemaRDD rdd = hiveCtx.sql("INSERT INTO TABLE target_table PARTITION (partition_field) SELECT field1, field2, partition_field FROM source_table DISTRIBUTE BY field1 SORT BY field2");
...
rdd.count(); // just for running the query

If I take out "INSERT INTO TABLE target_table PARTITION (partition_field)" from the SQL statement and run that in hiveCtx.sql(), I get an RDD, but then I can only do rdd.saveAsParquetFile(target_table_location), and that is not partitioned correctly.

Any help is much appreciated. Thanks.

Regards,
BH
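[Editorial note] For completeness, a hedged sketch of what this should look like once dynamic-partition support lands (Spark 1.2, per the reply above). The two SET statements are the standard Hive settings for dynamic partitioning; whether Spark SQL requires them as well is an assumption on my part:

hiveCtx.sql("SET hive.exec.dynamic.partition = true")
hiveCtx.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
hiveCtx.sql(
  "INSERT INTO TABLE target_table PARTITION (partition_field) " +
  "SELECT field1, field2, partition_field FROM source_table " +
  "DISTRIBUTE BY field1 SORT BY field2")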
Unit testing: Mocking out Spark classes
Hello all,

I am trying to unit test the classes involved in my Spark job. I am trying to mock out the Spark classes (like SparkContext and Broadcast) so that I can unit test my classes in isolation. However, I have realised that these are classes instead of traits. My first question is: why?

It is quite hard to mock out classes using ScalaTest+ScalaMock, as the classes which need to be mocked out have to be annotated with org.scalamock.annotation.mock as per http://www.scalatest.org/user_guide/testing_with_mock_objects#generatedMocks. I cannot do that in my case as I am trying to mock out the Spark classes. Am I missing something? Is there a better way to do this?

val sparkContext = mock[SparkInteraction]
val trainingDatasetLoader = mock[DatasetLoader]
val broadcastTrainingDatasetLoader = mock[Broadcast[DatasetLoader]]

def transformerFunction(source: Iterator[(HubClassificationData, String)]): Iterator[String] = {
  source.map(_._2)
}

val classificationResultsRDD = mock[RDD[String]]
val classificationResults = Array("", "", "")
val inputRDD = mock[RDD[(HubClassificationData, String)]]

inSequence {
  inAnyOrder {
    (sparkContext.broadcast[DatasetLoader] _).expects(trainingDatasetLoader).returns(broadcastTrainingDatasetLoader)
  }
}

val sparkInvoker = new SparkJobInvoker(sparkContext, trainingDatasetLoader)

when(inputRDD.mapPartitions(transformerFunction)).thenReturn(classificationResultsRDD)
sparkInvoker.invoke(inputRDD)

Thanks,
Saket
Re: SparkSQL: set hive.metastore.warehouse.dir in CLI doesn't work
The warehouse location needs to be specified before the HiveContext initialization; you can set it via:

./bin/spark-sql --hiveconf hive.metastore.warehouse.dir=/home/spark/hive/warehouse

On 10/15/14 8:55 PM, Hao Ren wrote:

Hi,

The following query in Spark SQL 1.1.0 CLI doesn't work:

SET hive.metastore.warehouse.dir=/home/spark/hive/warehouse;

create table test as
select v1.*, v2.card_type, v2.card_upgrade_time_black, v2.card_upgrade_time_gold
from customer v1
left join customer_loyalty v2
on v1.account_id = v2.account_id
limit 5;

StackTrace =>

org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:file:/user/hive/warehouse/test is not a directory or unable to create one)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:602)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:559)
	at org.apache.spark.sql.hive.HiveMetastoreCatalog.createTable(HiveMetastoreCatalog.scala:99)
	at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:116)
	at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:111)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
	at org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$.apply(HiveMetastoreCatalog.scala:111)
	at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
	at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
	at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
	at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
	at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
	at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
	at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
	at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
	at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
	at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:103)
	at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: MetaException(message:file:/user/hive/warehouse/test is not a directory or unable to create one)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1060)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1107)
	at sun.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)
	at com.sun.proxy.$Proxy15.create_table_with_environment_context(Unknown Source)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:482)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:471)
	at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
	at com.sun.proxy.$Proxy16.createTable(Unknown Source)
Re: YARN deployment of Spark and Thrift JDBC server
On 10/16/14 12:44 PM, neeraj wrote:

I would like to reiterate that I don't have Hive installed on the Hadoop cluster. I have some queries on the following comment from Cheng Lian-2:

"The Thrift server is used to interact with existing Hive data, and thus needs Hive Metastore to access the Hive catalog. In your case, you need to build Spark with sbt/sbt -Phive,hadoop-2.4 clean package. But since you've already started the Thrift server successfully, this step should already have been done properly."

1. Even though I don't have Hive installed, how can I connect my application (Microsoft Excel etc.) to Spark SQL? Must I have Hive installed?

Are you trying to use Excel as a data source of Spark SQL, or use Spark SQL as a data source of Excel? You can use Spark SQL in your own Spark applications without involving Hive, but the Thrift server is designed to interact with existing Hive data. Actually, it's just a HiveServer2 port for Spark SQL.

2. Where can I download/get Spark SQL JDBC/ODBC drivers, as I could not find them on the Databricks site?

3. Could somebody point me to the steps to connect Excel to Spark SQL and get some data back? Is this possible at all?

I think this article by Denny Lee can be helpful, although it's about Tableau rather than Excel: https://www.concur.com/blog/en-us/connect-tableau-to-sparksql

4. Which applications can be used to connect to Spark SQL?

In theory, all applications that support ODBC/JDBC can connect to Spark SQL.

Regards,
Neeraj
Re: [SparkSQL] Convert JavaSchemaRDD to SchemaRDD
Why do you need to convert a JavaSchemaRDD to a SchemaRDD? Are you trying to use some API that doesn't exist in JavaSchemaRDD?

On 10/15/14 5:50 PM, Earthson wrote:

I don't know why JavaSchemaRDD.baseSchemaRDD is private[sql]. And I found that DataTypeConversions is protected[sql]. Finally I found this solution:

jrdd.registerTempTable("transform_tmp")
jrdd.sqlContext.sql("select * from transform_tmp")

Could anyone tell me: is it a good idea for me to use Catalyst as my DSL's execution engine? I am trying to build a DSL, and I want to confirm this.
Re: SparkSQL IndexOutOfBoundsException when reading from Parquet
Hello Terry,

I guess you hit this bug: https://issues.apache.org/jira/browse/SPARK-3559. The list of needed column ids was messed up. Can you try the master branch, or apply the code change (https://github.com/apache/spark/commit/e10d71e7e58bf2ec0f1942cb2f0602396ab866b4) to your 1.1, and see if the problem is resolved?

Thanks,
Yin

On Wed, Oct 15, 2014 at 12:08 PM, Terry Siu terry@smartfocus.com wrote:

Hi Yin,

pqt_rdt_snappy has 76 columns. These two Parquet tables were created via Hive 0.12 from existing Avro data using CREATE TABLE followed by an INSERT OVERWRITE. These are partitioned tables: pqt_rdt_snappy has one partition while pqt_segcust_snappy has two partitions. For pqt_segcust_snappy, I noticed that when I populated it with a single INSERT OVERWRITE over all the partitions and then executed the Spark code, it would report an illegal index value of 29. However, if I manually did an INSERT OVERWRITE for every single partition, I would get an illegal index value of 21. I don't know if this will help in debugging, but here's the DESCRIBE output for pqt_segcust_snappy:

OK
col_name            data_type   comment
customer_id         string      from deserializer
age_range           string      from deserializer
gender              string      from deserializer
last_tx_date        bigint      from deserializer
last_tx_date_ts     string      from deserializer
last_tx_date_dt     string      from deserializer
first_tx_date       bigint      from deserializer
first_tx_date_ts    string      from deserializer
first_tx_date_dt    string      from deserializer
second_tx_date      bigint      from deserializer
second_tx_date_ts   string      from deserializer
second_tx_date_dt   string      from deserializer
third_tx_date       bigint      from deserializer
third_tx_date_ts    string      from deserializer
third_tx_date_dt    string      from deserializer
frequency           double      from deserializer
tx_size             double      from deserializer
recency             double      from deserializer
rfm                 double      from deserializer
tx_count            bigint      from deserializer
sales               double      from deserializer
coll_def_id         string      None
seg_def_id          string      None

# Partition Information
# col_name          data_type   comment
coll_def_id         string      None
seg_def_id          string      None

Time taken: 0.788 seconds, Fetched: 29 row(s)

As you can see, I have 21 data columns, followed by the 2 partition columns, coll_def_id and seg_def_id. The output says 29 rows, but that looks like it's just counting the rows in the console output. Let me know if you need more information.

Thanks,
-Terry

From: Yin Huai huaiyin@gmail.com
Date: Tuesday, October 14, 2014 at 6:29 PM
To: Terry Siu terry@smartfocus.com
Cc: Michael Armbrust mich...@databricks.com, user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

Hello Terry,

How many columns does pqt_rdt_snappy have?

Thanks,
Yin

On Tue, Oct 14, 2014 at 11:52 AM, Terry Siu terry@smartfocus.com wrote:

Hi Michael,

That worked for me. At least I'm now further along than I was. Thanks for the tip!

-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, October 13, 2014 at 5:05 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

There are some known bugs with the Parquet serde and Spark 1.1. You can try setting spark.sql.hive.convertMetastoreParquet=true to cause Spark SQL to use the built-in Parquet support when the serde looks like Parquet.

On Mon, Oct 13, 2014 at 2:57 PM, Terry Siu terry@smartfocus.com wrote:

I am currently using Spark 1.1.0 that has been compiled against Hadoop 2.3. Our cluster is CDH 5.1.2, which runs Hive 0.12.
I have two external Hive tables that point to Parquet (compressed with Snappy), which were converted over from Avro, if that matters. I am trying to perform a join with these two Hive tables, but am encountering an exception. In a nutshell, I launch a spark shell, create my HiveContext (pointing to the correct metastore on our cluster), and then proceed to do the following:

scala> val hc = new HiveContext(sc)
scala> val txn = hc.sql("select * from pqt_rdt_snappy where transdate >= 132537600 and transdate <= 134006399")
Re: Play framework
We execute Spark jobs from a Play application, but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not, you can just create a SparkContext programmatically in your app.

In development I typically run Spark locally. Creating the Spark context is pretty trivial:

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("My Awesome App")
// call conf.set for any other configuration you want
val sc = new SparkContext(conf)

It is important to keep in mind that you cannot have multiple local contexts (you can create them, but you'll get odd errors), so if you are running things in parallel within your app (even unit tests) you'd need to share a context in this case. If you are running sequentially, you can create a new local context each time, but you must make sure to call SparkContext.stop() when you're done.

Running against a cluster is a bit more complicated, because you need to add all your dependency jars. I'm not sure how to get this to work with play run. I stick to building the app with play dist and then running against the packaged application, because it very conveniently provides all the dependencies in a lib folder. Here is some code to load all the paths you need from the dist:

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
  if (libDir.exists) {
    libDir.listFiles().map(_.getCanonicalFile.getAbsolutePath).filter(_.endsWith(".jar"))
  } else {
    throw new IllegalStateException(s"lib dir is missing: $libDir")
  }
}

Creating the context is similar to the above, but with this extra line:

conf.setJars(libs)

I hope this helps. I should note that I don't use play run very much, at least not when I'm actually executing Spark jobs, so I'm not sure if this integrates properly with that. I have unit tests which execute on Spark, and I have executed the dist package both locally and on a cluster. To make working with the dist locally easier, I wrote myself a little shell script to unzip and run the dist.

On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller moham...@glassbeam.com wrote:

Hi –

Has anybody figured out how to integrate a Play application with Spark and run it on a Spark cluster using the spark-submit script? I have seen some blogs about creating a simple Play app and running it locally on a dev machine with the sbt run command. However, those steps don't work for spark-submit.

If you have figured out how to build and run a Play app with spark-submit, I would appreciate it if you could share the steps and the sbt settings for your Play app.

Thanks,
Mohammed

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
Re: Unit testing: Mocking out Spark classes
Mocking these things is difficult; executing your unit tests in a local Spark context is preferred, as recommended in the programming guide http://spark.apache.org/docs/latest/programming-guide.html#unit-testing. I know this may not technically be a unit test, but it is hopefully close enough. You can load your test data using SparkContext.parallelize and retrieve the data (for verification) using RDD.collect. On Thu, Oct 16, 2014 at 9:07 AM, Saket Kumar saket.ku...@bgch.co.uk wrote: Hello all, I am trying to unit test my classes involved in my Spark job. I am trying to mock out the Spark classes (like SparkContext and Broadcast) so that I can unit test my classes in isolation. However I have realised that these are classes instead of traits. My first question is why? It is quite hard to mock out classes using ScalaTest+ScalaMock, as the classes which need to be mocked out need to be annotated with org.scalamock.annotation.mock as per http://www.scalatest.org/user_guide/testing_with_mock_objects#generatedMocks. I cannot do that in my case as I am trying to mock out the Spark classes. Am I missing something? Is there a better way to do this?

val sparkContext = mock[SparkInteraction]
val trainingDatasetLoader = mock[DatasetLoader]
val broadcastTrainingDatasetLoader = mock[Broadcast[DatasetLoader]]

def transformerFunction(source: Iterator[(HubClassificationData, String)]): Iterator[String] = {
  source.map(_._2)
}

val classificationResultsRDD = mock[RDD[String]]
val classificationResults = Array("", "", "")
val inputRDD = mock[RDD[(HubClassificationData, String)]]

inSequence {
  inAnyOrder {
    (sparkContext.broadcast[DatasetLoader] _).expects(trainingDatasetLoader).returns(broadcastTrainingDatasetLoader)
  }
}

val sparkInvoker = new SparkJobInvoker(sparkContext, trainingDatasetLoader)
when(inputRDD.mapPartitions(transformerFunction)).thenReturn(classificationResultsRDD)
sparkInvoker.invoke(inputRDD)

Thanks, Saket -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
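To make the local-context approach concrete, a test might look like the following minimal ScalaTest sketch. The suite name and the tested transformation are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordLengthSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
  }

  override def afterAll(): Unit = {
    sc.stop() // stop the context, or later suites will fail to create their own
  }

  test("maps words to their lengths") {
    val input = sc.parallelize(Seq("spark", "test")) // load test data
    assert(input.map(_.length).collect().toSeq === Seq(5, 4)) // verify via collect
  }
}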
Help required on exercise Data Exploration using Spark SQL
Hi, I'm exploring an exercise, Data Exploration using Spark SQL, from Spark Summit 2014. While running the command val wikiData = sqlContext.parquetFile("data/wiki_parquet"), I'm getting the following output, which doesn't match the expected output.

*Output I'm getting*:

val wikiData1 = sqlContext.parquetFile("/data/wiki_parquet/part-r-1.parquet")
14/10/16 19:26:49 INFO parquet.ParquetTypesConverter: Falling back to schema conversion from Parquet types; result: ArrayBuffer(id#5, title#6, modified#7L, text#8, username#9)
wikiData1: org.apache.spark.sql.SchemaRDD = SchemaRDD[1] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
ParquetTableScan [id#5,title#6,modified#7L,text#8,username#9], (ParquetRelation /data/wiki_parquet/part-r-1.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@27a5dac0, []), []

*Expected Output*:

wikiData: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [id#0,title#1,modified#2L,text#3,username#4], (ParquetRelation data/wiki_parquet), []

Please help with the possible issue. I'm using the pre-built package of Spark with Hadoop 2.4. Please let me know if more information is required. Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-required-on-exercise-Data-Exploratin-using-Spark-SQL-tp16569.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Larger heap leads to perf degradation due to GC
I just want to pitch in and say that I ran into the same problem with running with 64GB executors. For example, some of the tasks take 5 minutes to execute, out of which 4 minutes are spent in GC. I'll try out smaller executors. On Mon, Oct 6, 2014 at 6:35 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, The other option to consider is using G1 GC, which should behave better with large heaps. But pointers are not compressed in heaps of 32 GB or more, so you may be better off staying under 32 GB. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Mon, Oct 6, 2014 at 8:08 PM, Mingyu Kim m...@palantir.com wrote: Ok, cool. This seems to be a general issue in JVMs with very large heaps. I agree that the best workaround would be to keep the heap size below 32GB. Thanks guys! Mingyu From: Arun Ahuja aahuj...@gmail.com Date: Monday, October 6, 2014 at 7:50 AM To: Andrew Ash and...@andrewash.com Cc: Mingyu Kim m...@palantir.com, user@spark.apache.org user@spark.apache.org, Dennis Lawler dlaw...@palantir.com Subject: Re: Larger heap leads to perf degradation due to GC We have used the strategy that you suggested, Andrew - using many workers per machine and keeping the heaps small (< 20gb). Using a large heap resulted in workers hanging or not responding (leading to timeouts). The same dataset/job for us will fail (most often due to akka disassociated or fetch failures errors) with 10 cores / 100 executors, 60gb per executor, while succeeding with 1 core / 1000 executors / 6gb per executor. When the job does succeed with more cores per executor and a larger heap, it is usually much slower than with the smaller executors (the same 8-10 min job taking 15-20 min to complete). The unfortunate downside of this has been that we have had some large broadcast variables which may not fit into memory (and are unnecessarily duplicated) when using the smaller executors. Most of this is anecdotal, but for the most part we have had more success and consistency with more executors with smaller memory requirements. On Sun, Oct 5, 2014 at 7:20 PM, Andrew Ash and...@andrewash.com wrote: Hi Mingyu, Maybe we should be limiting our heaps to 32GB max and running multiple workers per machine to avoid large GC issues. For a 128GB memory, 32 core machine, this could look like: SPARK_WORKER_INSTANCES=4 SPARK_WORKER_MEMORY=32g SPARK_WORKER_CORES=8 Are people running with large (32GB+) executor heaps in production? I'd be curious to hear if so. Cheers! Andrew On Thu, Oct 2, 2014 at 1:30 PM, Mingyu Kim m...@palantir.com wrote: This issue definitely needs more investigation, but I just wanted to quickly check if anyone has run into this problem or has general guidance around it. We’ve seen a performance degradation with a large heap on a simple map task (i.e. no shuffle). We’ve seen the slowness starting from around a 50GB heap (i.e. spark.executor.memory=50g). And when we checked the CPU usage, there were just a lot of GCs going on. Has anyone seen a similar problem? Thanks, Mingyu
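For what it's worth, trying G1 as Otis suggests only needs an extra JVM option for the executors. A minimal sketch; the memory value is illustrative, chosen to stay under the ~32 GB compressed-pointer threshold discussed in this thread:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "24g") // keep each executor heap below 32 GB
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC") // use the G1 collector on executors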
Spark SQL DDL, DML commands
Hi, Does Spark SQL have DDL and DML commands that can be executed directly? If yes, please share the link. If no, please help me understand why it is not there. Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DDL-DML-commands-tp16572.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to close resources shared in executor?
Which HBase release are you using? Let me refer to the 0.94 code. Take a look at the following method in src/main/java/org/apache/hadoop/hbase/client/HTable.java:

public void close() throws IOException {
  ...
  if (cleanupConnectionOnClose) {
    if (this.connection != null) {
      this.connection.close();

When Connection.getTable() is called, the following is invoked:

public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)

which sets cleanupConnectionOnClose to true. w.r.t. the javadoc, the paragraph on the shutdown hook is in HConnectionManager.java of 0.94. You don't need to use a shutdown hook for 0.94+. Cheers On Wed, Oct 15, 2014 at 11:41 PM, Fengyun RAO raofeng...@gmail.com wrote: I may have misunderstood your point. val result = rdd.map(line => { val table = Util.Connection.getTable(user) ... table.close() } Did you mean this is enough, and there’s no need to call Util.Connection.close(), or HConnectionManager.deleteAllConnections()? Where is the documentation that states HConnectionManager would release the underlying connection automatically? If that’s true, maybe the Javadoc which recommends a shutdown hook needs an update. 2014-10-16 14:20 GMT+08:00 Fengyun RAO raofeng...@gmail.com: Thanks, Ted. Util.Connection.close() should be called only once, so it can NOT be in a map function val result = rdd.map(line = { val table = Util.Connection.getTable(user) ... Util.Connection.close() } As you mentioned: Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, connection would be released. Yes, we should call table.close(), but it won’t remove HConnection in HConnectionManager which is a HConnection pool. As I look into the HconnectionManager Javadoc, it seems I have to implement a shutdown hook * pCleanup used to be done inside in a shutdown hook. On startup we'd * register a shutdown hook that called {@link #deleteAllConnections()} * on its way out but the order in which shutdown hooks run is not defined so * were problematic for clients of HConnection that wanted to register their * own shutdown hooks so we removed ours though this shifts the onus for * cleanup to the client. 2014-10-15 22:31 GMT+08:00 Ted Yu yuzhih...@gmail.com: Pardon me - there was typo in previous email. Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, connection would be released. Cheers On Wed, Oct 15, 2014 at 7:13 AM, Ted Yu yuzhih...@gmail.com wrote: Have you tried the following ? val result = rdd.map(line = { val table = Util.Connection.getTable(user) ... Util.Connection.close() } On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO raofeng...@gmail.com wrote: In order to share an HBase connection pool, we create an object Object Util { val HBaseConf = HBaseConfiguration.create val Connection= HConnectionManager.createConnection(HBaseConf) } which would be shared among tasks on the same executor. e.g. val result = rdd.map(line = { val table = Util.Connection.getTable(user) ... } However, we don’t how to close the Util.Connection. If we write Util.Connection.close() in the main function, it’ll only run on the driver, not the executor. So, How to make sure every Connection closed before exist?
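To make Ted's point concrete, the table handle can be opened and closed once per partition instead of once per record, so table.close() (and HConnectionManager's reference counting) does all the cleanup. A sketch, reusing the Util object and user variable from the original post; process stands in for whatever per-line work is done against the table:

val result = rdd.mapPartitions { lines =>
  val table = Util.Connection.getTable(user) // one table handle per partition
  // materialize results before closing the table, since map is evaluated lazily
  val processed = lines.map(line => process(table, line)).toVector
  table.close() // decrements the underlying connection's reference count
  processed.iterator
}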
Re: YARN deployment of Spark and Thrift JDBC server
1. I'm trying to use Spark SQL as a data source. Is it possible? 2. Please share the link to the ODBC/JDBC drivers from Databricks; I'm not able to find them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/YARN-deployment-of-Spark-and-Thrift-JDBC-server-tp16374p16571.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
TaskNotSerializableException when running through Spark shell
Hi, Can anyone explain how things get captured in a closure when running through the REPL? For example:

def foo(..) = { .. }
rdd.map(foo)

sometimes complains about classes not being serializable that are completely unrelated to foo. This happens even when I write it like this:

object Foo {
  def foo(..) = { .. }
}
rdd.map(Foo.foo)

It also doesn't happen all the time.
Re: YARN deployment of Spark and Thrift JDBC server
On 10/16/14 10:48 PM, neeraj wrote: 1. I'm trying to use Spark SQL as a data source. Is it possible? Unfortunately, Spark SQL ODBC/JDBC support is based on the Thrift server, so at least you need HDFS and a working Hive Metastore instance (used to persist catalogs) to make things work. 2. Please share the link to the ODBC/JDBC drivers from Databricks; I'm not able to find them. Sorry, I forgot to mention that Denny's article included the ODBC driver link: http://www.datastax.com/download#dl-datastax-drivers For JDBC access, you can just use the Hive 0.12.0 JDBC driver; the Thrift server is compatible with it. P.S. The ODBC driver is not from Databricks, but provided by 3rd-party companies like DataStax and Simba. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/YARN-deployment-of-Spark-and-Thrift-JDBC-server-tp16374p16571.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
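To illustrate the JDBC route Cheng describes, connecting to the Thrift server with the Hive driver might look like the following sketch; the host, port, and query are placeholders (the Thrift server speaks the HiveServer2 protocol, hence the jdbc:hive2 URL):

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
try {
  val rs = conn.createStatement().executeQuery("SELECT 1") // placeholder smoke-test query
  while (rs.next()) println(rs.getInt(1))
} finally {
  conn.close()
}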
Re: Help required on exercise Data Exploration using Spark SQL
Hi Neeraj, The Spark Summit 2014 tutorial uses Spark 1.0. I guess you're using Spark 1.1? Parquet support got polished quite a bit since then, and changed the string representation of the query plan, but this output should be OK :) Cheng On 10/16/14 10:45 PM, neeraj wrote: Hi, I'm exploring an exercise Data Exploratin using Spark SQL from Spark Summit 2014. While running command val wikiData = sqlContext.parquetFile(data/wiki_parquet).. I'm getting the following output which doesn't match with the expected output. *Output i'm getting*: val wikiData1 = sqlContext.parquetFile(/data/wiki_parquet/part-r-1.parquet) 14/10/16 19:26:49 INFO parquet.ParquetTypesConverter: Falling back to schema conversion from Parquet types; result: ArrayBuffer(id#5, title#6, modified#7L, text#8, username#9) wikiData1: org.apache.spark.sql.SchemaRDD = SchemaRDD[1] at RDD at SchemaRDD.scala:103 == Query Plan == == Physical Plan == ParquetTableScan [id#5,title#6,modified#7L,text#8,username#9], (ParquetRelation /data/wiki_parquet/part-r-1.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@27a5dac0, []), [] *Expected Output*: wikiData: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == ParquetTableScan [id#0,title#1,modified#2L,text#3,username#4], (ParquetRelation data/wiki_parquet), [] Please help with the possible issue. I'm using pre-built package of Spark with Hadoop 2.4 Please let me know in case of more information is required. Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-required-on-exercise-Data-Exploratin-using-Spark-SQL-tp16569.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Folding an RDD in order
Hi, I'm working on a problem where I'd like to sum items in an RDD *in order (*approximately*)*. I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this: user_id, transaction_timestamp, transaction_amount And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work). What do you think about the approach? Here's some sample code that demonstrates what I'm thinking:

def myFold(V1: Float, V2: Float): Float = {
  val partialSum = V1 + V2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  return partialSum
}

val rawData = sc.textFile("hdfs://path/to/data").map { x =>
  // load data
  val l = x.split(" ")
  (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long, transaction_timestamp:long, transaction_amount:float
}

// rearrange to make timestamp the key (for sorting), convert to PairRDD
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
val sortedByTimestamp = keyByTimestamp.sortByKey()
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()

// By this point, the RDD should be sorted and partitioned according to the timestamp.
// However, I need to now make user_id the key, because the output must be per user.
// At this point, since I change the keys of the PairRDD, I understand that I lose the partitioning -
// the consequence of this is that I can no longer be sure in my fold function that the ordering is retained.

val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
val finalResult = keyByUser.foldByKey(zeroValue = 0)(myFold)
finalResult.saveAsTextFile("hdfs://...")

The problem as you'd expect takes place in the folding function, after I've re-arranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense). Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDD that retains the partitions of its parent. This seems simple enough, but I've never done anything like that before and I'm not sure where to start. I'm also willing to write my own custom partitioner class, but it appears that the getPartition method only accepts a key argument - and since the value I need to partition on in the final step (the timestamp) would be in the value, my partitioner class doesn't have the data it needs to make the right decision. I cannot have the timestamp in my key. Alternatively, has anyone else encountered a problem like this (i.e. an approximately ordered sum) and did they find a good solution? Does my approach of subclassing RDD make sense? Would there be some way to finagle a custom partitioner into making this work? Perhaps this might be a job for some other tool, like Spark Streaming? Thanks, Michael
PySpark Error on Windows with sc.wholeTextFiles
Hi, I'm running into an error on Windows (x64, 8.1) running Spark 1.1.0 (pre-built for Hadoop 2.4: spark-1.1.0-bin-hadoop2.4.tgz http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.4.tgz) with Java SE Version 8 Update 20 (build 1.8.0_20-b26); just getting started with Spark. When running sc.wholeTextFiles() on a directory, I can run the command but not do anything with the resulting RDD - specifically, I get an error in py4j.protocol.Py4JJavaError; the error is unspecified, though the location is included. I've attached the traceback below. In this situation, I'm trying to load all files from a folder on the local filesystem, located at D:\testdata. The folder contains one file, which can be loaded successfully with sc.textFile("d:/testdata/filename") - no problems at all - so I do not believe the file is throwing the error. Is there any advice on what I should look at further to isolate or fix the error? Am I doing something obviously wrong? Thanks, Michael

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Python version 2.7.7 (default, Jun 11 2014 10:40:02) SparkContext available as sc.

>>> file = sc.textFile("d:/testdata/cbcc5b470ec06f212990c68c8f76e887b884")
>>> file.count()
732
>>> file.first()
u'<!DOCTYPE html'
>>> data = sc.wholeTextFiles('d:/testdata')
>>> data.first()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\spark\python\pyspark\rdd.py", line 1167, in first
    return self.take(1)[0]
  File "D:\spark\python\pyspark\rdd.py", line 1126, in take
    totalParts = self._jrdd.partitions().size()
  File "D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o21.partitions.
: java.lang.NullPointerException
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
at org.apache.hadoop.util.Shell.run(Shell.java:418)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:559)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:534)
at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1697)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1679)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:302)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:263)
at org.apache.spark.input.WholeTextFileInputFormat.setMaxSplitSize(WholeTextFileInputFormat.scala:54)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaPairRDD.partitions(JavaPairRDD.scala:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)

>>> data.count()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\spark\python\pyspark\rdd.py", line 847, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "D:\spark\python\pyspark\rdd.py", line 838, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "D:\spark\python\pyspark\rdd.py", line 759, in reduce
    vals = self.mapPartitions(func).collect()
  File
Re: Spark SQL DDL, DML commands
What do you mean by “executed directly”? Best Regards, Yi Tian tianyi.asiai...@gmail.com On Oct 16, 2014, at 22:50, neeraj neeraj_gar...@infosys.com wrote: Hi, Does Spark SQL have DDL and DML commands that can be executed directly? If yes, please share the link. If no, please help me understand why it is not there. Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DDL-DML-commands-tp16572.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Running an action inside a loop across multiple RDDs + java.io.NotSerializableException
Hi, my programming model requires me to generate multiple RDDs for various datasets across a single run and then run an action on each - e.g.:

MyFunc myFunc = ... // It implements VoidFunction
// set some extra variables - all serializable
...
for (JavaRDD<String> rdd : rddList) {
  ...
  sc.foreach(myFunc);
}

The problem I'm seeing is that after the first run of the loop - which succeeds on foreach - the second one fails with java.io.NotSerializableException for a specific object I'm setting. In my particular case, the object contains a reference to org.apache.hadoop.conf.Configuration. My questions are: 1. Why does this succeed the first time, and fail the second? 2. Any alternatives to this programming model? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-an-action-inside-a-loop-across-multiple-RDDs-java-io-NotSerializableException-tp16580.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
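One pattern that often resolves this kind of NotSerializableException is to keep the Configuration out of the serialized closure entirely and recreate it on the executors. A sketch in Scala (the Java version is analogous); MyFunc and its use of Configuration stand in for the poster's code:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.api.java.function.VoidFunction

class MyFunc extends VoidFunction[String] {
  // @transient excludes the non-serializable Configuration from the serialized closure;
  // lazy val recreates it on each executor after deserialization
  @transient private lazy val conf: Configuration = new Configuration()

  override def call(line: String): Unit = {
    // use conf here
  }
}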
Standalone Apps and ClassNotFound
I'm relatively new to Spark and have got a couple of questions: * I've got an IntelliJ SBT project that's using Spark Streaming with a custom RabbitMQ receiver in the same project. When I run it against local[2], all's well. When I put in spark://masterip:7077, I get a ClassNotFoundException for RmqReceiver (the name of the custom receiver). Note, this is being executed inside IntelliJ, and no jars are built in the target folder. I guess using spark-submit would work, but was wondering if there's a way to simply run the app in IntelliJ and have it work. * I see there's an sc.addJars(..) method that would (I imagine) submit additional jars. Is there a way for it to submit the current project's classes as well. Or would building and submitting the package take care of this? Any pointers are appreciated. Regards, Ashic.
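One way to have the cluster see the project's own classes without spark-submit is to package them first and hand the jar to the context. A sketch, assuming sbt package has been run; the jar path is a placeholder for whatever your build produces:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://masterip:7077")
  .setAppName("StreamingApp")
  .setJars(Seq("target/scala-2.10/myapp_2.10-0.1.jar")) // placeholder: output of sbt package
val sc = new SparkContext(conf)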
Re: Running an action inside a loop across multiple RDDs + java.io.NotSerializableException
Excuse me - the line inside the loop should read: rdd.foreach(myFunc) - not sc. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-an-action-inside-a-loop-across-multiple-RDDs-java-io-NotSerializableException-tp16580p16581.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Folding an RDD in order
Hi Michael, I'm not sure I fully understood your question, but I think RDD.aggregate can be helpful in your case. You can see it as a more general version of fold. Cheng On 10/16/14 11:15 PM, Michael Misiewicz wrote: Hi, I'm working on a problem where I'd like to sum items in an RDD /in order (/approximately/)/. I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this: user_id, transaction_timestamp, transaction_amount And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work). What do you think about the approach? Here's some sample code that demonstrates what I'm thinking: def myFold(V1:Float, V2:Float) : Float = { val partialSum = V1 + V2 if (partialSum = 500) { // make a note of it, do things } return partialSum } val rawData = sc.textFile(hdfs://path/to/data).map{ x = // load data l = x.split() (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long, transaction_timestamp:long, transaction_amount:float } val keyByTimestamp = rawData.map(x= (x._2, (x._1, x._3))) // rearrange to make timestamp the key (for sorting), convert to PairRDD val sortedByTimestamp = keyByTimestamp.sortByKey() val partitionedByTimestamp = sortedByTimestamp.partitionBy( new org.apache.spark.RangePartitioner(partitions=500, rdd=sortedByTimestamp)).persist() // By this point, the RDD should be sorted and partitioned according to the timestamp. However, I need to now make user_id the key, // because the output must be per user. At this point, since I change the keys of the PairRDD, I understand that I lose the partitioning // the consequence of this is that I can no longer be sure in my fold function that the ordering is retained. val keyByUser = partitionedByTimestamp.map(x = (x._2._1, x._2._2)) val finalResult = keyByUser.foldByKey(zeroValue=0)(myFold) finalResult.saveAsTextFile(hdfs://...) The problem as you'd expect takes place in the folding function, after I've re-arranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense). Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 https://groups.google.com/forum/#%21topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDDthat retains the partitions of its parent. This seems simple enough, but I've never done anything like that before, but I'm not sure where to start. I'm also willing to write my own custom partitioner class, but it appears that the getPartitionmethod only accepts a key argument - and since the value I need to partition on in the final step (the timestamp) would be in the Value, my partitioner class doesn't have the data it needs to make the right decision. I cannot have timestamp in my key. Alternatively, has anyone else encountered a problem like this (i.e. 
an approximately ordered sum) and did they find a good solution? Does my approach of subclassing RDDmake sense? Would there be some way to finagle a custom partitioner into making this work? Perhaps this might be a job for some other tool, like spark streaming? Thanks, Michael
Re: Spark SQL DDL, DML commands
I guess you're referring to the simple SQL dialect recognized by the SqlParser component. Spark SQL supports most DDL and DML of Hive, but the simple SQL dialect is still very limited. Usually it's used together with some Spark application written in Java/Scala/Python. Within a Spark application, you can always register case class RDDs as temporary tables, which partly replaces the functionality of DDL/DML in pure SQL scripts. On the other hand, we do plan to support SQL 92 in the future. On 10/16/14 10:50 PM, neeraj wrote: Hi, Does Spark SQL have DDL and DML commands that can be executed directly? If yes, please share the link. If no, please help me understand why it is not there. Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DDL-DML-commands-tp16572.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
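To make the temporary-table route concrete, here is a minimal sketch against Spark 1.1; the Record case class and the data are made up for illustration:

import org.apache.spark.sql.SQLContext

case class Record(id: Int, name: String)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicitly converts RDDs of case classes to SchemaRDDs

val records = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
records.registerTempTable("records") // stands in for CREATE TABLE in a pure SQL script
sqlContext.sql("SELECT name FROM records WHERE id = 1").collect()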
Re: Running an action inside a loop across multiple RDDs + java.io.NotSerializableException
You can first union them into a single RDD and then call foreach. In Scala:

rddList.reduce(_.union(_)).foreach(myFunc)

For the serialization issue, I don’t have any clue unless more code can be shared. On 10/16/14 11:39 PM, soumya wrote: Hi, my programming model requires me to generate multiple RDDs for various datasets across a single run and then run an action on it - E.g. MyFunc myFunc = ... //It implements VoidFunction //set some extra variables - all serializable ... for (JavaRDD<String> rdd: rddList) { ... sc.foreach(myFunc); } The problem I'm seeing is that after the first run of the loop - which succeeds on foreach, the second one fails with java.io.NotSerializableException for a specific object I'm setting. In my particular case, the object contains a reference to org.apache.hadoop.conf.Configuration. Question is: 1. Why does this succeed the first time, and fail the second? 2. Any alternatives to this programming model? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-an-action-inside-a-loop-across-multiple-RDDs-java-io-NotSerializableException-tp16580.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
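A small side note on that union: reduce(_.union(_)) chains the RDDs pairwise, which builds a deep lineage when rddList is long. SparkContext.union takes the whole sequence at once and keeps the lineage flat. A sketch, assuming rddList is a Scala Seq of RDDs:

val combined = sc.union(rddList) // a single UnionRDD over all inputs
combined.foreach(myFunc)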
Re: How to add HBase dependencies and conf with spark-submit?
Great, it worked. I don't have an answer for what is special about SPARK_CLASSPATH vs --jars; I just found the working setting through trial and error. - Original Message - From: Fengyun RAO raofeng...@gmail.com To: Soumitra Kumar kumar.soumi...@gmail.com Cc: user@spark.apache.org, u...@hbase.apache.org Sent: Thursday, October 16, 2014 12:50:01 AM Subject: Re: How to add HBase dependencies and conf with spark-submit? Thanks, Soumitra Kumar. I didn’t know why you put hbase-protocol.jar in SPARK_CLASSPATH, while adding hbase-protocol.jar, hbase-common.jar, hbase-client.jar, htrace-core.jar in --jars, but it did work. Actually, I put all these four jars in SPARK_CLASSPATH along with the HBase conf directory. 2014-10-15 22:39 GMT+08:00 Soumitra Kumar kumar.soumi...@gmail.com: I am writing to HBase, following are my options:

export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar

spark-submit \
  --jars /opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \

- Original Message - From: Fengyun RAO raofeng...@gmail.com To: user@spark.apache.org, u...@hbase.apache.org Sent: Wednesday, October 15, 2014 6:29:21 AM Subject: Re: How to add HBase dependencies and conf with spark-submit? +user@hbase 2014-10-15 20:48 GMT+08:00 Fengyun RAO raofeng...@gmail.com: We use Spark 1.1, and HBase 0.98.1-cdh5.1.0, and need to read and write an HBase table in a Spark program. I notice there are spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage extra ClassPath, and even the deprecated SPARK_CLASSPATH. The problem is what classpath or jars should we append? I can simply add the whole `hbase classpath`, which is huge, but this leads to dependency conflicts, e.g. HBase uses guava-12 while Spark uses guava-14. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Folding an RDD in order
Thanks for the suggestion! That does look really helpful, I see what you mean about it being more general than fold. I think I will replace my fold with aggregate - it should give me more control over the process. I think the problem will still exist though - which is that I can't get the correct partitioning I need. When I change my key to user_id, I lose the timestamp partitioning. My problem is that I'm trying to retain a parent RDD's partitioning in an RDD that no longer has the same keys as its parent. On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian lian.cs@gmail.com wrote: Hi Michael, I'm not sure I fully understood your question, but I think RDD.aggregate can be helpful in your case. You can see it as a more general version of fold. Cheng On 10/16/14 11:15 PM, Michael Misiewicz wrote: Hi, I'm working on a problem where I'd like to sum items in an RDD *in order (*approximately*)*. I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this: user_id, transaction_timestamp, transaction_amount And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work). What do you think about the approach? Here's some sample code that demonstrates what I'm thinking: def myFold(V1:Float, V2:Float) : Float = { val partialSum = V1 + V2 if (partialSum = 500) { // make a note of it, do things } return partialSum } val rawData = sc.textFile(hdfs://path/to/data).map{ x = // load data l = x.split() (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long, transaction_timestamp:long, transaction_amount:float } val keyByTimestamp = rawData.map(x= (x._2, (x._1, x._3))) // rearrange to make timestamp the key (for sorting), convert to PairRDD val sortedByTimestamp = keyByTimestamp.sortByKey() val partitionedByTimestamp = sortedByTimestamp.partitionBy( new org.apache.spark.RangePartitioner(partitions=500, rdd=sortedByTimestamp)).persist() // By this point, the RDD should be sorted and partitioned according to the timestamp. However, I need to now make user_id the key, // because the output must be per user. At this point, since I change the keys of the PairRDD, I understand that I lose the partitioning // the consequence of this is that I can no longer be sure in my fold function that the ordering is retained. val keyByUser = partitionedByTimestamp.map(x = (x._2._1, x._2._2)) val finalResult = keyByUser.foldByKey(zeroValue=0)(myFold) finalResult.saveAsTextFile(hdfs://...) The problem as you'd expect takes place in the folding function, after I've re-arranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense). Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDD that retains the partitions of its parent. 
This seems simple enough, but I've never done anything like that before, but I'm not sure where to start. I'm also willing to write my own custom partitioner class, but it appears that the getPartition method only accepts a key argument - and since the value I need to partition on in the final step (the timestamp) would be in the Value, my partitioner class doesn't have the data it needs to make the right decision. I cannot have timestamp in my key. Alternatively, has anyone else encountered a problem like this (i.e. an approximately ordered sum) and did they find a good solution? Does my approach of subclassing RDD make sense? Would there be some way to finagle a custom partitioner into making this work? Perhaps this might be a job for some other tool, like spark streaming? Thanks, Michael
Re: Folding an RDD in order
I note that one of the listing variants of aggregateByKey accepts a partitioner as an argument: def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)] Would it be possible to extract my sorted parent's partitioner and pass that into aggregateByKey on the re-keyed data being aggregated? On Thu, Oct 16, 2014 at 12:01 PM, Michael Misiewicz mmisiew...@gmail.com wrote: Thanks for the suggestion! That does look really helpful, I see what you mean about it being more general than fold. I think I will replace my fold with aggregate - it should give me more control over the process. I think the problem will still exist though - which is that I can't get the correct partitioning I need. When I change my key to user_id, I lose the timestamp partitioning. My problem is that I'm trying to retain a parent RDD's partitioning in an RDD that no longer has the same keys as its parent. On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian lian.cs@gmail.com wrote: Hi Michael, I'm not sure I fully understood your question, but I think RDD.aggregate can be helpful in your case. You can see it as a more general version of fold. Cheng On 10/16/14 11:15 PM, Michael Misiewicz wrote: Hi, I'm working on a problem where I'd like to sum items in an RDD *in order (*approximately*)*. I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this: user_id, transaction_timestamp, transaction_amount And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work). What do you think about the approach? Here's some sample code that demonstrates what I'm thinking: def myFold(V1:Float, V2:Float) : Float = { val partialSum = V1 + V2 if (partialSum = 500) { // make a note of it, do things } return partialSum } val rawData = sc.textFile(hdfs://path/to/data).map{ x = // load data l = x.split() (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long, transaction_timestamp:long, transaction_amount:float } val keyByTimestamp = rawData.map(x= (x._2, (x._1, x._3))) // rearrange to make timestamp the key (for sorting), convert to PairRDD val sortedByTimestamp = keyByTimestamp.sortByKey() val partitionedByTimestamp = sortedByTimestamp.partitionBy( new org.apache.spark.RangePartitioner(partitions=500, rdd=sortedByTimestamp)).persist() // By this point, the RDD should be sorted and partitioned according to the timestamp. However, I need to now make user_id the key, // because the output must be per user. At this point, since I change the keys of the PairRDD, I understand that I lose the partitioning // the consequence of this is that I can no longer be sure in my fold function that the ordering is retained. val keyByUser = partitionedByTimestamp.map(x = (x._2._1, x._2._2)) val finalResult = keyByUser.foldByKey(zeroValue=0)(myFold) finalResult.saveAsTextFile(hdfs://...) 
The problem as you'd expect takes place in the folding function, after I've re-arranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense). Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDD that retains the partitions of its parent. This seems simple enough, but I've never done anything like that before, but I'm not sure where to start. I'm also willing to write my own custom partitioner class, but it appears that the getPartition method only accepts a key argument - and since the value I need to partition on in the final step (the timestamp) would be in the Value, my partitioner class doesn't have the data it needs to make the right decision. I cannot have timestamp in my key. Alternatively, has anyone else encountered a problem like this (i.e. an approximately ordered sum) and did they find a good solution? Does my approach of subclassing RDD make sense? Would there be some way to finagle a custom partitioner into making this work? Perhaps this might be a job for some other tool, like spark streaming? Thanks, Michael
Re: PySpark Error on Windows with sc.wholeTextFiles
It's a bug, could you file a JIRA for this? Thanks! Davies On Thu, Oct 16, 2014 at 8:28 AM, Griffiths, Michael (NYC-RPM) michael.griffi...@reprisemedia.com wrote: Hi, I’m running into an error on Windows (x64, 8.1) running Spark 1.1.0 (pre-builet for Hadoop 2.4: spark-1.1.0-bin-hadoop2.4.tgz) with Java SE Version 8 Update 20 (build 1.8.0_20-b26); just getting started with Spark. When running sc.wholeTextFiles() on a directory, I can run the command but not do anything with the resulting RDD – specifically, I get an error in py4j.protocol.Py4JJavaError; the error is unspecified, though the location is included. I’ve attached the traceback below. In this situation, I’m trying to load all files from a folder on the local filesystem, located at D:\testdata. The folder contains one file, which can be loaded successfully with sc.textFile(“d:/testdata/filename”) – no problems at all – so I do not believe the file is throwing the error. Is there any advice on what I should look at further to isolate or fix the error? Am I doing something obviously wrong? Thanks, Michael Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.1.0 /_/ Using Python version 2.7.7 (default, Jun 11 2014 10:40:02) SparkContext available as sc. file = sc.textFile(d:/testdata/cbcc5b470ec06f212990c68c8f76e887b884) file.count() 732 file.first() u'!DOCTYPE html' data = sc.wholeTextFiles('d:/testdata') data.first() Traceback (most recent call last): File stdin, line 1, in module File D:\spark\python\pyspark\rdd.py, line 1167, in first return self.take(1)[0] File D:\spark\python\pyspark\rdd.py, line 1126, in take totalParts = self._jrdd.partitions().size() File D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py, line 538, in __call__ File D:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o21.partitions. 
: java.lang.NullPointerException at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:445) at org.apache.hadoop.util.Shell.run(Shell.java:418) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650) at org.apache.hadoop.util.Shell.execCommand(Shell.java:739) at org.apache.hadoop.util.Shell.execCommand(Shell.java:722) at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097) at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:559) at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:534) at org.apache.hadoop.fs.LocatedFileStatus.init(LocatedFileStatus.java:42) at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1697) at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1679) at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:302) at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:263) at org.apache.spark.input.WholeTextFileInputFormat.setMaxSplitSize(WholeTextFileInputFormat.scala:54) at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50) at org.apache.spark.api.java.JavaPairRDD.partitions(JavaPairRDD.scala:44) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Unknown Source) data.count() Traceback (most recent call last): File stdin, line 1, in module File D:\spark\python\pyspark\rdd.py, line 847, in count return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File
Re: Folding an RDD in order
RDD.aggregate doesn’t require the RDD elements to be pairs, so you don’t need to use user_id to be the key of the RDD. For example, you can use an empty Map as the zero value of the aggregation. The key of the Map is the user_id you extracted from each tuple, and the value is the aggregated value:

keyByTimestamp.aggregate(Map.empty[Long, Float].withDefaultValue(0f))(
  { (agg, rec) =>
    val (time, (user, amount)) = rec
    agg.updated(user, agg(user) + amount)
  },
  { (lhs, rhs) =>
    lhs.keys.foldLeft(rhs) { (combined, user) =>
      combined.updated(user, lhs(user) + rhs(user))
    }
  })

Of course, you may use a mutable Map for optimized performance. One thing to notice: foldByKey is a transformation, while aggregate is an action. The final result of the code above is a single Map object rather than an RDD. If this map can be very large (say you have billions of users), then aggregate may OOM. On 10/17/14 12:01 AM, Michael Misiewicz wrote: Thanks for the suggestion! That does look really helpful, I see what you mean about it being more general than fold. I think I will replace my fold with aggregate - it should give me more control over the process. I think the problem will still exist though - which is that I can't get the correct partitioning I need. When I change my key to user_id, I lose the timestamp partitioning. My problem is that I'm trying to retain a parent RDD's partitioning in an RDD that no longer has the same keys as its parent. On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian lian.cs@gmail.com wrote: Hi Michael, I'm not sure I fully understood your question, but I think RDD.aggregate can be helpful in your case. You can see it as a more general version of fold. Cheng On 10/16/14 11:15 PM, Michael Misiewicz wrote: Hi, I'm working on a problem where I'd like to sum items in an RDD in order (approximately). I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this: user_id, transaction_timestamp, transaction_amount And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work). What do you think about the approach? Here's some sample code that demonstrates what I'm thinking:

def myFold(V1: Float, V2: Float): Float = {
  val partialSum = V1 + V2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  return partialSum
}

val rawData = sc.textFile("hdfs://path/to/data").map { x =>
  // load data
  val l = x.split(" ")
  (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long, transaction_timestamp:long, transaction_amount:float
}

// rearrange to make timestamp the key (for sorting), convert to PairRDD
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
val sortedByTimestamp = keyByTimestamp.sortByKey()
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()

// By this point, the RDD should be sorted and partitioned according to the timestamp.
// However, I need to now make user_id the key, because the output must be per user.
At this point, since I change the keys of the PairRDD, I understand that I lose the partitioning // the consequence of this is that I can no longer be sure in my fold function that the ordering is retained. val keyByUser = partitionedByTimestamp.map(x = (x._2._1, x._2._2)) val finalResult = keyByUser.foldByKey(zeroValue=0)(myFold) finalResult.saveAsTextFile(hdfs://...) The problem as you'd expect takes place in the folding function, after I've re-arranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense). Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 https://groups.google.com/forum/#%21topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDDthat retains the partitions of its parent. This seems simple enough, but I've never done anything like that before, but I'm not sure where to start. I'm also willing to write my own custom partitioner
Re: Play framework
Mohammed, Jumping in for Daniel, we actually address the configuration issue by pulling values from environment variables or command line options. Maybe that may handle at least some of your needs. For the akka issue, here is the akka version we include in build.sbt:

"com.typesafe.akka" %% "akka-actor" % "2.2.1"

-Suren On Thu, Oct 16, 2014 at 12:23 PM, Mohammed Guller moham...@glassbeam.com wrote: Daniel, Thanks for sharing this. It is very helpful. The reason I want to use Spark submit is that it provides more flexibility. For example, with spark-submit, I don’t need to hard code the master info in the code. I can easily change the config without having to change and recompile code. Do you mind sharing the sbt build file for your play app? I tried to build an uber jar using sbt-assembly. It gets built, but when I run it, it throws all sorts of exceptions. I have seen some blog posts that Spark and Play use different versions of the Akka library. So I included Akka in my build.scala file, but still cannot get rid of the Akka related exceptions. I suspect that the settings in the build.scala file for my play project are incorrect. Mohammed *From:* Daniel Siegmann [mailto:daniel.siegm...@velos.io] *Sent:* Thursday, October 16, 2014 7:15 AM *To:* Mohammed Guller *Cc:* user@spark.apache.org *Subject:* Re: Play framework We execute Spark jobs from a Play application but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not you can just create a SparkContext programmatically in your app. In development I typically run Spark locally. Creating the Spark context is pretty trivial:

val conf = new SparkConf().setMaster("local[*]").setAppName(s"My Awesome App")
// call conf.set for any other configuration you want
val sc = new SparkContext(conf)

It is important to keep in mind you cannot have multiple local contexts (you can create them but you'll get odd errors), so if you are running things in parallel within your app (even unit tests) you'd need to share a context in this case. If you are running sequentially you can create a new local context each time, but you must make sure to call SparkContext.stop() when you're done. Running against a cluster is a bit more complicated because you need to add all your dependency jars. I'm not sure how to get this to work with play run. I stick to building the app with play dist and then running against the packaged application, because it very conveniently provides all the dependencies in a lib folder. Here is some code to load all the paths you need from the dist:

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
  return if (libDir.exists) {
    libDir.listFiles().map(_.getCanonicalFile().getAbsolutePath()).filter(_.endsWith(".jar"))
  } else {
    throw new IllegalStateException(s"lib dir is missing: $libDir")
  }
}

Creating the context is similar to above, but with this extra line: conf.setJars(libs) I hope this helps. I should note that I don't use play run very much, at least not for when I'm actually executing Spark jobs. So I'm not sure if this integrates properly with that. I have unit tests which execute on Spark and have executed the dist package both locally and on a cluster. To make working with the dist locally easier, I wrote myself a little shell script to unzip and run the dist.
On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – Has anybody figured out how to integrate a Play application with Spark and run it on a Spark cluster using the spark-submit script? I have seen some blogs about creating a simple Play app and running it locally on a dev machine with the sbt run command. However, those steps don’t work for spark-submit. If you have figured out how to build and run a Play app with spark-submit, I would appreciate it if you could share the steps and the sbt settings for your Play app. Thanks, Mohammed -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hira...@velos.io W: www.velos.io
Re: Spark output to s3 extremely slow
Hi Rafal, Thanks for the explanation and solution! I need to write maybe 100 GB to S3. I will try your way and see whether it works for me. Thanks again! On Wed, Oct 15, 2014 at 1:44 AM, Rafal Kwasny m...@entropy.be wrote: Hi, How large is the dataset you're saving into S3? Saving to S3 is actually done in two steps: 1) writing temporary files 2) committing them to the proper directory Step 2) can be slow because S3 does not have a quick atomic move operation; you have to copy (server side, but it still takes time) and then delete the original. I've overcome this by using a JobConf with a NullOutputCommitter: jobConf.setOutputCommitter(classOf[NullOutputCommitter]) where NullOutputCommitter is a class that doesn't do anything:

class NullOutputCommitter extends OutputCommitter {
  def abortTask(taskContext: TaskAttemptContext) = { }
  override def cleanupJob(jobContext: JobContext) = { }
  def commitTask(taskContext: TaskAttemptContext) = { }
  def needsTaskCommit(taskContext: TaskAttemptContext) = { false }
  def setupJob(jobContext: JobContext) { }
  def setupTask(taskContext: TaskAttemptContext) { }
}

This works, but maybe someone has a better solution. /Raf anny9699 wrote: Hi, I found that writing output back to S3 using rdd.saveAsTextFile() is extremely slow, much slower than reading from S3. Is there a way to make it faster? The rdd has 150 partitions, so parallelism is enough I assume. Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-output-to-s3-extremely-slow-tp16447.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
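To make the wiring concrete, here is a minimal sketch of how the committer above could be plugged into saveAsHadoopFile. The bucket path is hypothetical and sc/rdd are assumed from context; note that with a no-op committer a failed task can leave partial files behind, since nothing cleans up or promotes temporary output:

import org.apache.spark.SparkContext._
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}

// Build a JobConf that installs the no-op committer, then write text
// records straight to their final S3 location, skipping the copy-and-delete
// commit step.
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputCommitter(classOf[NullOutputCommitter])
rdd.map(line => (NullWritable.get(), new Text(line)))
  .saveAsHadoopFile(
    "s3n://my-bucket/output",  // hypothetical path
    classOf[NullWritable], classOf[Text],
    classOf[TextOutputFormat[NullWritable, Text]],
    jobConf)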
ALS implicit error pyspark
Hi, I am trying to use the ALS.trainImplicit method in pyspark.mllib.recommendation. However it didn't work. So I tried the example in the Python API documentation, such as:

r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1)

It didn't work either. After searching in Google, I found that there are only two overloads for ALS.trainImplicit in the Scala source. So I tried model = ALS.trainImplicit(ratings, 1, 1), and it worked. But if I set the iterations to anything other than 1, for example model = ALS.trainImplicit(ratings, 1, 2) or model = ALS.trainImplicit(ratings, 4, 2), it generated an error. The information is as follows: count at ALS.scala:314 Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times, most recent failure: Lost task 6.3 in stage 189.0 (TID 626, ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet Serialization trace: shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: It is really strange, because count at ALS.scala:314 is already outside the loop of iterations. Any idea? Thanks a lot in advance. FYI: I used Spark 1.1.0, and ALS.train() works pretty well in all cases. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-implicit-error-pyspark-tp16595.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
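For reference, since the poster reports that the Scala path works, here is a hedged sketch of the equivalent call in Scala MLlib (Spark 1.1 era). The lambda and alpha values are illustrative assumptions, not tuned settings:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// The same toy ratings as above, expressed as MLlib Rating objects.
val ratings = sc.parallelize(Seq(Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)))

// trainImplicit(ratings, rank, iterations, lambda, alpha)
val model = ALS.trainImplicit(ratings, 1, 2, 0.01, 1.0)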
Spark assembly for YARN/CDH5
Does anyone know if there are Spark assemblies created and available for download that have been built for CDH5 and YARN? Thanks, Philip - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running an action inside a loop across multiple RDDs + java.io.NotSerializableException
Sorry - I'll furnish some details below. However, union is not an option for the business logic I have. The function will generate a specific file based on a variable passed in as the setter for the function. This variable changes with each RDD. I annotated the log line where the first run succeeds. Logs output to give you some context: ... 14/10/16 15:32:25 INFO SparkContext: Starting job: count at GenerateJSONContent.java:145 14/10/16 15:32:25 INFO DAGScheduler: Got job 0 (count at GenerateJSONContent.java:145) with 1 output partitions (allowLocal=false) 14/10/16 15:32:25 INFO DAGScheduler: Final stage: Stage 0(count at GenerateJSONContent.java:145) 14/10/16 15:32:25 INFO DAGScheduler: Parents of final stage: List() 14/10/16 15:32:25 INFO DAGScheduler: Missing parents: List() 14/10/16 15:32:25 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82), which has no missing parents 14/10/16 15:32:25 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82) 14/10/16 15:32:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 14/10/16 15:32:25 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL) 14/10/16 15:32:25 INFO TaskSetManager: Serialized task 0.0:0 as 2048 bytes in 3 ms 14/10/16 15:32:25 INFO Executor: Running task ID 0 14/10/16 15:32:25 INFO Executor: Fetching http://172.16.1.204:42232/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar with timestamp 1413473544838 14/10/16 15:32:25 INFO Utils: Fetching http://172.16.1.204:42232/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar to /tmp/fetchFileTemp7432615579770034188.tmp 14/10/16 15:32:26 INFO Executor: Adding file:/tmp/spark-a471145f-ab44-447b-b48c-bb499024d756/rickshaw-spark-0.0.1-SNAPSHOT.jar to class loader 14/10/16 15:32:26 INFO Executor: Serialized size of result for 0 is 597 14/10/16 15:32:26 INFO Executor: Sending result for 0 directly to driver 14/10/16 15:32:26 INFO Executor: Finished task ID 0 14/10/16 15:32:26 INFO DAGScheduler: Completed ResultTask(0, 0) 14/10/16 15:32:26 INFO DAGScheduler: Stage 0 (count at GenerateJSONContent.java:145) finished in 0.791 s 14/10/16 15:32:26 INFO SparkContext: Job finished: count at GenerateJSONContent.java:145, took 0.889171151 s 14/10/16 15:32:26 INFO GenerateJSONContent: Running for tag :PublicationProductId 14/10/16 15:32:26 INFO TaskSetManager: Finished TID 0 in 778 ms on localhost (progress: 1/1) 14/10/16 15:32:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 14/10/16 15:32:26 INFO SparkContext: Starting job: foreach at GenerateJSONContent.java:156 14/10/16 15:32:26 INFO DAGScheduler: Got job 1 (foreach at GenerateJSONContent.java:156) with 1 output partitions (allowLocal=false) 14/10/16 15:32:26 INFO DAGScheduler: Final stage: Stage 1(foreach at GenerateJSONContent.java:156) 14/10/16 15:32:26 INFO DAGScheduler: Parents of final stage: List() 14/10/16 15:32:26 INFO DAGScheduler: Missing parents: List() 14/10/16 15:32:26 INFO DAGScheduler: Submitting Stage 1 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82), which has no missing parents 14/10/16 15:32:26 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82) 14/10/16 15:32:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 14/10/16 15:32:26 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost 
(PROCESS_LOCAL) 14/10/16 15:32:26 INFO TaskSetManager: Serialized task 1.0:0 as 2792 bytes in 1 ms 14/10/16 15:32:26 INFO Executor: Running task ID 1 14/10/16 15:32:35 INFO Executor: Serialized size of result for 1 is 559 14/10/16 15:32:35 INFO Executor: Sending result for 1 directly to driver 14/10/16 15:32:35 INFO Executor: Finished task ID 1 14/10/16 15:32:35 INFO DAGScheduler: Completed ResultTask(1, 0) 14/10/16 15:32:35 INFO DAGScheduler: Stage 1 (foreach at GenerateJSONContent.java:156) finished in 9.098 s 14/10/16 15:32:35 INFO SparkContext: Job finished: foreach at GenerateJSONContent.java:156, took 9.112736939 s 14/10/16 15:32:35 INFO TaskSetManager: Finished TID 1 in 9095 ms on localhost (progress: 1/1) 14/10/16 15:32:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/10/16 15:32:36 INFO SparkContext: Starting job: count at GenerateJSONContent.java:145 14/10/16 15:32:36 INFO DAGScheduler: Got job 2 (count at GenerateJSONContent.java:145) with 1 output partitions (allowLocal=false) 14/10/16 15:32:36 INFO DAGScheduler: Final stage: Stage 2(count at GenerateJSONContent.java:145) 14/10/16 15:32:36 INFO DAGScheduler: Parents of final stage: List() 14/10/16 15:32:36 INFO DAGScheduler: Missing parents: List() 14/10/16 15:32:36 INFO DAGScheduler: Submitting Stage 2 (ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82), which has no missing parents 14/10/16 15:32:36
Re: How to make operation like cogrop() , groupbykey() on pair RDD = [ [ ], [ ] , [ ] ]
Hi, You just need to add list() in the sorted function. For example:

map((lambda (x, y): (x, (list(y[0]), list(y[1])))), sorted(list(rdd1.cogroup(rdd2).collect())))

I think you just forgot the list... PS: your post has NOT been accepted by the mailing list yet. Best Gen pm wrote: Hi, Thanks for the reply. Now, after doing the cogroup mentioned below,

merge_rdd = map((lambda (x, y): (x, (list(y[0]), list(y[1])))), sorted(rdd1.cogroup(rdd2).collect()))
map((lambda (x, y): (x, (list(y[0]), list(y[1])))), sorted(merge_rdd.cogroup(rdd3).collect()))

I'm getting output like [((u'abc', u'0010'), ([(pyspark.resultiterable.ResultIterable at 0x4b1b4d0, pyspark.resultiterable.ResultIterable at 0x4b1b550)], [[(u'address', u'2017 CAN'), (u'address_city', u'VESTAVIA'), ...]])), ((u'abc', u'0020'), ([(pyspark.resultiterable.ResultIterable at 0x4b1bd50, pyspark.resultiterable.ResultIterable at 0x4b1bf10)], [[(u'address', u'2017 CAN'), (u'address_city', u'VESTAV'), ...]]))] How do I show the value of an object like pyspark.resultiterable.ResultIterable at 0x4b1b4d0? I want to show the data for pyspark.resultiterable.ResultIterable at 0x4b1bd50. Could you please tell me the way to show the data for those objects? I'm using Python. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-operation-like-cogrop-groupbykey-on-pair-RDD-tp16487p16598.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: TaskNotSerializableException when running through Spark shell
I actually only ran into this issue recently, after we upgraded to Spark 1.1. Within the REPL for Spark 1.0 everything works fine, but within the REPL for 1.1 it does not. FYI, I am also only doing simple regex matching functions within an RDD... When I run the same code as an app, everything works fine... it leads me to believe that it is a bug within the REPL for 1.1. Can anyone else confirm this? *JIMMY MCERLAIN* DATA SCIENTIST (NERD) *. . . . . . . . . . . . . . . . . .* *IF WE CAN’T DOUBLE YOUR SALES,* *ONE OF US IS IN THE WRONG BUSINESS.* *E*: ji...@sellpoints.com *M*: *510.303.7751* On Thu, Oct 16, 2014 at 7:56 AM, Akshat Aranya aara...@gmail.com wrote: Hi, Can anyone explain how things get captured in a closure when running through the REPL? For example: def foo(..) = { .. } rdd.map(foo) sometimes complains about classes not being serializable that are completely unrelated to foo. This happens even when I write it as: object Foo { def foo(..) = { .. } } rdd.map(Foo.foo) It also doesn't happen all the time.
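Not a confirmed fix for the 1.1 REPL behaviour described above, but the usual workaround sketch: bind whatever the closure needs to a local val first, so the task captures only that value rather than the enclosing REPL line object (which can drag in unrelated, non-serializable fields). The names below are illustrative:

// In the REPL, every line is compiled into a wrapper object, and a closure
// that references a method can end up capturing that whole wrapper. Copying
// the function into a local value keeps the captured closure small.
object Matchers {
  def isNumeric(line: String): Boolean = line.matches("""\d+""")
}

val f: String => Boolean = Matchers.isNumeric  // local, serializable function value
val rdd = sc.parallelize(Seq("1", "abc", "42"))
rdd.filter(f).collect()  // Array("1", "42")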
Re: Play framework
We integrated Spark into Play and use SparkSQL extensively, on an ec2 Spark cluster on Hadoop hdfs 1.2.1 and Tachyon 0.4. Step 1: Create a Play Scala application as usual. Step 2: In Build.sbt, put all your Spark dependencies. What works for us is Play 2.2.3, Scala 2.10.4, Spark 1.1. We have Akka 2.2.3. This is straightforward. Step 3: As Daniel mentioned, create the Spark context within Play. The rest of the application is as usual. Step 4: Create a full jar using play package and include that package in the library of jars passed to the Spark context. Step 5: play run as usual. It works very well, and the convenience is that we have an all-Scala application throughout. Regards Raju From: Surendranauth Hiraman suren.hira...@velos.io Sent: Thursday, October 16, 2014 12:42 PM To: Mohammed Guller Cc: Daniel Siegmann; user@spark.apache.org Subject: Re: Play framework Mohammed, Jumping in for Daniel, we actually address the configuration issue by pulling values from environment variables or command line options. Maybe that will handle at least some of your needs. For the akka issue, here is the akka version we include in build.sbt: "com.typesafe.akka" %% "akka-actor" % "2.2.1" -Suren On Thu, Oct 16, 2014 at 12:23 PM, Mohammed Guller moham...@glassbeam.com wrote: Daniel, Thanks for sharing this. It is very helpful. The reason I want to use spark-submit is that it provides more flexibility. For example, with spark-submit, I don’t need to hard-code the master info in the code. I can easily change the config without having to change and recompile the code. Do you mind sharing the sbt build file for your Play app? I tried to build an uber jar using sbt-assembly. It gets built, but when I run it, it throws all sorts of exceptions. I have seen some blog posts saying that Spark and Play use different versions of the Akka library. So I included Akka in my build.scala file, but still cannot get rid of the Akka-related exceptions. I suspect that the settings in the build.scala file for my Play project are incorrect. Mohammed From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Thursday, October 16, 2014 7:15 AM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: Play framework We execute Spark jobs from a Play application but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not you can just create a SparkContext programmatically in your app. In development I typically run Spark locally. Creating the Spark context is pretty trivial:

val conf = new SparkConf().setMaster("local[*]").setAppName("My Awesome App")
// call conf.set for any other configuration you want
val sc = new SparkContext(conf)

It is important to keep in mind you cannot have multiple local contexts (you can create them, but you'll get odd errors), so if you are running things in parallel within your app (even unit tests) you'd need to share a context in this case. If you are running sequentially you can create a new local context each time, but you must make sure to call SparkContext.stop() when you're done. Running against a cluster is a bit more complicated because you need to add all your dependency jars. I'm not sure how to get this to work with play run. I stick to building the app with play dist and then running against the packaged application, because it very conveniently provides all the dependencies in a lib folder.
Here is some code to load all the paths you need from the dist:

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
  return if (libDir.exists) {
    libDir.listFiles().map(_.getCanonicalFile().getAbsolutePath()).filter(_.endsWith(".jar"))
  } else {
    throw new IllegalStateException(s"lib dir is missing: $libDir")
  }
}

Creating the context is similar to above, but with this extra line:

conf.setJars(libs)

I hope this helps. I should note that I don't use play run very much, at least not for when I'm actually executing Spark jobs. So I'm not sure if this integrates properly with that. I have unit tests which execute on Spark and have executed the dist package both locally and on a cluster. To make working with the dist locally easier, I wrote myself a little shell script to unzip and run the dist. On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – Has anybody figured out how to integrate a Play application with Spark and run it on a Spark cluster using the spark-submit script? I have seen some blogs about
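Since the sbt build file keeps coming up in this thread, here is a hedged Build.sbt sketch assembled from the versions mentioned above (Play 2.2.3, Scala 2.10.4, Spark 1.1.0, Akka 2.2.x). It is illustrative only; the exact module list and any exclusions will depend on your project:

// Build.sbt — illustrative dependency block for a Play 2.2.x + Spark 1.1 app.
name := "play-spark-app"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-sql"  % "1.1.0",
  // pin akka-actor explicitly to keep Play and Spark on a compatible version
  "com.typesafe.akka" %% "akka-actor" % "2.2.3"
)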
reverse an rdd
hello... what is the best way to iterate through an rdd backward (last element first, first element last)? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reverse-an-rdd-tp16602.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can's create Kafka stream in spark shell
Thanks Akhil. I tried spark-submit and saw the same issue. I double-checked the versions and they look OK. Are you seeing any obvious issues? sbt:

name := "Simple Project"
version := "1.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-streaming" % "1.1.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0",
  "org.apache.kafka" %% "kafka" % "0.8.0"
)

spark-1.1.0-bin-hadoop1/bin/spark-submit --class main.scala.SimpleApp --master local[2] simple-project_2.10-1.1.jar --jars spark-streaming-kafka_2.10-1.1.0.jar,kafka_2.10-0.8.0.jar

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at main.scala.SimpleApp$delayedInit$body.apply(SimpleApp.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at main.scala.SimpleApp$.main(SimpleApp.scala:11) at main.scala.SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more On Tue, Oct 14, 2014 at 12:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have the same version of the spark-streaming-kafka http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 jar and Spark in your classpath. Thanks Best Regards On Tue, Oct 14, 2014 at 9:02 AM, Gary Zhao garyz...@gmail.com wrote: Hello I'm trying to connect to Kafka in the Spark shell, but it failed as below. Could you take a look at what I missed? scala> val kafkaStream = KafkaUtils.createStream(ssc, "test-vip.snc1:2181", "test_spark", Map("user-test" -> 1)) error: bad symbolic reference. A signature in KafkaUtils.class refers to term serializer in value kafka which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling KafkaUtils.class. Thanks Gary
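One thing worth checking in the command above (an observation, not a confirmed diagnosis of this report): spark-submit options must come before the application jar; anything placed after the jar is passed as an argument to the application's main method. As written, the --jars flag is ignored, which would explain the NoClassDefFoundError for KafkaUtils. The reordered command would look like:

spark-1.1.0-bin-hadoop1/bin/spark-submit \
  --class main.scala.SimpleApp \
  --master local[2] \
  --jars spark-streaming-kafka_2.10-1.1.0.jar,kafka_2.10-0.8.0.jar \
  simple-project_2.10-1.1.jar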
Re: Spark assembly for YARN/CDH5
https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/spark/spark-assembly_2.10/ ? I'm not sure why the 5.2 + 1.1 final artifacts don't show up there yet though. On Thu, Oct 16, 2014 at 2:12 PM, Philip Ogren philip.og...@oracle.com wrote: Does anyone know if there Spark assemblies are created and available for download that have been built for CDH5 and YARN? Thanks, Philip - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can's create Kafka stream in spark shell
Can you try: sbt: name := Simple Project version := 1.1 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.1.0, org.apache.spark %% spark-streaming % 1.1.0, org.apache.spark %% spark-streaming-kafka % 1.1.0 ) Thanks Best Regards On Fri, Oct 17, 2014 at 12:36 AM, Gary Zhao garyz...@gmail.com wrote: Thanks Akhil. I tried spark-submit and saw the same issue. I double checked the versions and they look ok. Are you seeing any obvious issues? sbt: name := Simple Project version := 1.1 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.1.0, org.apache.spark %% spark-streaming % 1.1.0, org.apache.spark %% spark-streaming-kafka % 1.1.0, org.apache.kafka %% kafka % 0.8.0 ) spark-1.1.0-bin-hadoop1/bin/spark-submit --class main.scala.SimpleApp --master local[2] simple-project_2.10-1.1.jar --jars spark-streaming-kafka_2.10-1.1.0.jar,kafka_2.10-0.8.0.jar Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at main.scala.SimpleApp$delayedInit$body.apply(SimpleApp.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at main.scala.SimpleApp$.main(SimpleApp.scala:11) at main.scala.SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more On Tue, Oct 14, 2014 at 12:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have the same version of spark-streaming-kafka http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 jar and spark in your classpath. Thanks Best Regards On Tue, Oct 14, 2014 at 9:02 AM, Gary Zhao garyz...@gmail.com wrote: Hello I'm trying to connect kafka in spark shell, but failed as below. Could you take a look what I missed. scala val kafkaStream = KafkaUtils.createStream(ssc, test-vip.snc1:2181, test_spark, Map(user-test-1)) error: bad symbolic reference. A signature in KafkaUtils.class refers to term serializer in value kafka which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling KafkaUtils.class. Thanks Gary
Re: reverse an rdd
Since you're concerned with the particular ordering, you will need to sort your RDD to ensure the ordering you have in mind. Simply reverse the Ordering with Ordering.reverse() and sort by that instead, and then use toLocalIterator() I suppose. Depending on what you're really trying to achieve, there may be a better way. On Thu, Oct 16, 2014 at 2:49 PM, ll duy.huynh@gmail.com wrote: hello... what is the best way to iterate through an rdd backward (last element first, first element last)? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reverse-an-rdd-tp16602.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
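A hedged sketch of what Sean describes, assuming "backward" means reversing the file's line order (the zipWithIndex keying is an assumption about the intended ordering):

// Key each line by its position, sort descending, and walk the result on
// the driver without collecting the whole RDD at once.
val lines = sc.textFile("some_text_file.txt").zipWithIndex().map(_.swap)
val reversed = lines.sortByKey(ascending = false)  // the reversed Ordering
reversed.toLocalIterator.foreach { case (_, line) => println(line) }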
Re: ALS implicit error pyspark
I tried the same data with scala. It works pretty well. It seems that it is the problem of pyspark. In the console, it shows the following logs: Traceback (most recent call last): File stdin, line 1, in module * File /root/spark/python/pyspark/mllib/recommendation.py, line 76, in trainImplicit 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally) ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)* File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally) : An error occurred while calling o32.trainImplicitALSModel. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet Serialization trace: shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at
Re: ALS implicit error pyspark
It seems a bug, Could you create a JIRA for it? thanks! Davies On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote: I tried the same data with scala. It works pretty well. It seems that it is the problem of pyspark. In the console, it shows the following logs: Traceback (most recent call last): File stdin, line 1, in module * File /root/spark/python/pyspark/mllib/recommendation.py, line 76, in trainImplicit 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally) ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)* File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally) : An error occurred while calling o32.trainImplicitALSModel. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet Serialization trace: shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at
hi all
Hi, I just wanted to say hi to the Spark community. I'm developing some stuff right now using Spark (we've started very recently). As the API documentation of Spark is really, really good, I'd like to get deeper knowledge of the internal stuff - you know, the goodies. Watching videos from the Spark Summits helps; nevertheless, I hope to learn a lot from reading this mailing list. Regards, Pawel Szulc
Re: reverse an rdd
Just to have this clear, can you answer with a quick yes or no: does it mean that when I create an RDD from a file and simply iterate through it like this: sc.textFile("some_text_file.txt").foreach(line => println(line)) then the actual lines might come in a different order than they are in the file? On Thu, Oct 16, 2014 at 9:13 PM, Sean Owen so...@cloudera.com wrote: Since you're concerned with the particular ordering, you will need to sort your RDD to ensure the ordering you have in mind. Simply reverse the Ordering with Ordering.reverse() and sort by that instead, and then use toLocalIterator() I suppose. Depending on what you're really trying to achieve, there may be a better way. On Thu, Oct 16, 2014 at 2:49 PM, ll duy.huynh@gmail.com wrote: hello... what is the best way to iterate through an rdd backward (last element first, first element last)? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reverse-an-rdd-tp16602.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: reverse an rdd
Never mind, I've just run the code in the REPL. Indeed, if we do not sort, then the order is totally random. Which actually makes sense if you think about it. On Thu, Oct 16, 2014 at 9:58 PM, Paweł Szulc paul.sz...@gmail.com wrote: Just to have this clear, can you answer with a quick yes or no: does it mean that when I create an RDD from a file and simply iterate through it like this: sc.textFile("some_text_file.txt").foreach(line => println(line)) then the actual lines might come in a different order than they are in the file? On Thu, Oct 16, 2014 at 9:13 PM, Sean Owen so...@cloudera.com wrote: Since you're concerned with the particular ordering, you will need to sort your RDD to ensure the ordering you have in mind. Simply reverse the Ordering with Ordering.reverse() and sort by that instead, and then use toLocalIterator() I suppose. Depending on what you're really trying to achieve, there may be a better way. On Thu, Oct 16, 2014 at 2:49 PM, ll duy.huynh@gmail.com wrote: hello... what is the best way to iterate through an rdd backward (last element first, first element last)? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reverse-an-rdd-tp16602.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to name a DStream
Hello, I am debugging my code to find out what else to cache. The following is a line in the log: 14/10/16 12:00:01 INFO TransformedDStream: Persisting RDD 6 for time 141348600 ms to StorageLevel(true, true, false, false, 1) at time 141348600 ms Is there a way to name a DStream? RDD has a name method, but DStream does not. Please let me know if there is a way to map the DStream to a location in my source. Thanks, -Soumitra. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
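As the poster notes, DStream has no name method, but a hedged workaround sketch is to name the underlying RDDs as each batch is produced, so "Persisting RDD ..." log lines and the storage UI can be traced back to a point in the source. The stream name below is hypothetical:

// transform gives access to each batch's RDD; setName tags it for logs/UI.
val named = dstream.transform { rdd =>
  rdd.setName("clicksPerBatch")  // hypothetical label for this DStream
  rdd
}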
RE: Play framework
Thanks, Suren and Raju. Raju – if I remember correctly, the play package command just creates a jar for your app. That jar file will not include the other dependencies. So it is not really a full jar as you mentioned below. So how are you passing all the other dependency jars to Spark? Can you share that piece of code? Also, is there any specific reason why you are not using play dist instead? Mohammed From: US Office Admin [mailto:ad...@vectorum.com] Sent: Thursday, October 16, 2014 11:41 AM To: Surendranauth Hiraman; Mohammed Guller Cc: Daniel Siegmann; user@spark.apache.org Subject: Re: Play framework We integrated Spark into Play and use SparkSQL extensively, on an ec2 Spark cluster on Hadoop hdfs 1.2.1 and Tachyon 0.4. Step 1: Create a Play Scala application as usual. Step 2: In Build.sbt, put all your Spark dependencies. What works for us is Play 2.2.3, Scala 2.10.4, Spark 1.1. We have Akka 2.2.3. This is straightforward. Step 3: As Daniel mentioned, create the Spark context within Play. The rest of the application is as usual. Step 4: Create a full jar using play package and include that package in the library of jars passed to the Spark context. Step 5: play run as usual. It works very well, and the convenience is that we have an all-Scala application throughout. Regards Raju From: Surendranauth Hiraman suren.hira...@velos.io Sent: Thursday, October 16, 2014 12:42 PM To: Mohammed Guller Cc: Daniel Siegmann; user@spark.apache.org Subject: Re: Play framework Mohammed, Jumping in for Daniel, we actually address the configuration issue by pulling values from environment variables or command line options. Maybe that will handle at least some of your needs. For the akka issue, here is the akka version we include in build.sbt: "com.typesafe.akka" %% "akka-actor" % "2.2.1" -Suren On Thu, Oct 16, 2014 at 12:23 PM, Mohammed Guller moham...@glassbeam.com wrote: Daniel, Thanks for sharing this. It is very helpful. The reason I want to use spark-submit is that it provides more flexibility. For example, with spark-submit, I don’t need to hard-code the master info in the code. I can easily change the config without having to change and recompile the code. Do you mind sharing the sbt build file for your Play app? I tried to build an uber jar using sbt-assembly. It gets built, but when I run it, it throws all sorts of exceptions. I have seen some blog posts saying that Spark and Play use different versions of the Akka library. So I included Akka in my build.scala file, but still cannot get rid of the Akka-related exceptions. I suspect that the settings in the build.scala file for my Play project are incorrect. Mohammed From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Thursday, October 16, 2014 7:15 AM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: Play framework We execute Spark jobs from a Play application but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not you can just create a SparkContext programmatically in your app. In development I typically run Spark locally. 
Creating the Spark context is pretty trivial:

val conf = new SparkConf().setMaster("local[*]").setAppName("My Awesome App")
// call conf.set for any other configuration you want
val sc = new SparkContext(conf)

It is important to keep in mind you cannot have multiple local contexts (you can create them, but you'll get odd errors), so if you are running things in parallel within your app (even unit tests) you'd need to share a context in this case. If you are running sequentially you can create a new local context each time, but you must make sure to call SparkContext.stop() when you're done. Running against a cluster is a bit more complicated because you need to add all your dependency jars. I'm not sure how to get this to work with play run. I stick to building the app with play dist and then running against the packaged application, because it very conveniently provides all the dependencies in a lib folder. Here is some code to load all the paths you need from the dist:

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
  return if (libDir.exists) {
    libDir.listFiles().map(_.getCanonicalFile().getAbsolutePath()).filter(_.endsWith(".jar"))
  } else {
    throw new IllegalStateException(s"lib dir is missing: $libDir")
  }
}

Creating the context is similar to above, but with this extra line:

conf.setJars(libs)

I hope this
Re: Exception while reading SendingConnection to ConnectionManagerId
Does anyone know anything re: this error? Thank you! On Wed, Oct 15, 2014 at 3:38 PM, Jimmy Li jimmy...@bluelabs.com wrote: Hi there, I'm running spark on ec2, and am running into an error there that I don't get locally. Here's the error: 11335 [handle-read-write-executor-3] ERROR org.apache.spark.network.SendingConnection - Exception while reading SendingConnection to ConnectionManagerId([IP HERE]) java.nio.channels.ClosedChannelException Does anyone know what might be causing this? Spark is running on my ec2 instances. Thanks, Jimmy
Re: TF-IDF in Spark 1.1.0
Thanks for the response. Appreciate the help! Burke On Tue, Oct 14, 2014 at 3:00 PM, Xiangrui Meng men...@gmail.com wrote: You cannot recover the document from the TF-IDF vector, because HashingTF is not reversible. You can assign each document a unique ID, and join back the result after training. HashingTF can transform individual records:

val docs: RDD[(String, Seq[String])] = ...
val tf = new HashingTF()
val tfWithId: RDD[(String, Vector)] = docs.mapValues(tf.transform)
...

Best, Xiangrui On Tue, Oct 14, 2014 at 9:15 AM, Burke Webster burke.webs...@gmail.com wrote: I'm following the MLlib example for TF-IDF and ran into a problem due to my lack of knowledge of Scala and Spark. Any help would be greatly appreciated. Following the MLlib example I could do something like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.IDF

val sc: SparkContext = ...
val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

As a result I would have an RDD containing the TF-IDF vectors for the input documents. My question is how do I map each vector back to the original input document? My end goal is to compute document similarity using cosine similarity. From what I can tell, I can compute TF-IDF, apply the L2 norm, and then compute the dot-product. Has anybody done this? Currently, my example looks more like this:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

val sc: SparkContext = ...
// input is a sequence file of the form (docid: Text, content: Text)
val data: RDD[(String, String)] = sc.sequenceFile[String, String]("corpus")
val docs: RDD[(String, Seq[String])] = data.mapValues(v => v.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[(String, Vector)] = hashingTF.??

I'm trying to maintain some linking from the document identifier to its eventual vector representation. Am I going about this incorrectly? Thanks
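To make Xiangrui's join-back suggestion concrete, here is a hedged sketch for Spark 1.1-era MLlib. The zip relies on both RDDs deriving from the same parent with no shuffle in between (a keyed join is the safer general approach), and the L2 normalization is the poster's own plan, not a library one-liner:

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{HashingTF, IDF, Normalizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// docs: (docid, tokens), e.g. from the sequence file above
val ids = docs.keys
val hashingTF = new HashingTF()
val tfVecs = docs.values.map(hashingTF.transform(_))
tfVecs.cache()  // IDF.fit and IDF.transform both traverse this RDD

val idfModel = new IDF().fit(tfVecs)
val tfidfVecs: RDD[Vector] = idfModel.transform(tfVecs)

// Re-attach the ids; after L2 normalization, cosine similarity between two
// documents is just the dot product of their vectors.
val tfidfWithId: RDD[(String, Vector)] = ids.zip(tfidfVecs)
val normalized = tfidfWithId.mapValues(new Normalizer(2.0).transform(_))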
Re: Spark assembly for YARN/CDH5
Hi Philip, The assemblies are part of the CDH distribution. You can get them here: http://www.cloudera.com/content/cloudera/en/downloads/cdh/cdh-5-2-0.html As of Spark 1.1 (and, thus, CDH 5.2), assemblies are not published to maven repositories anymore (you can see commit [1] for details). [1] https://github.com/apache/spark/commit/f493f7982b50 On Thu, Oct 16, 2014 at 11:12 AM, Philip Ogren philip.og...@oracle.com wrote: Does anyone know if there Spark assemblies are created and available for download that have been built for CDH5 and YARN? Thanks, Philip - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Bug? job fails to run when given options on spark-submit (but starts and fails without)
TL;DR - a spark SQL job fails with an OOM (Out of heap space) error. If given --executor-memory values, it won't even start. Even (!) if the values given ARE THE SAME AS THE DEFAULT. Without --executor-memory: 14/10/16 17:14:58 INFO TaskSetManager: Serialized task 1.0:64 as 14710 bytes in 1 ms 14/10/16 17:14:58 WARN TaskSetManager: Lost TID 26 (task 1.0:25) 14/10/16 17:14:58 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: Java heap space at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:609) at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360) ... USING --executor-memory (WITH ANY VALUE), even 1G which is the default: Parsed arguments: master spark://redacted:7077 deployMode null executorMemory 1G ... System properties: spark.executor.memory - 1G spark.eventLog.enabled - true ... 14/10/16 17:14:23 INFO TaskSchedulerImpl: Adding task set 1.0 with 678 tasks 14/10/16 17:14:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Spark 1.0.0. Is this a bug?
Re: Can's create Kafka stream in spark shell
Same error. I saw someone reported the same issue, e.g. http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-kafka-error-td9106.html Should I use sbt assembly? It failed for deduplicate though. error] (*:assembly) deduplicate: different file contents found in the following: [error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA [error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA [error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA [error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA [error] Total time: 4 s, completed Oct 16, 2014 1:58:41 PM On Thu, Oct 16, 2014 at 12:11 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try: sbt: name := Simple Project version := 1.1 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.1.0, org.apache.spark %% spark-streaming % 1.1.0, org.apache.spark %% spark-streaming-kafka % 1.1.0 ) Thanks Best Regards On Fri, Oct 17, 2014 at 12:36 AM, Gary Zhao garyz...@gmail.com wrote: Thanks Akhil. I tried spark-submit and saw the same issue. I double checked the versions and they look ok. Are you seeing any obvious issues? sbt: name := Simple Project version := 1.1 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.1.0, org.apache.spark %% spark-streaming % 1.1.0, org.apache.spark %% spark-streaming-kafka % 1.1.0, org.apache.kafka %% kafka % 0.8.0 ) spark-1.1.0-bin-hadoop1/bin/spark-submit --class main.scala.SimpleApp --master local[2] simple-project_2.10-1.1.jar --jars spark-streaming-kafka_2.10-1.1.0.jar,kafka_2.10-0.8.0.jar Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at main.scala.SimpleApp$delayedInit$body.apply(SimpleApp.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at main.scala.SimpleApp$.main(SimpleApp.scala:11) at main.scala.SimpleApp.main(SimpleApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 
17 more On Tue, Oct 14, 2014 at 12:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have the same version of spark-streaming-kafka http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 jar and spark in your classpath. Thanks Best Regards On Tue, Oct 14, 2014 at 9:02 AM, Gary Zhao garyz...@gmail.com wrote: Hello I'm trying to connect kafka in spark shell, but failed as below. Could you take a look what I missed. scala val kafkaStream = KafkaUtils.createStream(ssc, test-vip.snc1:2181, test_spark, Map(user-test-1)) error: bad symbolic reference. A signature in KafkaUtils.class refers to term serializer in value kafka which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling KafkaUtils.class. Thanks Gary
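Regarding the deduplicate failures from sbt assembly quoted above: the usual remedy is a mergeStrategy that discards the conflicting META-INF signature files. A hedged sketch, assuming sbt-assembly 0.11.x-era syntax (the keys and imports vary across plugin versions, so check the docs for yours):

import sbtassembly.Plugin._
import AssemblyKeys._

// Discard the jar signature files that several jetty-orbit jars all carry;
// fall back to the previous strategy for everything else.
mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    case s if s.endsWith(".RSA") || s.endsWith(".DSA") || s.endsWith(".SF") =>
      MergeStrategy.discard
    case x => old(x)
  }
}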
Re: ALS implicit error pyspark
On Thu, Oct 16, 2014 at 9:53 AM, Gen gen.tan...@gmail.com wrote: Hi, I am trying to use the ALS.trainImplicit method in pyspark.mllib.recommendation. However it didn't work. So I tried the example in the Python API documentation, such as: r1 = (1, 1, 1.0) r2 = (1, 2, 2.0) r3 = (2, 1, 2.0) ratings = sc.parallelize([r1, r2, r3]) model = ALS.trainImplicit(ratings, 1) It didn't work either. After searching in Google, I found that there are only two overloads for ALS.trainImplicit in the Scala source. So I tried model = ALS.trainImplicit(ratings, 1, 1), and it worked. But if I set the iterations to anything other than 1, for example model = ALS.trainImplicit(ratings, 1, 2) or model = ALS.trainImplicit(ratings, 4, 2), it generated an error. The information is as follows: The Python API has default values for all the other arguments, so you should be able to call it with only rank=1 (iterations has no default in Scala). I'm curious how you ran into this problem. count at ALS.scala:314 Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times, most recent failure: Lost task 6.3 in stage 189.0 (TID 626, ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet Serialization trace: shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: It is really strange, because count at ALS.scala:314 is already outside the iteration loop. Any idea? Thanks a lot in advance. FYI: I used Spark 1.1.0, and ALS.train() works pretty well in all the cases. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-implicit-error-pyspark-tp16595.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
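A minimal sketch of the call pattern being discussed, against PySpark 1.1: rank is the only required hyperparameter, and the remaining arguments (iterations, lambda_, blocks, alpha, whose names are visible in the traceback above) all carry defaults in the Python wrapper; the specific default values are not shown in this thread.

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

sc = SparkContext()
ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])
model = ALS.trainImplicit(ratings, 1)                 # rank only; defaults elsewhere
model = ALS.trainImplicit(ratings, 4, iterations=10)  # or name the extras explicitly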
Re: ALS implicit error pyspark
Could you post the code that has the problem with pyspark? Thanks! Davies On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tan...@gmail.com wrote: I tried the same data with scala. It works pretty well. It seems that it is a problem with pyspark. In the console, it shows the following logs: Traceback (most recent call last): File "<stdin>", line 1, in <module> File /root/spark/python/pyspark/mllib/recommendation.py, line 76, in trainImplicit 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally) ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha) File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally) : An error occurred while calling o32.trainImplicitALSModel. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet Serialization trace: shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at
EC2 cluster set up and access to HBase in a different cluster
The plan is to create an EC2 cluster and run (py)spark on it. Input data comes from S3; output data goes to an HBase in a persistent cluster (also EC2). My questions are: 1. I need to install some software packages on all the workers (sudo apt-get install ...). Is there a better way to do this than going to every node and installing them manually? 2. I assume Spark can access an HBase which is in a different cluster. Am I correct? If yes, how? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EC2-cluster-set-up-and-access-to-HBase-in-a-different-cluster-tp16622.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
scala: java.net.BindException?
hello... does anyone know how to resolve this issue? i'm running this locally on my computer. keep getting this BindException. much appreciated. 14/10/16 17:48:13 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:444) at sun.nio.ch.Net.bind(Net.java:436) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext.init(SparkContext.scala:224) at nn.SimpleNeuralNetwork$delayedInit$body.apply(SimpleNeuralNetwork.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scala-java-net-BindException-tp16624.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ALS implicit error pyspark
I can run the following code against Spark 1.1:

sc = SparkContext()
r1 = (1, 1, 1.0)
r2 = (1, 2, 2.0)
r3 = (2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
model = ALS.trainImplicit(ratings, 1)

Davies
Strange duplicates in data when scaling up
I have a flatmap function that shouldn't possibly emit duplicates and yet it does. The output of my function is a HashSet, so the function itself cannot output duplicates, and yet I see many copies of keys emitted from it (in one case up to 62). The curious thing is I can't get this to happen until I ramp up the size of the input to about 100,000 lines. For example:

(3587005221,[[(80530632,0.20824391739360665)], [(80530632,0.20824391739360665)]])

will expand to

(3587005221,(80530632,0.37312230565577803))
(3587005221,(80530632,0.37312230565577803))
(3587005221,(80530632,0.37312230565577803))
.
.
.
(3587005221,(80530632,0.37312230565577803))

62 total times. If I run this line only as input, I get just the one line of output as expected. It seems to be a scaling-up issue. My code is as follows:

JavaPairRDD<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> preAggData = indidKeyedJoinedData.groupByKey();
JavaPairRDD<Long, Tuple2<Integer, Double>> aggregatedData = preAggData.flatMapToPair(new AggregateLikeSims());

Where:

static class AggregateLikeSims implements PairFlatMapFunction<Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>>, Long, Tuple2<Integer, Double>> {
  HashSet<Tuple2<Long, Tuple2<Integer, Double>>> output = new HashSet<Tuple2<Long, Tuple2<Integer, Double>>>();
  Map<Integer, List<Double>> intermediateMap = new HashMap<Integer, List<Double>>();
  Iterator<Tuple2<Integer, Double>> intIterator;
  Tuple2<Integer, Double> currentTuple;
  Double MAX_RECO_VALUE = 1.0;
  Iterator<Iterable<Tuple2<Integer, Double>>> itIterator;
  Accumulator<Integer> accum;

  @Override
  public Iterable<Tuple2<Long, Tuple2<Integer, Double>>> call(Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> inTuple) {
    itIterator = inTuple._2.iterator();
    while (itIterator.hasNext()) {
      intIterator = itIterator.next().iterator();
      while (intIterator.hasNext()) {
        currentTuple = intIterator.next();
        if (intermediateMap.containsKey(currentTuple._1)) {
          intermediateMap.get(currentTuple._1).add(currentTuple._2);
        } else {
          List<Double> listOfDoubles = new ArrayList<Double>();
          listOfDoubles.add(currentTuple._2);
          intermediateMap.put(currentTuple._1, listOfDoubles);
        }
      }
    }
    Iterator<Map.Entry<Integer, List<Double>>> it = intermediateMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Integer, List<Double>> pairs = it.next();
      if (pairs.getValue().size() > 1) {
        output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1, new Tuple2<Integer, Double>(pairs.getKey(), aggregate(pairs.getValue()))));
      } else {
        output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1, new Tuple2<Integer, Double>(pairs.getKey(), pairs.getValue().get(0))));
      }
      it.remove();
    }
    return output;
  }

  private double aggregate(List<Double> simsList) {
    if (simsList == null) { return 0; }
    if (simsList.size() == 1) { return simsList.get(0); }
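One hedged reading of the code above, not confirmed in the thread: output and intermediateMap are instance fields, and the same function instance is reused for every record in a partition, so output is never cleared between call() invocations and tuples already emitted for earlier keys are returned again for later ones. That would only become visible once a partition holds many lines, which matches the scaling behavior described. A minimal sketch of that fix is to make both collections local to call():

@Override
public Iterable<Tuple2<Long, Tuple2<Integer, Double>>> call(
    Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> inTuple) {
  // Fresh per-invocation collections: nothing carries over between records.
  HashSet<Tuple2<Long, Tuple2<Integer, Double>>> output =
      new HashSet<Tuple2<Long, Tuple2<Integer, Double>>>();
  Map<Integer, List<Double>> intermediateMap = new HashMap<Integer, List<Double>>();
  // ... same aggregation logic as above, but writing to the local collections ...
  return output;
}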
Spark streaming on data at rest.
Apologies if this is something very obvious, but I've perused the Spark Streaming guide and this still isn't evident to me. I have files with data in the format timestamp,column1,column2,column3... etc., and I'd like to use Spark Streaming's window operations on them. However, from what I can see, the streams are expected to be live. Is there a way to do window operations on the timestamps in my dataset without somehow replaying the messages? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-on-data-at-rest-tp16627.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: EC2 cluster set up and access to HBase in a different cluster
Maybe I should create a private AMI to address my question no. 1? Assuming I use the default instance type as the base image... Anyone tried this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EC2-cluster-set-up-and-access-to-HBase-in-a-different-cluster-tp16622p16628.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Print dependency graph as DOT file
Hello, Is there a way to print the dependency graph of a complete program, or of an RDD/DStream, as a DOT file? It would be very helpful to have such a thing. Thanks, -Soumitra. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
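There is no built-in DOT exporter that I know of in Spark 1.1 (rdd.toDebugString prints the lineage as indented text); below is a minimal sketch that walks the public rdd.dependencies API to emit DOT. The traversal and node labels are my own, not a Spark feature.

import org.apache.spark.rdd.RDD
import scala.collection.mutable

// Walk an RDD's lineage via rdd.dependencies and emit a Graphviz DOT digraph.
def toDot(root: RDD[_]): String = {
  val sb = new StringBuilder("digraph lineage {\n")
  val seen = mutable.Set[Int]()
  def visit(r: RDD[_]): Unit = {
    if (seen.add(r.id)) {
      sb.append("  " + r.id + " [label=\"" + r.getClass.getSimpleName + " (" + r.id + ")\"];\n")
      for (dep <- r.dependencies) {
        sb.append("  " + dep.rdd.id + " -> " + r.id + ";\n")
        visit(dep.rdd)
      }
    }
  }
  visit(root)
  sb.append("}").toString
}

// println(toDot(myRdd))  // feed the output to `dot -Tpng` to render it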
Re: Play framework
Hi, Below is the link for a simple Play + SparkSQL example - http://blog.knoldus.com/2014/07/14/play-with-spark-building-apache-spark-with-play-framework-part-3/ https://github.com/knoldus/Play-Spark-Scala Manu On Thu, Oct 16, 2014 at 1:00 PM, Mohammed Guller moham...@glassbeam.com wrote: Thanks, Suren and Raju. Raju – if I remember correctly, the Play package command just creates a jar for your app. That jar file will not include other dependencies. So it is not really a full jar as you mentioned below. So how are you passing all the other dependency jars to Spark? Can you share that piece of code? Also, is there any specific reason why you are not using play dist instead? Mohammed From: US Office Admin [mailto:ad...@vectorum.com] Sent: Thursday, October 16, 2014 11:41 AM To: Surendranauth Hiraman; Mohammed Guller Cc: Daniel Siegmann; user@spark.apache.org Subject: Re: Play framework We integrated Spark into Play and use SparkSQL extensively on an EC2 Spark cluster on Hadoop HDFS 1.2.1 and Tachyon 0.4. Step 1: Create a Play Scala application as usual. Step 2: In Build.sbt put all your Spark dependencies. What works for us is Play 2.2.3, Scala 2.10.4, Spark 1.1. We have Akka 2.2.3. This is straightforward. Step 3: As Daniel mentioned, create the Spark context within Play. The rest of the application is as usual. Step 4: Create a full jar using Play package and include that package in the library of jars passed to the Spark context. Step 5: Play run as usual. It works very well, and the convenience is, we have an all-Scala application throughout. Regards Raju -- From: Surendranauth Hiraman suren.hira...@velos.io Sent: Thursday, October 16, 2014 12:42 PM To: Mohammed Guller Cc: Daniel Siegmann; user@spark.apache.org Subject: Re: Play framework Mohammed, Jumping in for Daniel, we actually address the configuration issue by pulling values from environment variables or command line options. Maybe that can handle at least some of your needs. For the Akka issue, here is the Akka version we include in build.sbt: "com.typesafe.akka" %% "akka-actor" % "2.2.1" -Suren On Thu, Oct 16, 2014 at 12:23 PM, Mohammed Guller moham...@glassbeam.com wrote: Daniel, Thanks for sharing this. It is very helpful. The reason I want to use spark-submit is that it provides more flexibility. For example, with spark-submit, I don’t need to hard-code the master info in the code. I can easily change the config without having to change and recompile code. Do you mind sharing the sbt build file for your Play app? I tried to build an uber jar using sbt-assembly. It gets built, but when I run it, it throws all sorts of exceptions. I have seen some blog posts that Spark and Play use different versions of the Akka library. So I included Akka in my build.scala file, but still cannot get rid of the Akka-related exceptions. I suspect that the settings in the build.scala file for my Play project are incorrect. Mohammed From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Thursday, October 16, 2014 7:15 AM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: Play framework We execute Spark jobs from a Play application but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not you can just create a SparkContext programmatically in your app. In development I typically run Spark locally. 
Creating the Spark context is pretty trivial:

val conf = new SparkConf().setMaster("local[*]").setAppName(s"My Awesome App")
// call conf.set for any other configuration you want
val sc = new SparkContext(conf)

It is important to keep in mind you cannot have multiple local contexts (you can create them but you'll get odd errors), so if you are running things in parallel within your app (even unit tests) you'd need to share a context in this case. If you are running sequentially you can create a new local context each time, but you must make sure to call SparkContext.stop() when you're done. Running against a cluster is a bit more complicated because you need to add all your dependency jars. I'm not sure how to get this to work with play run. I stick to building the app with play dist and then running against the packaged application, because it very conveniently provides all the dependencies in a lib folder. Here is some code to load all the paths you need from the dist:

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
  return if (libDir.exists) {
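The snippet above is cut off by the archive; a guess at how the helper might finish, assuming it just lists the jar paths under the dist's lib folder (the listing logic is mine, not Daniel's):

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  if (libDir.exists)
    libDir.listFiles().filter(_.getName.endsWith(".jar")).map(_.getAbsolutePath).toSeq
  else
    Seq.empty[String]
}

// then, e.g.: new SparkConf().setJars(libs)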
local class incompatible: stream classdesc serialVersionUID
I’ve read several discussions of the error here and so have wiped all cluster machines and copied the master’s spark build to the rest of the cluster. I’ve built my job on the master using the correct Spark version as a dependency and even built that version of Spark. I still get the incompatible serialVersionUID error. If I run the job locally with master = local[8] it completes fine. I thought I had incompatible builds, but in the end I’m not quite sure what this error is telling me: 14/10/16 15:21:03 WARN scheduler.TaskSetManager: Loss was due to java.io.InvalidClassException java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = 385418487991259089, local class serialVersionUID = -6766554341038829528 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:560) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1599) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1494) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1599) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1494) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1748) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1814) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1773) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:662) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Join with large data set
Hi, I have an RDD which holds my application data, and it is huge. I want to join it with reference data which is also too large to fit in memory, so I do not want to use a broadcast variable. What other options do I have to perform such joins? I am using Cassandra as my data store, so should I just query Cassandra to get the reference data needed? Also, when I join two RDDs, will it result in an RDD scan, or will Spark do a hash partition on the two RDDs to bring the data with the same keys onto the same node? Thanks Ankur
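On the second question, as a sketch: a plain join hash-partitions both sides (a shuffle of each), and pre-partitioning both RDDs with the same partitioner lets the join itself run without an extra shuffle. All names and the partition count below are illustrative, not from the thread:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(200)               // illustrative count
// appRdd and refRdd are assumed to be (key, value) pair RDDs
val appByKey = appRdd.partitionBy(partitioner).persist()
val refByKey = refRdd.partitionBy(partitioner).persist()
val joined = appByKey.join(refByKey)   // co-partitioned: the join adds no shuffle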
Spark Hive Snappy Error
Hi, When trying Spark with a Hive table, I got the “java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I” error:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("select count(1) from q8_national_market_share").collect().foreach(println)

java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316) at org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79) at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) at org.apache.spark.sql.hive.HadoopTableReader.init(TableReader.scala:68) at org.apache.spark.sql.hive.execution.HiveTableScan.init(HiveTableScan.scala:68) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188) at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:364) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:184) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:146) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at $iwC$$iwC$$iwC$$iwC.init(console:15) at $iwC$$iwC$$iwC.init(console:20) at $iwC$$iwC.init(console:22) at $iwC.init(console:24) at init(console:26) at .init(console:30) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) at
Re: local class incompatible: stream classdesc serialVersionUID
Yes, I removed my Spark dir and scp’ed the master’s build to all cluster machines, suspecting that problem. My app (Apache Mahout) had Spark 1.0.1 in the POM, but changing it to 1.0.2 (the Spark version installed) gave another error. I guess I’ll have to install Spark 1.0.1 or get Mahout to update their dependencies. On Oct 16, 2014, at 4:03 PM, Paweł Szulc paul.sz...@gmail.com wrote: This looks like a typical issue with serialization of the same class between different versions of an application. I've run into similar (yet not the same) issues before. Are you 100% sure that you have the same version of Apache Spark on each node of the cluster? And I am not only asking about the project version (1.0.0, 1.1.0, etc.) but also about the package type (hadoop 1.x, hadoop 2.x).
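A sketch of the fix Pat describes: pin the application's Spark dependency in the POM to exactly the version deployed on the cluster. The version number comes from the thread; the provided scope is my assumption about how the cluster supplies Spark at runtime.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.2</version> <!-- must match the build installed on the cluster -->
  <scope>provided</scope>  <!-- assumption: the cluster provides Spark at runtime -->
</dependency>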
Re: scala: java.net.BindException?
thanks marcelo. i only instantiated sparkcontext once, at the beginning, in this code. the exception was thrown right at the beginning. i also tried to run other programs, which worked fine previously, but now they also get the same error. it looks like something put a global block on creating a sparkcontext, preventing any program from creating one. On Oct 16, 2014 6:26 PM, Marcelo Vanzin van...@cloudera.com wrote: This error is not fatal, since Spark will retry on a different port... but this might be a problem, for different reasons, if somehow your code is trying to instantiate multiple SparkContexts. I assume nn.SimpleNeuralNetwork is part of your application, and since it seems to be instantiating a new SparkContext and also is being called from an iteration, that looks sort of fishy. -- Marcelo
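A sketch of the usual remedies, assuming the root cause is a second (or leftover) SparkContext holding the web UI port: create one context, always stop it, and optionally move the UI off the contended default port 4040.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SimpleNeuralNetwork")
  .set("spark.ui.port", "4050")  // sidestep a port 4040 still held by another JVM
val sc = new SparkContext(conf)
try {
  // ... job logic ...
} finally {
  sc.stop()  // releases the UI port so the next run can bind cleanly
}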
Exception Logging
I need help to better trap exceptions in map functions. What is the best way to catch an exception and provide some helpful diagnostic information, such as the source of the input, e.g., the file name (and ideally the line number if I am processing a text file)? -Yao
object in an rdd: serializable?
i got an exception complaining about serializable. the sample code is below...

class HelloWorld(val count: Int) {
  ...
  ...
}

object Test extends App {
  ...
  val data = sc.parallelize(List(new HelloWorld(1), new HelloWorld(2)))
  ...
}

what is the best way to serialize HelloWorld so that it can be contained in an RDD? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/object-in-an-rdd-serializable-tp16638.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Exception Logging
You can put a try/catch block in the map function and log the exception. The only tricky part is that the exception log will be located on the executor machine. Even if you don't do any trapping, you should see the exception stacktrace in the executor's stderr log, which is visible through the UI (if your app crashes the executor, you can still see it as the last executor that ran on a given worker). But things like println and logging work inside map; you just have to remember everything happens on the remote machine.
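A minimal sketch of that advice (the path and the parsing logic are placeholders): catch per-record failures inside map and log enough context to locate the bad input, remembering the output lands in the executor's stderr rather than the driver's.

val lines = sc.textFile("hdfs:///data/input.txt")  // placeholder path
val parsed = lines.map { line =>
  try {
    Right(line.trim.toInt)  // stand-in for the real per-record work
  } catch {
    case e: Exception =>
      // Runs on the executor, so it shows up in that executor's stderr log.
      System.err.println("Failed on record '" + line + "': " + e.getMessage)
      Left(line)
  }
}
println("bad records: " + parsed.filter(_.isLeft).count())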
RE: Play framework
Manu, I had looked at that example before starting this thread. I was specifically looking for some suggestions on how to run a Play app with the spark-submit script on a real cluster. Mohammed
Re: [SparkSQL] Convert JavaSchemaRDD to SchemaRDD
I'm trying to provide an API to Java users, and I need to accept their JavaSchemaRDDs and convert them to SchemaRDDs for Scala users. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Convert-JavaSchemaRDD-to-SchemaRDD-tp16482p16641.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
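A hedged sketch: as I recall the Spark 1.1 API, JavaSchemaRDD is a thin wrapper over the Scala SchemaRDD and exposes it via baseSchemaRDD; that member name is my recollection, not confirmed in this thread, so check the scaladoc before relying on it.

import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.api.java.JavaSchemaRDD

// Unwrap the Java-facing RDD back to the Scala-facing one.
def toSchemaRDD(javaRdd: JavaSchemaRDD): SchemaRDD = javaRdd.baseSchemaRDD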
RE: Spark Hive Snappy Error
Hi Arthur, I think this is a known issue in Spark; you can check https://issues.apache.org/jira/browse/SPARK-3958. I'm curious about it: can you always reproduce this issue? Is it related to some specific data sets? Would you mind giving me some information about your workload, Spark configuration, JDK version and OS version? Thanks Jerry
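Not from this thread, but a commonly suggested way to sidestep snappy native-library problems while such an issue is open is to switch Spark's internal compression codec so the snappy native library is never loaded; a sketch:

import org.apache.spark.SparkConf

// Use the full codec class name to avoid any question of short-name support.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")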
how to build spark 1.1.0 to include org.apache.commons.math3 ?
Hi All, I tried to build Spark 1.1.0 using sbt with the command: sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly but the resulting spark-assembly-1.1.0-hadoop2.2.0.jar is still missing the Apache commons-math3 classes. How can I add math3 to the package? Best regards, Henry
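The thread has no reply, but one hedged approach is to bundle commons-math3 with the application itself rather than rebuilding the Spark assembly: declare it as a direct dependency in the app's own build. The version below is illustrative.

// In the application's build.sbt (not Spark's build)
libraryDependencies += "org.apache.commons" % "commons-math3" % "3.3"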
Re: Play framework
The remaining dependencies (the Spark libraries) are available to the context from the Spark home; I have installed Spark so that all the slaves have the same Spark home. The code looks like this:

val conf = new SparkConf()
  .setSparkHome("/home/dev/spark")
  .setMaster("spark://99.99.99.999:7077")
  .setAppName("xxx")
  .setJars(Seq("/home/dev/play/target/scala-2.10/xxx_2.10-1.0.jar"))
val sc = new SparkContext(conf)

If you have more dependencies, you can keep adding them to setJars. Raju
RE: Play framework
What about all the play dependencies since the jar created by the ‘Play package’ won’t include the play jar or any of the 100+ jars on which play itself depends? Mohammed
I have seen some blog posts saying that Spark and Play use different versions of the Akka library, so I included Akka in my build.scala file, but I still cannot get rid of the Akka-related exceptions. I suspect that the settings in the build.scala file for my Play project are incorrect.

Mohammed

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io]
Sent: Thursday, October 16, 2014 7:15 AM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: Play framework

We execute Spark jobs from a Play application but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not you can just create a SparkContext programmatically in your app. In development I typically run Spark locally. Creating the Spark context is pretty trivial:

val conf = new SparkConf().setMaster("local[*]").setAppName(s"My Awesome App")
// call conf.set for any other configuration you want
val sc = new SparkContext(conf)

It is important to keep in mind you cannot have multiple local contexts (you can create them but you'll get odd errors).
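Daniel's and Surendranauth's points combine naturally: read the master (and any other settings) from the environment, so the same code runs locally in development and against the cluster in production. Below is a minimal sketch of that approach; the SPARK_MASTER variable name and the SparkContextProvider object are illustrations, not anything from this thread.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: fall back to a local master when the environment variable is
// unset (development), otherwise use the configured cluster master.
object SparkContextProvider {
  def create(appName: String): SparkContext = {
    val master = sys.env.getOrElse("SPARK_MASTER", "local[*]") // hypothetical variable name
    val conf = new SparkConf().setMaster(master).setAppName(appName)
    new SparkContext(conf)
  }
}

This keeps the master out of the compiled code, which addresses the recompilation concern without requiring spark-submit.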
Re: object in an rdd: serializable?
Making it a case class should work.

On Thu, Oct 16, 2014 at 8:30 PM, ll duy.huynh@gmail.com wrote:

I got an exception complaining about serializability. The sample code is below...

class HelloWorld(val count: Int) {
  ...
  ...
}

object Test extends App {
  ...
  val data = sc.parallelize(List(new HelloWorld(1), new HelloWorld(2)))
  ...
}

What is the best way to serialize HelloWorld so that it can be contained in an RDD? Thanks!
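A minimal sketch of the suggested fix (class and object names are taken from the question; the local-mode setup is an assumption added to make it self-contained). Case classes extend Serializable by default, so their instances can be shipped inside an RDD; an equivalent alternative is to keep the plain class and declare it `extends Serializable`.

import org.apache.spark.{SparkConf, SparkContext}

// Case classes are serializable out of the box.
case class HelloWorld(count: Int)

object Test extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("serializable-test"))
  val data = sc.parallelize(List(HelloWorld(1), HelloWorld(2)))
  // Reduces on the executors without a NotSerializableException.
  val total = data.map(_.count).reduce(_ + _)
  println(total)
  sc.stop()
}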
Re: Play framework
In our case, Play libraries are not required to run the Spark jobs, so they are available only on the master, and Play runs as a regular Scala application. I can't think of a case where you would need Play to run on the slaves.

Raju

On Thu, Oct 16, 2014 at 10:21 PM, Mohammed Guller moham...@glassbeam.com wrote:

What about all the Play dependencies, since the jar created by 'play package' won't include the Play jar or any of the 100+ jars on which Play itself depends?

Mohammed
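For apps that do need many third-party jars on the executors, one way to extend Raju's setJars pattern is to enumerate a staging directory instead of listing each jar by hand. This is a hedged sketch: everything beyond setJars itself — the lib path and the SparkContextFactory name — is illustrative, not from the thread.

import java.io.File
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: ship the application jar plus every jar found in a staging
// directory (the path is hypothetical) to the executors via setJars.
object SparkContextFactory {
  def create(): SparkContext = {
    val depJars = new File("/home/dev/play/lib") // assumed staging directory
      .listFiles()
      .filter(_.getName.endsWith(".jar"))
      .map(_.getAbsolutePath)
      .toSeq

    val conf = new SparkConf()
      .setSparkHome("/home/dev/spark")
      .setMaster("spark://99.99.99.999:7077")
      .setAppName("xxx")
      .setJars("/home/dev/play/target/scala-2.10/xxx_2.10-1.0.jar" +: depJars)
    new SparkContext(conf)
  }
}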
Re: spark1.0 principal component analysis
computePrincipalComponents returns a local matrix X whose columns are the principal components (ordered), and those column vectors are in the same feature space as the input feature vectors. -Xiangrui

On Thu, Oct 16, 2014 at 2:39 AM, al123 ant.lay...@hotmail.co.uk wrote:

Hi,

I don't think anybody answered this question... fintis wrote: "How do I match the principal components to the actual features since there is some sorting?"

Would anybody be able to shed a little light on it, since I too am struggling with this?

Many thanks!!
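A small sketch of how to read the result (the data is made up, and an existing SparkContext `sc` is assumed). Row i of the returned matrix corresponds to input feature i, so the loading of original feature i on component j is entry (i, j); the features are never reordered — only the components are sorted, by decreasing variance.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.1),
  Vectors.dense(2.0, 3.9, 6.0),
  Vectors.dense(3.0, 6.1, 9.2)
))
val mat = new RowMatrix(rows)
val pc = mat.computePrincipalComponents(2) // 3 features x 2 components

// The local matrix stores values in column-major order, so entry (i, j)
// sits at offset j * numRows + i of toArray.
val loadings = pc.toArray
for (j <- 0 until pc.numCols; i <- 0 until pc.numRows) {
  println(s"component $j, feature $i: ${loadings(j * pc.numRows + i)}")
}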
Re: How to close resources shared in executor?
Thanks, Ted.

We use CDH 5.1, and the HBase version is 0.98.1-cdh5.1.0, in which the javadoc of HConnectionManager.java still recommends a shutdown hook.

I looked into val table = Util.Connection.getTable(user), and found it didn't invoke

public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)

but

public HTable(TableName tableName, final HConnection connection, final ExecutorService pool) throws IOException {
  if (connection == null || connection.isClosed()) {
    throw new IllegalArgumentException("Connection is null or closed.");
  }
  this.tableName = tableName;
  this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
  this.connection = connection;
  this.configuration = connection.getConfiguration();
  this.pool = pool;
  this.finishSetup();
}

in which cleanupConnectionOnClose is false.

2014-10-16 22:51 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Which hbase release are you using?

Let me refer to the 0.94 code. Take a look at the following method in src/main/java/org/apache/hadoop/hbase/client/HTable.java:

public void close() throws IOException {
  ...
  if (cleanupConnectionOnClose) {
    if (this.connection != null) {
      this.connection.close();

When Connection.getTable() is called, the following is invoked:

public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)

which sets cleanupConnectionOnClose to true.

w.r.t. the javadoc, the paragraph on the shutdown hook is in HConnectionManager.java of 0.94. You don't need to use a shutdown hook for 0.94+.

Cheers

On Wed, Oct 15, 2014 at 11:41 PM, Fengyun RAO raofeng...@gmail.com wrote:

I may have misunderstood your point.

val result = rdd.map(line => {
  val table = Util.Connection.getTable(user)
  ...
  table.close()
}

Did you mean this is enough, and there's no need to call Util.Connection.close() or HConnectionManager.deleteAllConnections()? Where is the documentation that states HConnectionManager would release the underlying connection automatically? If that's true, maybe the Javadoc which recommends a shutdown hook needs an update.
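One way to address the original question — making sure each executor's shared connection is closed before the JVM exits — is a lazily initialized singleton that registers its own JVM shutdown hook. This is a hedged sketch, not something the thread settled on: the hook mirrors what the old 0.94 javadoc recommended, and the Util/Connection names follow the thread.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HConnection, HConnectionManager}

object Util {
  lazy val Connection: HConnection = {
    val conn = HConnectionManager.createConnection(HBaseConfiguration.create())
    // Runs once per executor JVM on exit, closing the shared connection.
    sys.addShutdownHook {
      if (!conn.isClosed) conn.close()
    }
    conn
  }
}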
Re: How to close resources shared in executor?
Looking at the Apache 0.98 code, you can follow the example in the class javadoc (line 144 of HConnectionManager.java):

HTableInterface table = connection.getTable("table1");
try {
  // Use the table as needed, for a single operation and a single thread
} finally {
  table.close();
  connection.close();
}

Cheers
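Given Ted's table.close()/connection.close() pattern, a hedged sketch of how it might look inside a Spark job: open one table handle per partition with mapPartitions instead of one per record, keeping the shared Util.Connection open for the lifetime of the executor (see the shutdown-hook sketch above). The "user" table name stands in for the thread's variable, and the toList materialization is an assumption needed because the partition iterator is lazy and would otherwise be consumed after the table is closed; large partitions would want a streaming variant instead.

val result = rdd.mapPartitions { lines =>
  val table = Util.Connection.getTable("user") // placeholder table name
  try {
    // Materialize this partition's results before closing the table,
    // because the iterator produced by map is evaluated lazily.
    lines.map { line =>
      // ... look up or enrich `line` via `table` ...
      line
    }.toList.iterator
  } finally {
    table.close() // returns the handle; the shared connection stays open
  }
}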
error when maven build spark 1.1.0 with message You have 1 Scalastyle violation
Hi All,

I'm building Spark 1.1.0 on Windows 8.1 using this command:

C:\apache-maven-3.0.5\bin\mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package -e

Below is the error message:

[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution: You have 1 Scalastyle violation(s). -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
    at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
    at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
    at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
    at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:290)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:230)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:409)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:352)
Caused by: org.apache.maven.plugin.MojoExecutionException: Failed during scalastyle execution
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck(ScalastyleViolationCheckMojo.java:238)
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.execute(ScalastyleViolationCheckMojo.java:199)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:101)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:209)
    ... 19 more
Caused by: org.apache.maven.plugin.MojoFailureException: You have 1 Scalastyle violation(s).
    at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck(ScalastyleViolationCheckMojo.java:230)
    ... 22 more
[ERROR]
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :spark-mllib_2.10

Best regards,
Henry