Re: Using functional programming rather than SQL
Kevin gave you the answer you need, but I'd like to comment on your subject line. SQL is a limited form of FP. Sure, there are no anonymous functions and other limitations, but it's declarative, like good FP programs should be, and it offers an important subset of the operators ("combinators") you want. Also, on a practical note, use the DataFrame API whenever you can, rather than dropping down to the RDD API, because the DataFrame API is far more performant. It's a classic case where restricting your options enables more aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark Summit East nicely made this point. http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott <kevin.r.mell...@gmail.com> wrote: > In your example, the *rs* instance should be a DataFrame object. In other > words, the result of *HiveContext.sql* is a DataFrame that you can > manipulate using *filter, map, *etc. > > > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext > > > On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh < > mich.talebza...@cloudtechnologypartners.co.uk> wrote: > >> Hi, >> >> I have data stored in Hive tables on which I want to do some simple manipulation. >> >> Currently in Spark I perform the following: getting the result set >> using SQL from Hive tables, then registering it as a temporary table in Spark. >> >> Now, ideally, I could get the result set into a DF and work on the DF to slice >> and dice the data using functional programming with filter, map, split, etc. >> >> I wanted to get some ideas on how to go about it. 
>> >> thanks >> >> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) >> >> HiveContext.sql("use oraclehadoop") >> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, >> SUM(s.amount_sold) AS TotalSales >> FROM smallsales s, times t, channels c >> WHERE s.time_id = t.time_id >> AND s.channel_id = c.channel_id >> GROUP BY t.calendar_month_desc, c.channel_desc >> """) >> *rs.registerTempTable("tmp")* >> >> >> HiveContext.sql(""" >> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales >> from tmp >> ORDER BY MONTH, CHANNEL >> """).collect.foreach(println) >> HiveContext.sql(""" >> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES >> FROM tmp >> GROUP BY channel_desc >> order by SALES DESC >> """).collect.foreach(println) >> >> >> -- >> >> Dr Mich Talebzadeh >> >> LinkedIn >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> http://talebzadehmich.wordpress.com >> >> NOTE: The information in this email is proprietary and confidential. This >> message is for the designated recipient only, if you are not the intended >> recipient, you should destroy it immediately. Any information in this >> message shall not be understood as given or endorsed by Cloud Technology >> Partners Ltd, its subsidiaries or their employees, unless expressly so >> stated. It is the responsibility of the recipient to ensure that this email >> is virus free, therefore neither Cloud Technology partners Ltd, its >> subsidiaries nor their employees accept any responsibility. >> >> >> >
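The point that SQL's operators are a subset of the usual FP combinators is easy to see even without Spark. Here is a plain-Java sketch of the GROUP BY / SUM query above, with made-up rows; on the DataFrame returned by HiveContext.sql you would chain filter/groupBy/agg in the same declarative style.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogue of the GROUP BY / SUM query above. The Sale rows used
// in the tests are made up; the combinators (groupBy + sum) mirror the SQL.
public class SalesTotals {
    public static final class Sale {
        public final String month, channel;
        public final double amountSold;
        public Sale(String month, String channel, double amountSold) {
            this.month = month;
            this.channel = channel;
            this.amountSold = amountSold;
        }
    }

    // GROUP BY calendar_month_desc, channel_desc; SUM(amount_sold)
    public static Map<String, Double> totalsByMonthAndChannel(List<Sale> sales) {
        return sales.stream().collect(Collectors.groupingBy(
            s -> s.month + "/" + s.channel,
            Collectors.summingDouble(s -> s.amountSold)));
    }
}
```

Slicing and dicing further (the filter/map the original poster asked about) is just more combinators chained before or after the grouping.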
Re: [Spark Streaming] Spark Streaming dropping last lines
Here's a wild guess; it might be the fact that your first command uses tail -f, so it doesn't close the input file handle when it hits the end of the available bytes, while your second use of nc does this. If so, the last few lines might be stuck in a buffer waiting to be forwarded, and Spark wouldn't see those bytes. You could test this by using nc or another program on the other end of the socket and seeing if it receives all the bytes. What happens if you add the -q 10 option to your nc command in the first case? That is, force it to close when no more bytes are seen for 10 seconds? HTH, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Wed, Feb 10, 2016 at 3:51 PM, Nipun Arora <nipunarora2...@gmail.com> wrote: > Hi All, > > I apologize for reposting, but I wonder if anyone can explain this behavior, > and what would be the best way to resolve this without introducing > something like Kafka in the midst. > I basically have a logstash instance, and would like to stream the output of > logstash to Spark Streaming without introducing a new message-passing > service like kafka/redis in the midst. > > We will eventually probably use Kafka, but for now I need guaranteed > delivery. > > For the tail -f | nc -lk command, I wait for a significant > time after Spark stops receiving any data in its micro-batches. I confirm > that it's not getting any data, i.e. the file end has probably been reached, > by printing the first two lines of every micro-batch. > > Thanks > Nipun > > > > On Mon, Feb 8, 2016 at 10:05 PM Nipun Arora <nipunarora2...@gmail.com> > wrote: > >> I have a spark-streaming service, where I am processing and detecting >> anomalies on the basis of some offline-generated model. 
I feed data into >> this service from a log file, which is streamed using the following command >> >> tail -f | nc -lk >> >> Here the spark streaming service is taking data from port . Once >> spark has finished processing, and is showing that it is processing empty >> micro-batches, I kill both spark, and the netcat process above. However, I >> observe that the last few lines are being dropped in some cases, i.e. spark >> streaming does not receive those log lines or they are not processed. >> >> However, I also observed that if I simply take the logfile as standard >> input instead of tailing it, the connection is closed at the end of the >> file, and no lines are dropped: >> >> nc -q 10 -lk < logfile >> >> Can anyone explain why this behavior is happening? And what could be a >> better resolution to the problem of streaming log data to spark streaming >> instance? >> >> >> Thanks >> >> Nipun >> >
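A minimal sketch of the suggested test: a receiver you can put on the other end of the socket in place of Spark, to see whether all the bytes actually arrive before the sender closes the connection. The class name is made up; compare the returned line count with `wc -l` on the log file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Connect to the nc listener, read lines until the sender closes the
// connection, and return everything received. If the tail -f pipeline is
// leaving bytes in a buffer, the count here will fall short of the file's.
public class SocketDrain {
    public static List<String> readAllLines(String host, int port) {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(socket.getInputStream()))) {
            List<String> lines = new ArrayList<>();
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```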
Re: Trying to understand dynamic resource allocation
It works on Mesos, too. I'm not sure about Standalone mode. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Mon, Jan 11, 2016 at 10:01 AM, Nick Peterson <nrpeter...@gmail.com> wrote: > My understanding is that dynamic allocation is only enabled for > Spark-on-Yarn. Those settings likely have no impact in standalone mode. > > Nick > > On Mon, Jan 11, 2016, 5:10 AM Yiannis Gkoufas <johngou...@gmail.com> > wrote: > >> Hi, >> >> I am exploring a bit the dynamic resource allocation provided by the >> Standalone Cluster Mode and I was wondering whether this behavior I am >> experiencing is expected. >> In my configuration I have 3 slaves with 24 cores each. >> I have in my spark-defaults.conf: >> >> spark.shuffle.service.enabled true >> spark.dynamicAllocation.enabled true >> spark.dynamicAllocation.minExecutors 1 >> spark.dynamicAllocation.maxExecutors 6 >> spark.executor.cores 4 >> >> When I submit a first Job it takes up all of the 72 cores of the cluster. >> When I submit the second Job while the first one is running I get the >> error: >> >> Initial job has not accepted any resources; check your cluster UI to >> ensure that workers are registered and have sufficient resources >> >> Is this the expected behavior? >> >> Thanks a lot >> >> >>
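One standalone-mode detail worth checking regardless of whether dynamic allocation releases executors: by default a single application claims every available core, which by itself produces the "Initial job has not accepted any resources" symptom for the second job. A hedged spark-defaults.conf sketch (the cap of 12 is arbitrary):

```
# Cap the cores any one application may claim (the standalone default is all
# available cores), so a second job can still be scheduled:
spark.cores.max                   12
# Dynamic allocation also needs the external shuffle service on each worker:
spark.shuffle.service.enabled     true
spark.dynamicAllocation.enabled   true
```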
Re: Spark Context not getting initialized in local mode
ClassNotFoundException usually means one of a few problems: 1. Your app assembly is missing the jar files with those classes. 2. You mixed jar files from incompatible versions in your assembly. 3. You built with one version of Spark and deployed to another. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Fri, Jan 8, 2016 at 1:24 AM, Rahul Kumar <rahul.kuma...@snapdeal.com> wrote: > > > > > Hi all, > I am trying to start Solr with a custom plugin which uses the Spark library. I > am trying to initialize a SparkContext in local mode. I have made a fat jar > for this plugin using Maven Shade and put it in the lib directory. *While > starting Solr it is not able to initialize the SparkContext.* It throws a > ClassNotFoundException for AkkaRpcEnvFactory. Can anyone please help? > > *It gives the following error:* > > 3870 [coreLoadExecutor-4-thread-1] ERROR org.apache.spark.SparkContext – > Error initializing SparkContext. 
> java.lang.ClassNotFoundException:org.apache.spark.rpc.akka.AkkaRpcEnvFactory > > *Here is the detailed error* > > java -jar start.jar0[main] INFO org.eclipse.jetty.server.Server – > jetty-8.1.10.v2013031227 [main] INFO > org.eclipse.jetty.deploy.providers.ScanningAppProvider – Deployment monitor > /home/rahul/solr-4.7.2/example/contexts at interval 040 [main] INFO > org.eclipse.jetty.deploy.DeploymentManager – Deployable added: > /home/rahul/solr-4.7.2/example/contexts/solr-jetty-context.xml1095 [main] > INFO org.eclipse.jetty.webapp.StandardDescriptorProcessor – NO JSP Support > for /solr, did not find org.apache.jasper.servlet.JspServlet1155 [main] INFO > org.apache.solr.servlet.SolrDispatchFilter – SolrDispatchFilter.init()1189 > [main] INFO org.apache.solr.core.SolrResourceLoader – JNDI not configured > for solr (NoInitialContextEx)1190 [main] INFO > org.apache.solr.core.SolrResourceLoader – solr home defaulted to 'solr/' > (could not find system property or JNDI)1190 [main] INFO > org.apache.solr.core.SolrResourceLoader – new SolrResourceLoader for > directory: 'solr/'1280 [main] INFO org.apache.solr.core.ConfigSolr – > Loading container configuration from > /home/rahul/solr-4.7.2/example/solr/solr.xml1458 [main] INFO > org.apache.solr.core.CoresLocator – Config-defined core root directory: > /home/rahul/solr-4.7.2/example/solr1465 [main] INFO > org.apache.solr.core.CoreContainer – New CoreContainer > 602710225... > 3870 [coreLoadExecutor-4-thread-1] ERROR org.apache.spark.SparkContext – > Error initializing SparkContext. 
> java.lang.ClassNotFoundException: org.apache.spark.rpc.akka.AkkaRpcEnvFactory > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at > org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:430) > at > org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:383) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:274) > at org.apache.spark.util.Utils$.classForName(Utils.scala:173) > at org.apache.spark.rpc.RpcEnv$.getRpcEnvFactory(RpcEnv.scala:42) > at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53) > at org.apache.spark.SparkEnv$.create(SparkEnv.scala:252) > at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193) > at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276) > at org.apache.spark.SparkContext.<init>(SparkContext.scala:441) > at > org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61) > at > com.snapdeal.search.spark.SparkLoadModel.loadModel(SparkLoadModel.java:11) > at > com.snapdeal.search.valuesource.parser.RankingModelValueSourceParser.init(RankingModelValueSourceParser.java:29) > at org.apache.solr.core.SolrCore.createInitInstance(SolrCore.java:591) > at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2191) > at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2185) > at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2218) > at > org.apache.solr.core.SolrCore.initValueSourceParsers(SolrCore.java:2130) > at org.apache.solr.core.SolrCore.<init>(SolrCore.java:765) > at org.apache.solr.core.SolrCore.<init>(So
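A quick way to narrow down which of the three problems is in play is to ask the running JVM whether it can see the class named in the stack trace. A generic sketch (the class name is just an example); run it with the same classpath Solr starts with:

```java
// Probe whether a class is visible on the current classpath. If the fat jar
// really bundled the Spark classes, the probe succeeds for the class the
// ClassNotFoundException names; if not, case 1 (missing from the assembly)
// is the likely culprit rather than a version mismatch.
public class ClasspathProbe {
    public static boolean onClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

For example, onClasspath("org.apache.spark.rpc.akka.AkkaRpcEnvFactory") from inside the plugin tells you immediately whether Solr's classloader can reach it.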
Re: Networking problems in Spark 1.6.0
Still, it would be good to know what happened exactly. Why did the netty dependency expect Java 8? Did you build your app on a machine with Java 8 and deploy on a Java 7 machine? Anyway, I played with the 1.6.0 spark-shell using Java 7 and it worked fine. I also looked at the distribution's class files using, e.g.: $ cd $HOME/spark/spark-1.6.0-bin-hadoop2.6 $ jar xf lib/spark-assembly-1.6.0-hadoop2.6.0.jar org/apache/spark/rpc/netty/Dispatcher.class $ javap -classpath . -verbose org.apache.spark.rpc.netty.Dispatcher | grep version minor version: 0 major version: 50 So it was compiled for Java 6 (see https://en.wikipedia.org/wiki/Java_class_file), and it doesn't appear to be a Spark build issue. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Jan 5, 2016 at 9:01 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote: > Hi Dean, > > thanks so much for the response! It works without a problem now! > > On 5 January 2016 at 14:33, Dean Wampler <deanwamp...@gmail.com> wrote: > >> ConcurrentHashMap.keySet() returning a KeySetView is a Java 8 method. The >> Java 7 method returns a Set. Are you running Java 7? What happens if you >> run Java 8? >> >> Dean Wampler, Ph.D. >> Author: Programming Scala, 2nd Edition >> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) >> Typesafe <http://typesafe.com> >> @deanwampler <http://twitter.com/deanwampler> >> http://polyglotprogramming.com >> >> On Tue, Jan 5, 2016 at 8:29 AM, Yiannis Gkoufas <johngou...@gmail.com> >> wrote: >> >>> Hi there, >>> >>> I have been using Spark 1.5.2 on my cluster without a problem and wanted >>> to try Spark 1.6.0. >>> I have the exact same configuration on both clusters. 
>>> I am able to start the Standalone Cluster but I fail to submit a job >>> getting errors like the following: >>> >>> 16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master >>> spark://my-ip:7077... >>> 16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master >>> spark://my-ip:7077... >>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master >>> spark://my-ip:7077... >>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master >>> spark://my-ip:7077... >>> 16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection >>> from my-ip/X.XXX.XX.XX:7077 >>> java.lang.NoSuchMethodError: >>> java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView; >>> at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106) >>> at >>> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586) >>> at >>> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577) >>> at >>> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170) >>> at >>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104) >>> at >>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104) >>> at >>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) >>> at >>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >>> at >>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) >>> at >>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) >>> at >>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) >>> at >>> 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) >>> at >>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) >>> at >>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) >>> at >>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) >>> at >>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) >>>
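The javap check above reads the class file's major version; the mapping to Java releases (from the class-file format) is small enough to encode as a helper. One hedged caveat: the major version only records the bytecode target level, so a method reference compiled against the Java 8 ConcurrentHashMap can still fail on Java 7 even inside a class file stamped for Java 6.

```java
// Map a class-file major version (from `javap -verbose ... | grep version`)
// to the Java release that produces it: 45 = Java 1.1 through 52 = Java 8.
public class ClassFileVersions {
    public static String javaReleaseFor(int majorVersion) {
        switch (majorVersion) {
            case 45: return "Java 1.1";
            case 46: return "Java 1.2";
            case 47: return "Java 1.3";
            case 48: return "Java 1.4";
            case 49: return "Java 5";
            case 50: return "Java 6";
            case 51: return "Java 7";
            case 52: return "Java 8";
            default: return "unknown major version " + majorVersion;
        }
    }
}
```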
Re: Networking problems in Spark 1.6.0
ConcurrentHashMap.keySet() returning a KeySetView is a Java 8 method. The Java 7 method returns a Set. Are you running Java 7? What happens if you run Java 8? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Jan 5, 2016 at 8:29 AM, Yiannis Gkoufas <johngou...@gmail.com> wrote: > Hi there, > > I have been using Spark 1.5.2 on my cluster without a problem and wanted > to try Spark 1.6.0. > I have the exact same configuration on both clusters. > I am able to start the Standalone Cluster but I fail to submit a job > getting errors like the following: > > 16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master > spark://my-ip:7077... > 16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master > spark://my-ip:7077... > 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master > spark://my-ip:7077... > 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master > spark://my-ip:7077... 
> 16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection > from my-ip/X.XXX.XX.XX:7077 > java.lang.NoSuchMethodError: > java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView; > at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106) > at > org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586) > at > org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577) > at > org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) > at > org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86) > at > 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at java.lang.Thread.run(Thread.java:745) > > Has anyone else had similar problems? > > Thanks a lot > >
Re: Spark data frame
More specifically, you could have TBs of data across thousands of partitions for a single RDD. If you call collect(), BOOM! Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Dec 22, 2015 at 4:20 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > Michael, > > collect will bring down the results to the driver JVM, whereas the RDD or > DataFrame would be cached on the executors (if it is cached). So, as Dean > said, the driver JVM needs to have enough memory to store the results of > collect. > > Thanks, > Silvio > > From: Michael Segel <msegel_had...@hotmail.com> > Date: Tuesday, December 22, 2015 at 4:26 PM > To: Dean Wampler <deanwamp...@gmail.com> > Cc: Gaurav Agarwal <gaurav130...@gmail.com>, "user@spark.apache.org" < > user@spark.apache.org> > Subject: Re: Spark data frame > > Dean, > > RDD in memory and then the collect() resulting in a collection, where both > are alive at the same time. > (Again not sure how Tungsten plays in to this… ) > > So his collection can’t be larger than 1/2 of the memory allocated to the > heap. > > (Unless you have allocated swap…, right?) > > On Dec 22, 2015, at 12:11 PM, Dean Wampler <deanwamp...@gmail.com> wrote: > > You can call the collect() method to return a collection, but be careful. > If your data is too big to fit in the driver's memory, it will crash. > > Dean Wampler, Ph.D. > Author: Programming Scala, 2nd Edition > <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) > Typesafe <http://typesafe.com/> > @deanwampler <http://twitter.com/deanwampler> > http://polyglotprogramming.com > > On Tue, Dec 22, 2015 at 1:09 PM, Gaurav Agarwal <gaurav130...@gmail.com> > wrote: > >> We are able to retrieve data frame by filtering the rdd object . I need >> to convert that data frame into java pojo. Any idea how to do that >> > > >
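A back-of-envelope sketch of the "will collect() fit?" question. The 0.5 safety factor and the per-row byte counts below are illustrative guesses, not measurements; Spark's SizeEstimator or the storage tab of the UI give real numbers.

```java
// Estimate whether a collected result plausibly fits in the driver heap,
// leaving half the heap for the JVM and everything else the driver is doing.
public class CollectBudget {
    public static boolean collectLikelyFits(long numRows, long avgBytesPerRow,
                                            long driverHeapBytes) {
        double budget = driverHeapBytes * 0.5; // crude safety margin
        return (double) numRows * avgBytesPerRow <= budget;
    }
}
```

For example, 100 million rows at roughly 200 bytes each is about 20 GB; against a 4 GB driver heap, that is the BOOM case above.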
Re: Spark data frame
You can call the collect() method to return a collection, but be careful. If your data is too big to fit in the driver's memory, it will crash. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Dec 22, 2015 at 1:09 PM, Gaurav Agarwal <gaurav130...@gmail.com> wrote: > We are able to retrieve data frame by filtering the rdd object . I need to > convert that data frame into java pojo. Any idea how to do that >
Re:
If you mean retaining data from past jobs, try running the history server, documented here: http://spark.apache.org/docs/latest/monitoring.html Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Thu, Nov 19, 2015 at 3:10 AM, 金国栋 <scu...@gmail.com> wrote: > I don't really know what you mean by saying historical data, but if you > have configured logs, so you can always get historical data of jobs, > stages, tasks etc, unless you delete them. > > Best, > Jelly > > 2015-11-19 16:53 GMT+08:00 aman solanki <youthindia.a...@gmail.com>: > >> Hi All, >> >> I want to know how one can get historical data of jobs,stages,tasks etc >> of a running spark application. >> >> Please share the information regarding the same. >> >> Thanks, >> Aman Solanki >> > >
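For reference, a sketch of the configuration involved, with a placeholder HDFS path: applications write event logs as they run, and the history server replays them after the applications finish.

```
# spark-defaults.conf: have applications write event logs as they run
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-event-logs
# History server reads the same directory; start it with
#   ./sbin/start-history-server.sh   (web UI on port 18080 by default)
spark.history.fs.logDirectory    hdfs:///spark-event-logs
```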
Re: getting different results from same line of code repeated
Methods like first() and take(n) can't guarantee to return the same result in a distributed context, because Spark uses an algorithm to grab data from one or more partitions that involves running a distributed job over the cluster, with tasks on the nodes where the chosen partitions are located. You can look at the logic in the Spark code base, RDD.scala (first method calls the take method) and SparkContext.scala (runJob method, which take calls). However, the exceptions definitely look like bugs to me. There must be some empty partitions. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com> wrote: > Hi, > > I'm launching a Spark cluster with the spark-ec2 script and playing around > in spark-shell. I'm running the same line of code over and over again, and > getting different results, and sometimes exceptions. Towards the end, > after I cache the first RDD, it gives me the correct result multiple times > in a row before throwing an exception. How can I get correct behavior out > of these operations on these RDDs? 
> > scala> val targets = > data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116] > at sortBy at <console>:36 > > scala> targets.first > res26: (String, Int) = (\bguns?\b,1253) > > scala> val targets = data map {_.REGEX} groupBy{identity} map { > Function.tupled(_->_.size)} sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125] > at sortBy at <console>:36 > > scala> targets.first > res27: (String, Int) = (nika,7) > > > scala> val targets = > data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134] > at sortBy at <console>:36 > > scala> targets.first > res28: (String, Int) = (\bcalientes?\b,6) > > scala> targets.sortBy(_._2,false).first > java.lang.UnsupportedOperationException: empty collection > > scala> val targets = > data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283] > at sortBy at <console>:36 > > scala> targets.first > res46: (String, Int) = (\bhurting\ yous?\b,8) > > scala> val targets = > data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292] > at sortBy at <console>:36 > > scala> targets.first > java.lang.UnsupportedOperationException: empty collection > > scala> val targets = > data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301] > at sortBy at <console>:36 > > scala> targets.first > res48: (String, Int) = (\bguns?\b,1253) > > scala> val targets = > data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] > at sortBy at <console>:36 > > scala> targets.first > res49: (String, Int) = (\bguns?\b,1253) > > scala> val targets = > data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319] > at sortBy at <console>:36 > > scala> targets.first > res50: (String, Int) = (\bguns?\b,1253) > > scala> val targets = > data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false) > targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] > at sortBy at <console>:36 > > scala> targets.first > java.lang.UnsupportedOperationException: empty collection > > > > >
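Aside from whatever is making the counts themselves vary, one more source of run-to-run variation worth ruling out: sortBy(_._2,false) orders only by the count, so among keys with equal counts the winner of first is unspecified and may change across runs. A plain-Java sketch of the usual fix, a total order with the key as tiebreaker (the sample counts are made up):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Sorting by count alone leaves the order among equal-count keys
// unspecified. Count descending, then key ascending, is a total order,
// so the "first" element is the same on every run.
public class StableTop {
    public static Map.Entry<String, Integer> top(List<Map.Entry<String, Integer>> counts) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts);
        sorted.sort(Comparator
            .comparing((Map.Entry<String, Integer> e) -> e.getValue()).reversed()
            .thenComparing(Map.Entry::getKey));
        return sorted.get(0);
    }
}
```

In the Spark shell the analogous change is sortBy(p => (-p._2, p._1)), which gives a deterministic ordering even with ties.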
Re: How to create nested structure from RDD
Crap. Hit send accidentally... In pseudocode, assuming comma-separated input data: scala> case class Address(street: String, city: String) scala> case class User (name: String, address: Address) scala> val df = sc.textFile("/path/to/stuff"). map { line => val array = line.split(",") // assume: "name,street,city" User(array(0), Address(array(1), array(2))) }.toDF() scala> df.printSchema root |-- name: string (nullable = true) |-- address: struct (nullable = true) | |-- street: string (nullable = true) | |-- city: string (nullable = true) Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Nov 17, 2015 at 11:16 AM, Dean Wampler <deanwamp...@gmail.com> wrote: > One way to do it in the Scala API is to use a tuple or case class > with nested tuples or case classes and/or primitives. It works fine if you > convert to a DataFrame, too; you can reference nested elements using dot > notation. I think in Python it would work similarly. > > In pseudocode, assuming comma-separated input data: > > case class Address(street: String, city: String) > case class User (name: String, address: Address) > > sc.textFile("/path/to/stuff"). > map { line => > line.split(0) > dean > > Dean Wampler, Ph.D. > Author: Programming Scala, 2nd Edition > <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) > Typesafe <http://typesafe.com> > @deanwampler <http://twitter.com/deanwampler> > http://polyglotprogramming.com > > On Tue, Nov 17, 2015 at 11:06 AM, fhussonnois <fhussonn...@gmail.com> > wrote: > >> Hi, >> >> I need to convert an rdd of RDD[User] to a DataFrame containing a single >> column named "user". The column "user" should be a nested struct with all >> User properties. >> >> How can I implement this efficiently ? 
>> >> Thank you in advance >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-nested-structure-from-RDD-tp25401.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
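The Spark-free core of the corrected sketch, the line-to-nested-structure parse, can be checked on its own; in the shell you would then map it over sc.textFile(...) and call toDF() to get the struct schema shown by printSchema. A plain-Java rendering, assuming well-formed "name,street,city" lines:

```java
// Parse one "name,street,city" line into a nested structure, mirroring the
// Address/User case classes in the Scala sketch above.
public class UserParser {
    public static final class Address {
        public final String street, city;
        public Address(String street, String city) {
            this.street = street;
            this.city = city;
        }
    }

    public static final class User {
        public final String name;
        public final Address address;
        public User(String name, Address address) {
            this.name = name;
            this.address = address;
        }
    }

    public static User parseUser(String line) {
        String[] parts = line.split(",", -1); // assume: "name,street,city"
        return new User(parts[0], new Address(parts[1], parts[2]));
    }
}
```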
Re: How to create nested structure from RDD
One way to do it in the Scala API is to use a tuple or case class with nested tuples or case classes and/or primitives. It works fine if you convert to a DataFrame, too; you can reference nested elements using dot notation. I think in Python it would work similarly. In pseudocode, assuming comma-separated input data: case class Address(street: String, city: String) case class User (name: String, address: Address) sc.textFile("/path/to/stuff"). map { line => line.split(0) dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Nov 17, 2015 at 11:06 AM, fhussonnois <fhussonn...@gmail.com> wrote: > Hi, > > I need to convert an rdd of RDD[User] to a DataFrame containing a single > column named "user". The column "user" should be a nested struct with all > User properties. > > How can I implement this efficiently ? > > Thank you in advance > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-nested-structure-from-RDD-tp25401.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: dynamic allocation w/ spark streaming on mesos?
Dynamic allocation doesn't work yet with Spark Streaming in any cluster scenario. There was a previous thread on this topic which discusses the issues that need to be resolved. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Wed, Nov 11, 2015 at 8:09 AM, PhuDuc Nguyen <duc.was.h...@gmail.com> wrote: > I'm trying to get Spark Streaming to scale up/down its number of executors > within Mesos based on workload. It's not scaling down. I'm using Spark > 1.5.1 reading from Kafka using the direct (receiver-less) approach. > > Based on this ticket https://issues.apache.org/jira/browse/SPARK-6287 > with the right configuration, I have a simple example working with the > spark-shell connected to a Mesos cluster. By working I mean the number of > executors scales up/down based on workload. However, the spark-shell is not > a streaming example. > > What is that status of dynamic resource allocation with Spark Streaming on > Mesos? Is it supported at all? Or supported but with some caveats to ensure > no data loss? > > thanks, > Duc >
Re: Spark Streaming: how to use StreamingContext.queueStream with existing RDD
Check out StreamingContext.queueStream ( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext ) dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Mon, Oct 26, 2015 at 11:16 AM, Anfernee Xu <anfernee...@gmail.com> wrote: > Hi, > > Here's my situation, I have some kind of offline dataset and got them > loaded them into Spark as RDD, but I want to form a virtual data stream > feeding to Spark Streaming, my code looks like this > > >// sort offline data by time, the dataset spans 2 hours > 1) JavaRDD sortedByTime = offlineDataRDD.sortBy( ); > >// compute a list of JavaRDD, each element JavaRDD is hosting the data > in the same time >// bucket, for example 5 minutes > 2) List virtualStreamRdd = ? > > Queue<JavaRDD> queue = Queues.newLinkedBlockingQueue(); > queue.addAll(virtualStreamRdd); > > /* > * Create DStream from the queue > */ > > 3) final JavaDStream rowDStream = > streamingContext.queueStream(queue); > > > Currently I'm stucking in 2), any suggestion is appreciated. > > Thanks > > -- > --Anfernee >
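For step 2, the bucketing itself is ordinary (non-Spark) code; a minimal sketch in plain Scala (the record shape and the 5-minute bucket width are assumptions from the question), where each resulting bucket would then become one queued RDD via parallelize:

```scala
// A timestamped record; field names are illustrative placeholders.
case class Rec(timestampMillis: Long, payload: String)

// Group sorted records into fixed-width time buckets (5 minutes by default),
// returning buckets in chronological order.
def bucketize(records: Seq[Rec], bucketMillis: Long = 5 * 60 * 1000L): List[Seq[Rec]] =
  records
    .groupBy(r => r.timestampMillis / bucketMillis) // bucket index
    .toList
    .sortBy(_._1)                                   // oldest bucket first
    .map(_._2)

// Each bucket would become one queued RDD, roughly:
//   bucketize(sorted).foreach(b => queue.add(sc.parallelize(b)))
//   val stream = streamingContext.queueStream(queue)
```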
Re: Spark 1.5.1 ClassNotFoundException in cluster mode.
There is a Datastax Spark connector library jar file that you probably have on your CLASSPATH locally, but not on the cluster. If you know where it is, you could either install it on each node in some location on their CLASSPATHs or, when you submit the job, pass the jar file using the "--jars" option. Note that the latter may not be an ideal solution if it has other dependencies that also need to be passed. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Wed, Oct 14, 2015 at 5:05 PM, Renato Perini <renato.per...@gmail.com> wrote: > Hello. > I have developed a Spark job using a jersey client (1.9 included with > Spark) to make some service calls during data computations. > Data is read and written on an Apache Cassandra 2.2.1 database. > When I run the job in local mode, everything works nicely. But when I > execute my job in cluster mode (spark standalone) I receive the following > exception: > I have no clue on where this exception occurs. Any idea / advice on what > can I check? 
> > [Stage 38:==> (8 + 2) > / 200]15/10/14 15:54:07 WARN ThrowableSerializationWrapper: Task exception > could > java.lang.ClassNotFoundException: > com.datastax.spark.connector.types.TypeConversionException > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:278) > at > org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) > at > 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98) > at > org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108) > at > org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105) > at > org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) > at > org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105) > at > java.util.concurrent.Th
Re: Scala Limitation - Case Class definition with more than 22 arguments
While case classes no longer have the 22-element limitation as of Scala 2.11, tuples are still limited to 22 elements. For various technical reasons, this limitation probably won't be removed any time soon. However, you can nest tuples, like case classes, in most contexts. So, the last bit of your example, (r: ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37") ) could add nested () to group elements and keep the outer number of elements <= 22. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Thu, Sep 24, 2015 at 6:01 AM, satish chandra j <jsatishchan...@gmail.com> wrote: > HI All, > > In addition to Case Class limitation in Scala, I finding Tuple limitation > too please find the explanation below > > //Query to pull data from Source Table > > var SQL_RDD= new JdbcRDD( sc, ()=> > DriverManager.getConnection(url,user,pass),"select col1, col2, > col3..col 37 from schema.Table LIMIT ? 
OFFSET ?",100,0,*1*,(r: > ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37"))) > > > //Define Case Class > > case class sqlrow(col1:Int,col2:Int..col37) > > > var SchSQL= SQL_RDD.map(p => new sqlrow(p._1,p._2.p._37)) > > > followed by apply CreateSchema to RDD and than apply registerTempTable for > defining a table to make use in SQL Context in Spark > > As per the above SQL query I need to fetch 37 columns from the source > table, but it seems Scala has tuple restriction which I am defining by r > ResultSet variable in the above SQL, please let me know if any work around > for the same > > Regards, > Satish Chandra > > On Thu, Sep 24, 2015 at 3:18 PM, satish chandra j < > jsatishchan...@gmail.com> wrote: > >> HI All, >> As it is for SQL purpose I understand, need to go ahead with Custom Case >> Class approach >> Could anybody have a sample code for creating Custom Case Class to refer >> which would be really helpful >> >> Regards, >> Satish Chandra >> >> On Thu, Sep 24, 2015 at 2:51 PM, Adrian Tanase <atan...@adobe.com> wrote: >> >>> +1 on grouping the case classes and creating a hierarchy – as long as >>> you use the data programatically. For DataFrames / SQL the other ideas >>> probably scale better… >>> >>> From: Ted Yu >>> Date: Wednesday, September 23, 2015 at 7:07 AM >>> To: satish chandra j >>> Cc: user >>> Subject: Re: Scala Limitation - Case Class definition with more than 22 >>> arguments >>> >>> Can you switch to 2.11 ? >>> >>> The following has been fixed in 2.11: >>> https://issues.scala-lang.org/browse/SI-7296 >>> >>> Otherwise consider packaging related values into a case class of their >>> own. 
>>> >>> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j < >>> jsatishchan...@gmail.com> wrote: >>> >>>> HI All, >>>> Do we have any alternative solutions in Scala to avoid limitation in >>>> defining a Case Class having more than 22 arguments >>>> >>>> We are using Scala version 2.10.2, currently I need to define a case >>>> class with 37 arguments but getting an error as "*error:Implementation >>>> restriction:caseclasses cannot have more than 22parameters.*" >>>> >>>> It would be a great help if any inputs on the same >>>> >>>> Regards, >>>> Satish Chandra >>>> >>>> >>>> >>> >> >
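To make the nesting workaround concrete, here is a minimal sketch (three columns stand in for the 37 in the question): grouping the flat tuple into nested sub-tuples keeps every tuple at or below 22 elements, at the cost of `x._2._1`-style access paths:

```scala
// A flat 37-tuple is illegal, but nesting keeps each level <= 22 elements.
// Three columns stand in for the 37 columns of the original question.
def extractRow(col1: Int, col2: Int, col3: Int): (Int, (Int, Int)) =
  (col1, (col2, col3))

val row = extractRow(1, 2, 3)
val col2 = row._2._1  // nested access: first element of the inner tuple
```

The same grouping applies inside the `(r: ResultSet) => (...)` function: wrap related columns in inner parentheses so the outer tuple stays under the limit.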
Re: Parquet partitioning performance issue
One general technique is to perform a second pass later over the files, for example the next day or once a week, to concatenate smaller files into larger ones. This can be done for all file types and allows you to make recent data available to analysis tools, while avoiding a large build-up of small files overall. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Sun, Sep 13, 2015 at 12:54 PM, sonal sharma <sonalsharma1...@gmail.com> wrote: > Hi Team, > > We have scheduled jobs that read new records from MySQL database every > hour and write (append) them to parquet. For each append operation, spark > creates 10 new partitions in parquet file. > > Some of these partitions are fairly small in size (20-40 KB) leading to > high number of smaller partitions and affecting the overall read > performance. > > Is there any way in which we can configure spark to merge smaller > partitions into a bigger one to avoid too many partitions? Or can we define > a configuration in Parquet to set a minimum partition size, say 64 MB? > > Coalesce/repartition will not work for us as we have highly variable > activity on the database during peak and non-peak hours. > > Regards, > Sonal >
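For the coalesce step of such a compaction pass, the partition count can be derived from the data size; a sketch (the 64 MB target comes from the question, everything else is an assumption) — the pass itself would be roughly sqlContext.read.parquet(in).coalesce(n).write.parquet(out):

```scala
// Choose how many partitions to coalesce to, given the total input size
// and a target partition size (64 MB here, the figure from the question).
def targetPartitions(totalBytes: Long, targetBytes: Long = 64L * 1024 * 1024): Int =
  math.max(1, math.ceil(totalBytes.toDouble / targetBytes).toInt)

// e.g. ~1 GB of small files -> 16 partitions of ~64 MB each:
//   sqlContext.read.parquet(in).coalesce(targetPartitions(totalBytes)).write.parquet(out)
```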
Re: Realtime Data Visualization Tool for Spark
Here's a demonstration video from @noootsab himself (creator of Spark Notebook) showing live charting in Spark Notebook. It's one reason I prefer it over the other options. https://twitter.com/noootsab/status/638489244160401408 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Fri, Sep 11, 2015 at 12:55 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > So if you want to build your own from the ground up, then yes you could go > the d3js route. Like Feynman also responded you could use something like > Spark Notebook or Zeppelin to create some charts as well. It really depends > on your intended audience and ultimate goal. If you just want some counters > and graphs without any interactivity it shouldn't be too difficult. > > Another option, if you’re willing to use a hosted service, would be > something like MS Power BI. I’ve used this to publish data and have > realtime dashboards and reports fed by Spark. > > From: Shashi Vishwakarma > Date: Friday, September 11, 2015 at 11:56 AM > To: "user@spark.apache.org" > Subject: Realtime Data Visualization Tool for Spark > > Hi > > I have got streaming data which needs to be processed and send for > visualization. I am planning to use spark streaming for this but little > bit confused in choosing visualization tool. I read somewhere that D3.js > can be used but i wanted know which is best tool for visualization while > dealing with streaming application.(something that can be easily integrated) > > If someone has any link which can tell about D3.js(or any other > visualization tool) and Spark streaming application integration then > please share . That would be great help. > > > Thanks and Regards > Shashi > >
Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
If you log into the cluster, do you see the file if you type: hdfs dfs -ls hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar (with the correct server address for "ipx-x-x-x")? If not, is the server address correct and routable inside the cluster? Recall that EC2 instances have both public and private host names & IP addresses. Also, is the port number correct for HDFS in the cluster? dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Wed, Sep 9, 2015 at 9:28 AM, shahab <shahab.mok...@gmail.com> wrote: > Hi, > I am using Spark on Amazon EMR. So far I have not succeeded to submit the > application successfully, not sure what's problem. In the log file I see > the followings. > java.io.FileNotFoundException: File does not exist: > hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar > > However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar in the > fat jar file didn't solve the problem. I am out of clue now. > I want to submit a spark application, using aws web console, as a step. I > submit the application as : spark-submit --deploy-mode cluster --class > mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar Is > there any one who has similar problem with EMR? > > best, > /Shahab >
Re: Java vs. Scala for Spark
It's true that Java 8 lambdas help. If you've read Learning Spark, where they use Java 7, Python, and Scala for the examples, it really shows how awful Java without lambdas is for Spark development. Still, there are several "power tools" in Scala I would sorely miss if I were using Java 8: 1. The REPL (interpreter): I do most of my work in the REPL, then move it to compiled code when I'm ready to turn it into a batch job. Even better, use Spark Notebook <http://spark-notebook.io/>! (and on GitHub <https://github.com/andypetrella/spark-notebook>). 2. Tuples: It's just too convenient to use tuples for schemas, return values from functions, etc. 3. Pattern matching: This has no analog in Java, so it's hard to appreciate it until you understand it, but see this example <https://github.com/deanwampler/spark-workshop/blob/master/src/main/scala/sparkworkshop/InvertedIndex5b.scala> for a taste of how concise it makes code! 4. Type inference: Spark code really shows its utility. It means a lot less code to write, but you still get hints about what you just wrote! My $0.02. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) Typesafe <http://typesafe.com> @deanwampler <http://twitter.com/deanwampler> http://polyglotprogramming.com On Tue, Sep 8, 2015 at 10:28 AM, Igor Berman <igor.ber...@gmail.com> wrote: > we are using java7..its much more verbose that java8 or scala examples > in addition there sometimes libraries that has no java api, so you need > to write them by yourself(e.g. graphx) > on the other hand, scala is not trivial language like java, so it depends > on your team > > On 8 September 2015 at 17:44, Bryan Jeffrey <bryan.jeff...@gmail.com> > wrote: > >> Thank you for the quick responses. It's useful to have some insight from >> folks already extensively using Spark. 
>> >> Regards, >> >> Bryan Jeffrey >> >> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen <so...@cloudera.com> wrote: >> >>> Why would Scala vs Java performance be different Ted? Relatively >>> speaking there is almost no runtime difference; it's the same APIs or >>> calls via a thin wrapper. Scala/Java vs Python is a different story. >>> >>> Java libraries can be used in Scala. Vice-versa too, though calling >>> Scala-generated classes can be clunky in Java. What's your concern >>> about interoperability Jeffrey? >>> >>> I disagree that Java 7 vs Scala usability is sooo different, but it's >>> certainly much more natural to use Spark in Scala. Java 8 closes a lot >>> of the usability gap with Scala, but not all of it. Enough that it's >>> not crazy for a Java shop to stick to Java 8 + Spark and not be at a >>> big disadvantage. >>> >>> The downsides of Scala IMHO are that it provides too much: lots of >>> nice features (closures! superb collections!), lots of rope to hang >>> yourself too (implicits sometimes!) and some WTF features (XML >>> literals!) Learning the good useful bits of Scala isn't hard. You can >>> always write Scala code as much like Java as you like, I find. >>> >>> Scala tooling is different from Java tooling; that's an >>> underappreciated barrier. For example I think SBT is good for >>> development, bad for general project lifecycle management compared to >>> Maven, but in any event still less developed. SBT/scalac are huge >>> resource hogs, since so much of Scala is really implemented in the >>> compiler; prepare to update your laptop to develop in Scala on your >>> IDE of choice, and start to think about running long-running compile >>> servers like we did in the year 2000. >>> >>> Still net-net I would choose Scala, FWIW. >>> >>> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu <yuzhih...@gmail.com> wrote: >>> > Performance wise, Scala is by far the best choice when you use Spark. 
>>> > >>> > The cost of learning Scala is not negligible but not insurmountable >>> either. >>> > >>> > My personal opinion. >>> > >>> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey <bryan.jeff...@gmail.com >>> > >>> > wrote: >>> >> >>> >> All, >>> >> >>> >> We're looking at language choice in developing a simple streaming >>> >> processing application in spark. We've got a small set of example >>> code >>> >> built in Scala. Articles like the following: >>> >> >>> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html >>> >> would seem to indicate that Scala is great for use in distributed >>> >> programming (including Spark). However, there is a large group of >>> folks >>> >> that seem to feel that interoperability with other Java libraries is >>> much to >>> >> be desired, and that the cost of learning (yet another) language is >>> quite >>> >> high. >>> >> >>> >> Has anyone looked at Scala for Spark dev in an enterprise environment? >>> >> What was the outcome? >>> >> >>> >> Regards, >>> >> >>> >> Bryan Jeffrey >>> > >>> > >>> >> >> >
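For readers who haven't seen Scala pattern matching (point 3 in the reply above), here is a small self-contained taste: destructuring a tuple and a list while branching, in one construct with no direct Java analog:

```scala
// Pattern matching destructures data and branches in a single construct.
def describe(x: Any): String = x match {
  case (word: String, count: Int) => s"$word appears $count times"
  case head :: tail               => s"list starting with $head (${tail.size} more)"
  case n: Int if n < 0            => "negative number"
  case other                      => s"something else: $other"
}

println(describe(("spark", 3)))   // spark appears 3 times
println(describe(List(1, 2, 3)))  // list starting with 1 (2 more)
```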
Re: Run scala code with spark submit
I haven't tried it, but spark-shell should work if you give it a scala script file (e.g., spark-shell -i Greeting.scala), since it's basically a wrapper around the Scala REPL. dean On Thursday, August 20, 2015, MasterSergius master.serg...@gmail.com wrote: Is there any possibility to run a standalone scala program via spark submit? Or do I always have to put it in some package and build it with maven (or sbt)? What if I have just a simple program, like that example word counter? Could anyone please show it on this simple test file Greeting.scala: It compiles with scalac, runs with scala. Now I want to run it with spark (I can get these files via wget, for example). So, how can I run a one-file scala program via spark-submit? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Run-scala-code-with-spark-submit-tp24367.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: Parquet without hadoop: Possible?
It should work fine. I have an example script here: https://github.com/deanwampler/spark-workshop/blob/master/src/main/scala/sparkworkshop/SparkSQLParquet10-script.scala (Spark 1.4.X) What does "I am failing to do so" mean? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Aug 11, 2015 at 9:28 AM, saif.a.ell...@wellsfargo.com wrote: Hi all, I don't have any hadoop fs installed on my environment, but I would like to store dataframes in parquet files. I am failing to do so; if possible, can anyone offer any pointers? Thank you, Saif
Re: EC2 cluster doesn't work saveAsTextFile
Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i submit at local there is no problem , but i run at cluster, saveAsTextFile doesn't work.*It says me User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult http://172.31.42.10:54310/./weblogReadResult already exists* Is there anyone can help me about this issue ? Best, yasemin -- hiç ender hiç
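One simple way to guarantee a fresh output path on every run is to embed a timestamp in it; a minimal sketch (the base path is a placeholder, not the poster's actual path):

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Build a run-unique output directory so saveAsTextFile never collides
// with an existing one. The base path argument is a placeholder.
def uniqueOutputPath(base: String, at: Date = new Date): String = {
  val stamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(at)
  s"$base-$stamp"
}

// e.g. rdd.saveAsTextFile(uniqueOutputPath("hdfs://host:54310/weblogReadResult"))
```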
Re: question about spark streaming
Have a look at the various versions of PairDStreamFunctions.updateStateByKey ( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions). It supports updating running state in memory. (You can persist the state to a database/files periodically if you want.) Use an in-memory data structure like a hash map with SKU-price key-values. Update the map as needed on each iteration. One of the versions of this function lets you specify a partitioner if you still need to shard keys. Also, I would be flexible about the 1-second batch interval. Is that really a mandatory requirement for this problem? HTH, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 5:24 AM, sequoiadb mailing-list-r...@sequoiadb.com wrote: hi guys, i have a question about spark streaming. There’s an application keep sending transaction records into spark stream with about 50k tps The record represents a sales information including customer id / product id / time / price columns The application is required to monitor the change of price for each product. For example, if the price of a product increases 10% within 3 minutes, it will send an alert to end user. The interval is required to be set every 1 second, window is somewhere between 180 to 300 seconds. The issue is that I have to compare the price of each transaction ( totally about 10k different products ) against the lowest/highest price for the same product in the all past 180 seconds. That means, in every single second, I have to loop through 50k transactions and compare the price of the same product in all 180 seconds. So it seems I have to separate the calculation based on product id, so that each worker only processes a certain list of products. 
For example, if I can make sure the same product id always go to the same worker agent, it doesn’t need to shuffle data between worker agent for each comparison. Otherwise if it required to compare each transaction with all other RDDs that cross multiple worker agent, I guess it may not be fast enough for the requirement. Is there anyone knows how to specify the worker node for each transaction record based on its product id, in order to avoid massive shuffle operation? If simply making the product id as the key and price as the value, reduceByKeyAndWindow may cause massive shuffle and slow down the whole throughput. Am I correct? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
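The per-key state update that updateStateByKey takes is a pure function, so it can be sketched without Spark; here is a simplified version tracking the running (min, max) price per product (the 10% threshold is from the question; window expiry, i.e. dropping prices older than 180 seconds, is deliberately omitted). In a streaming job it would be wired up roughly as priceByProduct.updateStateByKey(updateMinMax):

```scala
// Running (min, max) price for one product. updateStateByKey calls this with
// the batch's new prices and the previous state for each product key.
// Expiring prices older than the 180s window is omitted for simplicity.
def updateMinMax(newPrices: Seq[Double],
                 state: Option[(Double, Double)]): Option[(Double, Double)] = {
  if (newPrices.isEmpty) state
  else {
    val (lo, hi) = state.getOrElse((Double.MaxValue, Double.MinValue))
    Some((math.min(lo, newPrices.min), math.max(hi, newPrices.max)))
  }
}

// The 10% rise alert compares the latest price against the tracked minimum:
def shouldAlert(price: Double, minSoFar: Double): Boolean =
  price >= minSoFar * 1.10
```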
Re: Spark Cassandra Connector issue
Add the other Cassandra dependencies (dse.jar, spark-cassandra-connector-java_2.10) to your --jars argument on the command line. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Please help me to fix Spark Cassandra Connector issue, find the details below *Command:* dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar *Error:* WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions; at HelloWorld$.main(HelloWorld.scala:29) at HelloWorld.main(HelloWorld.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) *Code:* import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd.JdbcRDD import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector import com.datastax.bdp.spark.DseSparkConfHelper._ import java.sql.{Connection, DriverManager, ResultSet, PreparedStatement, SQLException, Statement} object HelloWorld { def main(args: Array[String]) { def createSparkContext() = { val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath val conf = new SparkConf().set("spark.cassandra.connection.host", "10.246.43.15") .setAppName("First Spark App") .setMaster("local") .setJars(Array(myJar)) .set("cassandra.username", "username") .set("cassandra.password", "password") .forDse new SparkContext(conf) } val sc = createSparkContext() val user = "hkonak0" val pass = "Winter18" Class.forName("org.postgresql.Driver").newInstance val url = "jdbc:postgresql://gptester:5432/db_test" val myRDD27 = new JdbcRDD( sc, () => DriverManager.getConnection(url, user, pass), "select * from wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?", 5, 0, 1, (r: ResultSet) => { (r.getInt("alarm_type_code"), r.getString("language_code"), r.getString("alrm_type_cd_desc")) }) myRDD27.saveToCassandra("keyspace", "arm_typ_txt", SomeColumns("alarm_type_code", "language_code", "alrm_type_cd_desc")) println(myRDD27.count()) println(myRDD27.first) sc.stop() sys.exit() } } *POM XML:* <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.5</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.datastax.dse</groupId> <artifactId>dse</artifactId> <version>4.7.2</version> <scope>system</scope> <systemPath>C:\workspace\etl\lib\dse.jar</systemPath> </dependency> <dependency> <groupId>com.datastax.spark</groupId> <artifactId>spark-cassandra-connector-java_2.10</artifactId> <version>1.1.1</version> </dependency> </dependencies> Please let me know if any further details required to analyze the issue Regards, Satish Chandra
Re: Spark Streaming Restart at scheduled intervals
org.apache.spark.streaming.twitter.TwitterInputDStream is a small class. You could write your own that lets you change the filters at run time. Then provide a mechanism in your app, like periodic polling of a database table or file for the list of filters. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 3:56 AM, Pankaj Narang pankajnaran...@gmail.com wrote: Hi All, I am creating spark twitter streaming connection in my app over long period of time. When I have some new keywords I need to add them to the spark streaming connection. I need to stop and start the current twitter streaming connection in this case. I have tried akka actor scheduling but could not achieve the same. Have anybody have idea how to do that ? Regards Pankaj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Restart-at-scheduled-intervals-tp24192.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: EC2 cluster doesn't work saveAsTextFile
So, just before running the job, if you run the HDFS command at a shell prompt: hdfs dfs -ls hdfs://172.31.42.10:54310/./weblogReadResult. Does it say the path doesn't exist? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:58 AM, Yasemin Kaya godo...@gmail.com wrote: Thanx Dean, i am giving unique output path and in every time i also delete the directory before i run the job. 2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com: Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i submit at local there is no problem , but i run at cluster, saveAsTextFile doesn't work.*It says me User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult http://172.31.42.10:54310/./weblogReadResult already exists* Is there anyone can help me about this issue ? Best, yasemin -- hiç ender hiç -- hiç ender hiç
Re: Spark Cassandra Connector issue
I don't know if DSE changed spark-submit, but you have to pass a comma-separated list of jars to --jars. It probably looked for HelloWorld in the second one, the dse.jar file. Do this:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0.1-SNAPSHOT.jar

I also removed the extra //. Or put file: in front of them so they are proper URLs. Note that the snapshot jar isn't in the --jars list; I assume that's where HelloWorld is found. Confusing, yes it is... dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 8:23 AM, satish chandra j jsatishchan...@gmail.com wrote: Hi, Thanks for the quick input, now I am getting a class not found error.

Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Error:
java.lang.ClassNotFoundException: HelloWorld
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Previously I could fix the issue by changing the order of arguments passed on the DSE command line, but now I am not sure why the issue appears again. Please let me know if I am still missing anything in my command as mentioned above (as suggested, I have added dse.jar and spark-cassandra-connector-java_2.10.1.1.1.jar). Thanks for the support Satish Chandra

On Mon, Aug 10, 2015 at 6:19 PM, Dean Wampler deanwamp...@gmail.com wrote: Add the other Cassandra dependencies (dse.jar, spark-cassandra-connector-java_2.10) to your --jars argument on the command line. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Please help me fix a Spark Cassandra Connector issue; find the details below.

Command: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Error: WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions;
at HelloWorld$.main(HelloWorld.scala:29)
at HelloWorld.main(HelloWorld.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.bdp.spark.DseSparkConfHelper._
import java.sql.{Connection, DriverManager, ResultSet, PreparedStatement, SQLException, Statement}

object HelloWorld {
  def main(args: Array[String]) {
    def createSparkContext() = {
      val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
      val conf = new SparkConf().set
Re: Spark-submit fails when jar is in HDFS
Also, Spark on Mesos supports cluster mode: http://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, Aug 9, 2015 at 4:30 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try this way?

/usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi --jars hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

Thanks Best Regards On Fri, Aug 7, 2015 at 5:51 AM, Alan Braithwaite a...@cloudflare.com wrote: Hi All, We're trying to run Spark with Mesos and Docker in client mode (since Mesos doesn't support cluster mode) and load the application jar from HDFS. The following is the command we're running:

/usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

We're getting the following warning before an exception from that command:

Warning: Skip remote jar hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi

Before I debug further, is this even supported? I started reading the code and it wasn't clear that it's possible to load a remote jar in client mode at all. I did see a related issue in [2] but it didn't quite clarify everything I was looking for. Thanks, - Alan [1] https://spark.apache.org/docs/latest/submitting-applications.html [2] http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html
Re: spark config
That's the correct URL. Recent change? The last time I looked, earlier this week, it still had the obsolete artifactory URL for URL1 ;) Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Fri, Aug 7, 2015 at 5:19 PM, Ted Yu yuzhih...@gmail.com wrote: In the master branch, build/sbt-launch-lib.bash has the following:

URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar

I verified that the following exists: https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/#sbt-launch.jar FYI

On Fri, Aug 7, 2015 at 2:08 PM, Bryce Lobdell lobde...@gmail.com wrote: I recently downloaded Spark package 1.4.0. A build of Spark with sbt/sbt clean assembly failed with the message:

Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar

Upon investigation I figured out that sbt-launch-0.13.7.jar is downloaded at build time and that it contained the following:

<html>
<head><title>404 Not Found</title></head>
<body bgcolor="white">
<center><h1>404 Not Found</h1></center>
<hr><center>nginx</center>
</body>
</html>

which is an HTML error message to the effect that the file is missing from the web server. The script sbt-launch-lib.bash contains the following lines, which determine where the file sbt-launch.jar is downloaded from:

acquire_sbt_jar () {
  SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
  URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
  URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
  JAR=build/sbt-launch-${SBT_VERSION}.jar

The script downloads $URL1 first, and incorrectly concludes that it succeeded on the basis that the file sbt-launch-0.13.7.jar exists (though it contains HTML).
I succeeded in building Spark by: (1) downloading the file sbt-launch-0.13.7.jar from $URL2 and placing it in the build directory; (2) modifying sbt-launch-lib.bash to prevent the download of that file; (3) restarting the build as I usually would, with SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly.

I think a lot of people will be confused by this. Probably someone should do some of the following: (1) delete $URL1 and all references, or replace it with the correct/current URL which points to the sbt-launch.jar(s); (2) modify sbt-launch-lib.bash so that it will not conclude that the download of sbt-launch.jar succeeded when the data returned is an HTML error message. Let me know if this is not clear; I will gladly explain in more detail or with more clarity if needed. -Bryce Lobdell

A transcript of my console is below:

@ip-xx-xxx-xx-xxx:~/spark/spark-1.4.0$ SPARK_HIVE=true SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly
NOTE: The sbt/sbt script has been relocated to build/sbt. Please update references to point to the new location. Invoking 'build/sbt clean assembly' now ...
Using /usr/lib/jvm/java-7-openjdk-amd64/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set.
Attempting to fetch sbt
Launching sbt from build/sbt-launch-0.13.7.jar
Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar
inquidia@ip-10-102-69-107:~/spark/spark-1.4.0$ cd build/
inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls
mvn  sbt  sbt-launch-0.13.7.jar  sbt-launch-lib.bash
inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ unzip -l sbt-launch-0.13.7.jar
Archive:  sbt-launch-0.13.7.jar
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a multi-part archive.  In the
  latter case the central directory and zipfile comment will be found on
  the last disk(s) of this archive.
unzip:  cannot find zipfile directory in one of sbt-launch-0.13.7.jar or sbt-launch-0.13.7.jar.zip, and cannot find sbt-launch-0.13.7.jar.ZIP, period.
inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l
total 28
-rwxr-xr-x 1 inquidia inquidia 5384 Jun  3 01:07 mvn
-rwxr-xr-x 1 inquidia inquidia 5395 Jun  3 01:07 sbt
-rw-rw-r-- 1 inquidia inquidia  162 Aug  7 20:24 sbt-launch-0.13.7.jar
-rwxr-xr-x 1 inquidia inquidia 5285 Jun  3 01:07 sbt-launch-lib.bash
inquidia@ip-10-102-69-107
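Bryce's suggestion (2) amounts to validating the downloaded file rather than merely checking that it exists. As an illustrative sketch (the real fix would live in sbt-launch-lib.bash, and this helper name is hypothetical), a valid jar is a ZIP archive and therefore starts with the magic bytes 'P', 'K', while an nginx 404 page starts with '<':

```scala
import java.io.FileInputStream

// Sketch: returns true only if the file begins with the ZIP magic bytes,
// so a 162-byte HTML error page is rejected instead of being treated as a jar.
def looksLikeJar(path: String): Boolean = {
  val in = new FileInputStream(path)
  try {
    val header = new Array[Byte](2)
    in.read(header) == 2 && header(0) == 'P'.toByte && header(1) == 'K'.toByte
  } finally in.close()
}
```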
[POWERED BY] Please add Typesafe to the list of organizations
Typesafe (http://typesafe.com). We provide commercial support for Spark on Mesos and Mesosphere DCOS. We contribute to Spark's Mesos integration and Spark Streaming enhancements. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: How to set log level in spark-submit ?
Did you use an absolute path in $path_to_file? I just tried this with spark-shell v1.4.1 and it worked for me. If the URL is wrong, you should see an error message from log4j that it can't find the file. For Windows it would be something like file:/c:/path/to/file, I believe. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Jul 30, 2015 at 4:41 AM, Alexander Krasheninnikov a.krasheninni...@corp.badoo.com wrote: I saw such an example in the docs: --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://$path_to_file but, unfortunately, it does not work for me. On 30.07.2015 05:12, canan chen wrote: Yes, that should work. What I mean is: is there any option in the spark-submit command that I can specify for the log level? On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney jcove...@gmail.com wrote: Put a log4j.properties file in conf/. You can copy log4j.properties.template as a good base. On Wednesday, July 29, 2015, canan chen ccn...@gmail.com wrote: Anyone know how to set the log level in spark-submit? Thanks
Re: Spark - Eclipse IDE - Maven
If you don't mind using SBT with your Scala instead of Maven, you can see the example I created here: https://github.com/deanwampler/spark-workshop It can be loaded into Eclipse or IntelliJ. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Wed, Jul 29, 2015 at 10:52 AM, Venkat Reddy ksiv...@gmail.com wrote: Thanks Petar, I will purchase. Thanks for the input. Thanks Siva On Tue, Jul 28, 2015 at 4:39 PM, Carol McDonald cmcdon...@maprtech.com wrote: I agree, I found this book very useful for getting started with Spark and Eclipse. On Tue, Jul 28, 2015 at 11:10 AM, Petar Zecevic petar.zece...@gmail.com wrote: Sorry about the self-promotion, but there's a really nice tutorial for setting up Eclipse for Spark in the Spark in Action book: http://www.manning.com/bonaci/ On 24.7.2015. 7:26, Siva Reddy wrote: Hi All, I am trying to set up Eclipse (LUNA) with Maven so that I can create Maven projects for developing Spark programs. I am having some issues and I am not sure what the issue is. Can anyone share a nice step-by-step document to configure Eclipse with Maven for Spark development? Thanks Siva -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Eclipse-IDE-Maven-tp23977.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Multiple operations on same DStream in Spark Streaming
Is this average supposed to be across all partitions? If so, it will require one of the reduce operations in every batch interval. If that's too slow for the data rate, I would investigate using PairDStreamFunctions.updateStateByKey to compute the sum + count of the 2nd integers, per 1st integer, then do the filtering and final averaging downstream if you can, i.e., where you actually need the final value. If you need it on every batch iteration, then you'll have to do a reduce per iteration. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, Jul 28, 2015 at 3:14 AM, Akhil Das ak...@sigmoidanalytics.com wrote: One approach would be to store the batch data in intermediate storage (like HBase/MySQL, or even ZooKeeper), and inside your filter function you just go and read the previous value from this storage and do whatever operation you are supposed to do. Thanks Best Regards

On Sun, Jul 26, 2015 at 3:37 AM, foobar heath...@fb.com wrote: Hi, I'm working with Spark Streaming using Scala, and trying to figure out the following problem. In my DStream[(Int, Int)], each record is an int pair tuple. For each batch, I would like to filter out all records with first integer below the average of the first integers in this batch, and for all records with first integer above that average, compute the average of the second integers in such records. What's the best practice to implement this? I tried this but kept getting the "object not serializable" exception, because it's hard to share variables (such as the average of the first ints in the batch) between workers and the driver. Any suggestions? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-operations-on-same-DStream-in-Spark-Streaming-tp23995.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
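Dean's updateStateByKey suggestion can be sketched with a pure state-update function: per key (the first integer), it folds the batch's (sum, count) contributions for the second integers into the running totals. Names are illustrative, and the wiring into PairDStreamFunctions.updateStateByKey (which also requires checkpointing to be enabled) is assumed:

```scala
// Running state per key: (sum, count) of the second integers seen so far.
// New values arrive pre-aggregated as (sum, count) pairs for the batch.
def updateSumCount(newValues: Seq[(Long, Long)],
                   state: Option[(Long, Long)]): Option[(Long, Long)] = {
  val (sum0, cnt0) = state.getOrElse((0L, 0L))
  val (sum1, cnt1) = newValues.foldLeft((0L, 0L)) {
    case ((s, c), (ds, dc)) => (s + ds, c + dc)
  }
  Some((sum0 + sum1, cnt0 + cnt1))
}

// Downstream, recover the average only where it is actually needed.
def average(sumCount: (Long, Long)): Option[Double] =
  if (sumCount._2 == 0L) None
  else Some(sumCount._1.toDouble / sumCount._2)
```

In the streaming job this would be used roughly as pairs.updateStateByKey(updateSumCount _), keeping the heavier filtering and averaging out of the per-batch hot path.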
Re: Clustetr setup for SPARK standalone application:
When you say you installed Spark, did you install the master and slave services for standalone mode as described here: http://spark.apache.org/docs/latest/spark-standalone.html? If you intended to run Spark on Hadoop, see here: http://spark.apache.org/docs/latest/running-on-yarn.html. It looks like either the master service isn't running or isn't reachable over your network. Is hadoopm0 publicly routable? Is port 7077 blocked? As a test, can you telnet to it? telnet hadoopm0 7077 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, Jul 28, 2015 at 7:01 AM, Sagar sagarjad...@yonsei.ac.kr wrote: Hello Sir, I am an MS student working on Spark. I am totally new in this field. I have installed Spark. The local spark-shell is working fine, but whenever I try the master configuration I get some errors. When I run this command: MASTER=spark://hadoopm0:7077 spark-shell I get errors like:

15/07/27 21:17:26 INFO AppClient$ClientActor: Connecting to master spark://hadoopm0:7077...
15/07/27 21:17:46 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/07/27 21:17:46 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
15/07/27 21:17:46 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.

Also I have attached a screenshot of the Master UI. Can you please give me some references or documentation on how to solve this issue? Thanks in advance. Thanking You,
Re: Comparison between Standalone mode and YARN mode
YARN and Mesos are better for production clusters of non-trivial size that have mixed job kinds and multiple users, as they manage resources more intelligently and dynamically. They also support other services you probably need, like HDFS, databases, workflow tools, etc. Standalone is fine, though, if you have a limited number of jobs competing for resources, for example a small cluster dedicated to ingesting or processing a specific kind of data, or for a dev/QA cluster. Standalone mode has much lower overhead, but you have to manage the daemon services yourself, including configuration of Zookeeper if you need master failover. Hence, you don't see it often in production scenarios. The Spark page on cluster deployments has more details: http://spark.apache.org/docs/latest/cluster-overview.html dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 6:56 PM, Dogtail Ray spark.ru...@gmail.com wrote: Hi, I am very curious about the differences between Standalone mode and YARN mode. According to http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/, it seems that YARN mode is always better than Standalone mode. Is that the case? Or I should choose different modes according to my specific requirements? Thanks!
Re: Programmatically launch several hundred Spark Streams in parallel
You don't need the par (parallel) versions of the Scala collections, actually. Recall that you are building a pipeline in the driver; it doesn't start running cluster tasks until ssc.start() is called, at which point Spark will figure out the task parallelism. In fact, you might as well do the foreachRDD call within the initial map. No need for the streams collection, unless you need it for something else. Test it out to make sure I'm not wrong ;) However, I'm a little confused by the per-stream logic. It looks like you're using foreachRDD to dump each input stream into the same output location stream._1. True? If it's a directory, you'll get an error that it already exists for the *second* stream in streams. If you're just funneling all 500 inputs into the same output location, how about using DStream.union to combine all the input streams into one, then have one foreachRDD to write output? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Fri, Jul 24, 2015 at 11:23 AM, Brandon White bwwintheho...@gmail.com wrote: Hello, So I have about 500 Spark Streams and I want to know the fastest and most reliable way to process each of them. Right now, I am creating and processing them in a list:

val ssc = new StreamingContext(sc, Minutes(10))
val streams = paths.par.map { nameAndPath =>
  (path._1, ssc.textFileStream(path._1))
}
streams.par.foreach { nameAndStream =>
  streamTuple.foreachRDD { rdd =>
    df = sqlContext.jsonRDD(rdd)
    df.insertInto(stream._1)
  }
}
ssc.start()

Is this the best way to do this? Are there any better, faster methods?
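The union alternative Dean proposes can be sketched as follows, assuming paths is a collection of input directory paths and that a single output table is acceptable (the table name here is a placeholder):

```scala
// Sketch: one unioned DStream and one foreachRDD, instead of 500 of each.
// `sc`, `sqlContext`, and `paths` are assumed from the original code above.
val ssc = new StreamingContext(sc, Minutes(10))
val inputs = paths.map(p => ssc.textFileStream(p)).toSeq
val combined = ssc.union(inputs)
combined.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("combined_output") // hypothetical single output table
}
ssc.start()
```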
Re: Mesos + Spark
When running Spark in Mesos cluster mode, the driver program runs in one of the cluster nodes, like the other Spark processes that are spawned. You won't need a special node for this purpose. I'm not very familiar with Chronos, but its UI or the regular Mesos UI should show you where the driver is running, then you can use the Spark web UI on that machine to see what the Spark job is doing. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Jul 24, 2015 at 4:47 PM, boci boci.b...@gmail.com wrote: Thanks, but something is not clear... I have the mesos cluster. - I want to submit my application and scheduled with chronos. - For cluster mode I need a dispatcher, this is another container (machine in the real world)? What will this do? It's needed when I using chronos? - How can I access to my spark job from chronos? I think submit in client mode is not fit to my condition, that's right? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Jul 22, 2015 at 4:51 PM, Dean Wampler deanwamp...@gmail.com wrote: This page, http://spark.apache.org/docs/latest/running-on-mesos.html, covers many of these questions. If you submit a job with the option --supervise, it will be restarted if it fails. You can use Chronos for scheduling. You can create a single streaming job with a 10 minute batch interval, if that works for your every 10-min. need. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote: Hi guys! I'm a new in mesos. I have two spark application (one streaming and one batch). I want to run both app in mesos cluster. 
Now for testing I want to run in a docker container, so I started a simple redjack/mesos-master, but I think a lot of things are unclear to me (both Mesos and Spark-on-Mesos). If I have a Mesos cluster (for testing it will be some docker containers), do I need a separate machine (container) to run my Spark job? Or can I submit to the cluster and schedule it (with Chronos or something)? How can I run the streaming job? What happens if the controller dies? Or if I call spark-submit with master=mesos, is my application started so that I can forget about it? How can I run it every 10 min without submitting every 10 min? How can I run my streaming app in HA mode? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: Mesos + Spark
You can certainly start jobs without Chronos, but to automatically restart finished jobs or to run jobs at specific times or periods, you'll want something like Chronos. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Jul 24, 2015 at 5:08 PM, boci boci.b...@gmail.com wrote: Thanks, Mesos will show spark is driver is running, but what happened if my batch job finished? How can I reschedule without chronos ? Can I submit a job without start it? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jul 24, 2015 at 11:52 PM, Dean Wampler deanwamp...@gmail.com wrote: When running Spark in Mesos cluster mode, the driver program runs in one of the cluster nodes, like the other Spark processes that are spawned. You won't need a special node for this purpose. I'm not very familiar with Chronos, but its UI or the regular Mesos UI should show you where the driver is running, then you can use the Spark web UI on that machine to see what the Spark job is doing. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Jul 24, 2015 at 4:47 PM, boci boci.b...@gmail.com wrote: Thanks, but something is not clear... I have the mesos cluster. - I want to submit my application and scheduled with chronos. - For cluster mode I need a dispatcher, this is another container (machine in the real world)? What will this do? It's needed when I using chronos? - How can I access to my spark job from chronos? I think submit in client mode is not fit to my condition, that's right? 
Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Jul 22, 2015 at 4:51 PM, Dean Wampler deanwamp...@gmail.com wrote: This page, http://spark.apache.org/docs/latest/running-on-mesos.html, covers many of these questions. If you submit a job with the option --supervise, it will be restarted if it fails. You can use Chronos for scheduling. You can create a single streaming job with a 10 minute batch interval, if that works for your every 10-min. need. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote: Hi guys! I'm a new in mesos. I have two spark application (one streaming and one batch). I want to run both app in mesos cluster. Now for testing I want to run in docker container so I started a simple redjack/mesos-master, but I think a lot of think unclear for me (both mesos and spark-mesos). If I have a mesos cluster (for testing it will be some docker container) i need a separate machine (container) to run my spark job? Or can I submit the cluster and schedule (chronos or I dunno)? How can I run the streaming job? What happened if the controller died? Or if I call spark-submit with master=mesos my application started and I can forget? How can I run in every 10 min without submit in every 10 min? How can I run my streaming app in HA mode? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: Mesos + Spark
This page, http://spark.apache.org/docs/latest/running-on-mesos.html, covers many of these questions. If you submit a job with the option --supervise, it will be restarted if it fails. You can use Chronos for scheduling. You can create a single streaming job with a 10 minute batch interval, if that works for your every 10-min. need. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote: Hi guys! I'm a new in mesos. I have two spark application (one streaming and one batch). I want to run both app in mesos cluster. Now for testing I want to run in docker container so I started a simple redjack/mesos-master, but I think a lot of think unclear for me (both mesos and spark-mesos). If I have a mesos cluster (for testing it will be some docker container) i need a separate machine (container) to run my spark job? Or can I submit the cluster and schedule (chronos or I dunno)? How can I run the streaming job? What happened if the controller died? Or if I call spark-submit with master=mesos my application started and I can forget? How can I run in every 10 min without submit in every 10 min? How can I run my streaming app in HA mode? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: Spark-hive parquet schema evolution
While it's not recommended to overwrite files Hive thinks it understands, you can add the column to Hive's metastore using an ALTER TABLE command, using HiveQL in the Hive shell or HiveContext.sql():

ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column for full details. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com wrote: Since Hive doesn't support schema evolution, you'll have to update the schema stored in the metastore somehow. For example, you can create a new external table with the merged schema. Say you have a Hive table t1:

CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in the HDFS path hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some Parquet data with an extra column c2 to the same directory:

import org.apache.spark.sql.types._

val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)

Now you can create a new external table t2 like this:

val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.path(path).saveAsTable("t2")

Since we specified a path above, the newly created t2 is an external table pointing to the original HDFS location. But the schema of t2 is the merged version. The drawback of this approach is that t2 is actually a Spark SQL specific data source table rather than a genuine Hive table. This means it can be accessed by Spark SQL only. We're just using the Hive metastore to help persist metadata of the data source table.
However, since you’re asking how to access the new table via Spark SQL CLI, this should work for you. We are working on making Parquet and ORC data source tables accessible via Hive in Spark 1.5.0. Cheng On 7/22/15 10:32 AM, Jerrick Hoang wrote: Hi Lian, Sorry I'm new to Spark so I did not express myself very clearly. I'm concerned about the situation when let's say I have a Parquet table some partitions and I add a new column A to parquet schema and write some data with the new schema to a new partition in the table. If i'm not mistaken, if I do a sqlContext.read.parquet(table_path).printSchema() it will print the correct schema with new column A. But if I do a 'describe table' from SparkSQLCLI I won't see the new column being added. I understand that this is because Hive doesn't support schema evolution. So what is the best way to support CLI queries in this situation? Do I need to manually alter the table everytime the underlying schema changes? Thanks On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Jerrick, What do you mean by schema evolution with Hive metastore tables? Hive doesn't take schema evolution into account. Could you please give a concrete use case? Are you trying to write Parquet data with extra columns into an existing metastore Parquet table? Cheng On 7/21/15 1:04 AM, Jerrick Hoang wrote: I'm new to Spark, any ideas would be much appreciated! Thanks On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.comjerrickho...@gmail.com wrote: Hi all, I'm aware of the support for schema evolution via DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via SparkSQL CLI, how would I deal with Parquet schema evolution? Thanks, J
Re: Spark Streaming Checkpointing solutions
TD's Spark Summit talk offers suggestions (https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/). He recommends using HDFS, because you get the triplicate resiliency it offers, albeit with extra overhead.

I believe the driver doesn't need visibility to the checkpointing directory, e.g., if you're running in client mode, but all the cluster nodes would need to see it for recovering a lost stage, which might get restarted on a different node. Hence, I would think NFS could work, if all nodes have the same mount, although there would be a lot of network overhead. In some situations, a high-performance file system appliance, e.g., NAS, could suffice.

My $0.02, dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, Jul 21, 2015 at 10:43 AM, Emmanuel fortin.emman...@gmail.com wrote: Hi, I'm working on a Spark Streaming application and I would like to know what is the best storage to use for checkpointing. For testing purposes we're using NFS between the worker, the master and the driver program (in client mode), but we have some issues with the CheckpointWriter (1 thread dedicated). *My understanding is that NFS is not a good candidate for this usage.* 1. What is the best solution for checkpointing and what are the alternatives? 2. Do the checkpoint directories need to be shared by the driver application and the workers too? Thanks for your replies -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-solutions-tp23932.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Silly question about building Spark 1.4.1
hadoop-2.6 is supported (look for the corresponding profile in the pom.xml file). For Hive, add -Phive -Phive-thriftserver (see http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables for more details).

dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com wrote: Sorry, should have sent this to user… However… it looks like the docs page may need some editing? Thx -Mike

Begin forwarded message: *From: *Michael Segel msegel_had...@hotmail.com *Subject: **Silly question about building Spark 1.4.1* *Date: *July 20, 2015 at 12:26:40 PM MST *To: *d...@spark.apache.org

Hi, I'm looking at the online docs for building Spark 1.4.1… http://spark.apache.org/docs/latest/building-spark.html I was interested in building Spark for Scala 2.11 (the latest Scala) and also for Hive and JDBC support. The docs say: "To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 property: dev/change-version-to-2.11.sh mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package" So… Is there a reason I shouldn't build against hadoop-2.6? If I want to add the Thrift and Hive support, is it possible? Looking at the Scala build, it looks like Hive support is being built? (Looking at the stdout messages…) Should the docs be updated? Am I missing something? (Dean W. can confirm, I am completely brain dead. ;-) Thx -Mike PS. Yes, I can probably download a prebuilt image, but I'm a glutton for punishment. ;-)
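Putting Mike's requirements together, the full build invocation would look something like this (a sketch based on the 1.4.x build docs; verify the profile names against the pom.xml of the exact release you're building):

```shell
# Switch the build to Scala 2.11, then build with Hadoop 2.6, Hive,
# and the Hive thrift server enabled.
dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package
```

One caveat: the 1.4 build docs noted that a few components (the external Kafka library and the JDBC component) were not yet supported in Scala 2.11 builds, so you may need to drop -Phive-thriftserver when building for 2.11.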
Re: Breaking lineage and reducing stages in Spark Streaming
Is myRDD outside a DStream? If so, are you persisting it on each batch iteration? It should be checkpointed frequently too.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after union keeps the overall tuple count in myRDD fixed. Or even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?

*From:* Anand Nalya [mailto:anand.na...@gmail.com] *Sent:* donderdag 9 juli 2015 11:48 *To:* spark users *Subject:* Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an rdd is being updated with tuples coming from RDDs in a DStream with the following pattern:

dstream.foreachRDD(rdd => { myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_) })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing.
The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Re: Breaking lineage and reducing stages in Spark Streaming
I think you're complicating the cache behavior by aggressively re-using vars when temporary vals would be more straightforward. For example, newBase = newBase.unpersist()... effectively means that newBase's data is not actually cached when the subsequent .union(...) is performed, so it probably goes back to the lineage... Same with the current.unpersist logic before it. Names are cheap, so just use local vals: val newCurrent = rdd.union(current).reduceByKey(_+_) current.unpersist() Also, what happens if you omit the 2 argument for the number of partitions in reduceByKey? Other minor points: I would change the joined, toUpdate, toNotUpdate logic to this: val joined = current.leftOuterJoin(newBase).map(mymap).cache() val toUpdate = joined.filter(myfilter).cache() val toNotUpdate = joined.filter(mynotfilter).cache() Maybe it's just for this email example, but you don't need to call collect on toUpdate before using foreach(println). If the RDD is huge, you definitely don't want to do that. Hope this helps. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote: Yes, myRDD is outside of DStream. Following is the actual code where newBase and current are the rdds being updated with each batch: val base = sc.textFile... var newBase = base.cache() val dstream: DStream[String] = ssc.textFileStream... 
var current: RDD[(String, Long)] = sc.emptyRDD.cache()

dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
  current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
  val joined = current.leftOuterJoin(newBase).cache()
  val toUpdate = joined.filter(myfilter).map(mymap).cache()
  val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
  toUpdate.collect().foreach(println) // this goes to some store
  newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
  current = toNotUpdate.cache()
  toUpdate.unpersist()
  joined.unpersist()
  rdd.unpersist()
})

Regards, Anand

On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote: Is myRDD outside a DStream? If so, are you persisting it on each batch iteration? It should be checkpointed frequently too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after union keeps the overall tuple count in myRDD fixed. Or even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?
*From:* Anand Nalya [mailto:anand.na...@gmail.com] *Sent:* donderdag 9 juli 2015 11:48 *To:* spark users *Subject:* Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an rdd is being updated with tuples coming from RDDs in a DStream with the following pattern:

dstream.foreachRDD(rdd => { myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_) })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
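The lineage growth TD and Dean describe in this thread can be mimicked with a toy model (plain Python, no Spark; the `ToyRDD` class is illustrative, not Spark's API): each batch's union + reduceByKey wraps the previous result in one more stage, and only a checkpoint-style materialization truncates the chain.

```python
class ToyRDD:
    """A stand-in for an RDD that records only its lineage depth."""
    def __init__(self, parent=None):
        self.depth = 0 if parent is None else parent.depth + 1

def run_batches(n, checkpoint_every=None):
    rdd = ToyRDD()
    depths = []
    for batch in range(1, n + 1):
        rdd = ToyRDD(parent=rdd)          # union + reduceByKey adds a stage
        if checkpoint_every and batch % checkpoint_every == 0:
            rdd = ToyRDD()                # checkpoint materializes: lineage truncated
        depths.append(rdd.depth)
    return depths

print(run_batches(6))                     # grows without bound: [1, 2, 3, 4, 5, 6]
print(run_batches(6, checkpoint_every=2)) # bounded: [1, 0, 1, 0, 1, 0]
```

This is why the scheduling delay grows batch over batch without checkpointing: the per-batch DAG keeps getting deeper even when the earlier stages are skipped.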
Please add the Chicago Spark Users' Group to the community page
Here's our home page: http://www.meetup.com/Chicago-Spark-Users/ Thanks, Dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: Spark 1.3.1 On Mesos Issues.
It would be nice to see the code for the MapR FS Java API, but my Google foo failed me (assuming it's open source)... So, shooting in the dark ;) there are a few things I would check, if you haven't already:

1. Could there be 1.2 versions of some Spark jars that get picked up at run time (but apparently not in local mode) on one or more nodes? (Side question: does your node experiment fail on all nodes?) Put another way, are the classpaths good for all JVM tasks?

2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos?

Incidentally, how are you combining Mesos and MapR? Are you running Spark in Mesos, but accessing data in MapR-FS? Perhaps the MapR shim library doesn't support Spark 1.3.1.

HTH, dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote: All - I am facing an odd issue and I am not really sure where to go for support at this point. I am running MapR, which complicates things as it relates to Mesos; however, this HAS worked in the past with no issues, so I am stumped here.

So for starters, here is what I am trying to run. This is a simple show tables using the Hive context:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, HiveContext
sparkhc = HiveContext(sc)
test = sparkhc.sql("show tables")
for r in test.collect():
    print r

When I run it on 1.3.1 using ./bin/pyspark --master local, this works with no issues. When I run it using Mesos with all the settings configured (as they had worked in the past), I get lost tasks, and when I zoom in on them, the error that is being reported is below. Basically it's a NullPointerException in com.mapr.fs.ShimLoader. What's weird to me is that I took each instance and compared them: the classpath, everything, is exactly the same.
Yet running in local mode works, and running in Mesos fails. Also of note, when the task is scheduled to run on the same node as where I run locally, it fails too! (Baffling.)

Ok, for comparison, how I configured Mesos was to download the mapr4 package from spark.apache.org, using the exact same configuration file as for 1.2.0 (except for changing the executor tgz from 1.2.0 to 1.3.1). When I run this example with the mapr4 package for 1.2.0 there is no issue in Mesos; everything runs as intended. Using the same package for 1.3.1, it fails. (Also of note: 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as well.)

So basically, when I used 1.2.0 and followed a set of steps, it worked on Mesos, and 1.3.1 fails. Since this is a current version of Spark, MapR supports 1.2.1 only. (Still working on that.) I guess I am at a loss right now on why this would be happening; any pointers on where I could look or what I could tweak would be greatly appreciated. Additionally, if there is something I could specifically draw to the attention of MapR on this problem, please let me know; I am perplexed by the change from 1.2.0 to 1.3.1.
Thank you, John

Full error on 1.3.1 on Mesos:

15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity 1060.3 MB
java.lang.NullPointerException
at com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96)
at com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232)
at com.mapr.fs.ShimLoader.load(ShimLoader.java:194)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:60)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:992)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:966)
at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:43)
at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:220)
at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959)
at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:104)
at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:179)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:310)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:186)
at org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70
Re: Recommended Scala version
Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the Spark project has published Maven artifacts compiled with both 2.11 and 2.10, although the downloads at http://spark.apache.org/downloads.html are still all for 2.10.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Yes, the recommended version is 2.10, as not all of the features are supported by the 2.11 version. The Kafka libraries and JDBC components are yet to be ported to 2.11. So, if your project doesn't depend on these components, you can give v2.11 a try. Here's a link https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211 for building with the 2.11 version. You won't be running into any issues if you try v2.10 as of now, but future releases will have to shift to 2.11 once support for v2.10 ends in the long run.

On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal punya.bis...@gmail.com wrote: Dear Spark developers and users, Am I correct in believing that the recommended version of Scala to use with Spark is currently 2.10? Is there any plan to switch to 2.11 in the future? Are there any advantages to using 2.11 today? Regards, Punya
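If you do want the published 2.11 artifacts, the usual sbt idiom is %%, which appends the Scala binary version to the artifact name so it resolves to spark-core_2.11 automatically (a sketch; adjust the Spark and Scala versions to whatever you're actually on):

```scala
// build.sbt fragment: "%%" picks the artifact matching scalaVersion,
// so this resolves to org.apache.spark:spark-core_2.11:1.3.1.
scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"
```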
Re: Tasks randomly stall when running on mesos
Here is a link for builds of 1.4 RC2: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc2-bin/ For a mvn repo, I believe the RC2 artifacts are here: https://repository.apache.org/content/repositories/orgapachespark-1104/

A few experiments you might try:

1. Does spark-shell work? It might start fine, but make sure you can create an RDD and use it, e.g., something like:

val rdd = sc.parallelize(Seq(1,2,3,4,5,6))
rdd foreach println

2. Try coarse-grained mode, which has different logic for executor management. You can set it in the $SPARK_HOME/conf/spark-defaults.conf file:

spark.mesos.coarse true

Or, from this page http://spark.apache.org/docs/latest/running-on-mesos.html, set the property in the SparkConf object used to construct the SparkContext:

conf.set("spark.mesos.coarse", "true")

dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Mon, May 25, 2015 at 12:06 PM, Reinis Vicups sp...@orbit-x.de wrote: Hello, I assume I am running spark in fine-grained mode since I haven't changed the default here. One question regarding 1.4.0-RC1 - is there a mvn snapshot repository I could use for my project config? (I know that I have to download the source and make-distribution for the executor as well) thanks reinis

On 25.05.2015 17:07, Iulian Dragoș wrote: On Mon, May 25, 2015 at 2:43 PM, Reinis Vicups sp...@orbit-x.de wrote: Hello, I am using Spark 1.3.1-hadoop2.4 with Mesos 0.22.1 with zookeeper and running on a cluster with 3 nodes on 64bit ubuntu. My application is compiled with spark 1.3.1 (apparently with a mesos 0.21.0 dependency), hadoop 2.5.1-mapr-1503 and akka 2.3.10. Only with this combination have I succeeded in running spark jobs on mesos at all. Different versions cause class loader issues. I am submitting spark jobs with spark-submit with mesos://zk://.../mesos.
Are you using coarse grained or fine grained mode? sandbox log of slave-node app01 (the one that stalls) shows following: 10:01:25.815506 35409 fetcher.cpp:214] Fetching URI 'hdfs://dev-hadoop01/apps/spark-1.3.1-bin-hadoop2.4.tgz' 10:01:26.497764 35409 fetcher.cpp:99] Fetching URI 'hdfs://dev-hadoop01/apps/spark-1.3.1-bin-hadoop2.4.tgz' using Hadoop Client 10:01:26.497869 35409 fetcher.cpp:109] Downloading resource from 'hdfs://dev-hadoop01/apps/spark-1.3.1-bin-hadoop2.4.tgz' to '/tmp/mesos/slaves/20150511-150924-3410235146-5050-1903-S3/frameworks/20150511-150924-3410235146-5050-1903-0249/executors/20150511-150924-3410235146-5050-1903-S3/runs/ec3a0f13-2f44-4952-bb23-4d48abaacc05/spark-1.3.1-bin-hadoop2.4.tgz' 10:01:32.877717 35409 fetcher.cpp:78] Extracted resource '/tmp/mesos/slaves/20150511-150924-3410235146-5050-1903-S3/frameworks/20150511-150924-3410235146-5050-1903-0249/executors/20150511-150924-3410235146-5050-1903-S3/runs/ec3a0f13-2f44-4952-bb23-4d48abaacc05/spark-1.3.1-bin-hadoop2.4.tgz' into '/tmp/mesos/slaves/20150511-150924-3410235146-5050-1903-S3/frameworks/20150511-150924-3410235146-5050-1903-0249/executors/20150511-150924-3410235146-5050-1903-S3/runs/ec3a0f13-2f44-4952-bb23-4d48abaacc05' Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 10:01:34 INFO MesosExecutorBackend: Registered signal handlers for [TERM, HUP, INT] 10:01:34.459292 35730 exec.cpp:132] Version: 0.22.0 *10:01:34 ERROR MesosExecutorBackend: Received launchTask but executor was null* 10:01:34.540870 35765 exec.cpp:206] Executor registered on slave 20150511-150924-3410235146-5050-1903-S3 10:01:34 INFO MesosExecutorBackend: Registered with Mesos as executor ID 20150511-150924-3410235146-5050-1903-S3 with 1 cpus It looks like an inconsistent state on the Mesos scheduler. It tries to launch a task on a given slave before the executor has registered. This code was improved/refactored in 1.4, could you try 1.4.0-RC1? 
Yes and note the second message after the error you highlighted; that's when the executor would be registered with Mesos and the local object created. iulian 10:01:34 INFO SecurityManager: Changing view acls to... 10:01:35 INFO Slf4jLogger: Slf4jLogger started 10:01:35 INFO Remoting: Starting remoting 10:01:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@app01:xxx] 10:01:35 INFO Utils: Successfully started service 'sparkExecutor' on port xxx. 10:01:35 INFO AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@dev-web01/user/MapOutputTracker 10:01:35 INFO AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@dev-web01/user/BlockManagerMaster 10:01:36 INFO DiskBlockManager: Created local directory at /tmp/spark-52a6585a-f9f2-4ab6-bebc-76be99b0c51c/blockmgr-e6d79818-fe30-4b5c-bcd6-8fbc5a201252 10:01:36 INFO MemoryStore: MemoryStore started with capacity 88.3 MB 10:01:36 WARN
Re: Spark SQL: preferred syntax for column reference?
Is the $"foo" or mydf("foo") or both checked at compile time to verify that the column reference is valid? Thx. Dean

On Wednesday, May 13, 2015, Michael Armbrust mich...@databricks.com wrote: I would not say that either method is preferred (neither is old/deprecated). One advantage to the second is that you are referencing a column from a specific dataframe, instead of just providing a string that will be resolved much like an identifier in a SQL query. This means given:

df1 = [id: int, name: string]
df2 = [id: int, zip: int]

I can do something like:

df1.join(df2, df1("id") === df2("id"))

Whereas I would need aliases if I was only using strings:

df1.as("a").join(df2.as("b"), $"a.id" === $"b.id")

On Wed, May 13, 2015 at 9:55 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm just getting started with Spark SQL and DataFrames in 1.3.0. I notice that the Spark API shows a different syntax for referencing columns in a dataframe than the Spark SQL Programming Guide. For instance, the API docs for the select method show this:

df.select($"colA", $"colB")

Whereas the programming guide shows this:

df.filter(df("name") > 21).show()

I tested and both the $"column" and df("column") syntax work, but I'm wondering which is *preferred*. Is one the original and one a new feature we should be using? Thanks, Diana (Spark Curriculum Developer for Cloudera) -- Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: value toDF is not a member of RDD object
It's the import statement Olivier showed that makes the method available. Note that you can also use `sqlContext.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote: you need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a écrit : I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote: toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method. Regards, Olivier. Le mar.
12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a écrit : Hi User Group, I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double-checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the spark mailing list, and even tried toDF("col1", "col2") as suggested by Xiangrui Meng in that post, and got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas on the cause of this error? REGARDS, Todd Leo
Re: Spark - Timeout Issues - OutOfMemoryError
I don't know the full context of what you're doing, but serialization errors usually mean you're attempting to serialize something that can't be serialized, like the SparkContext. Kryo won't help there.

The arguments to spark-submit you posted previously look good:

2) --num-executors 96 --driver-memory 12g --driver-java-options -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if your data is in HDFS and your block size is 128MB, then you'll get ~195 partitions anyway. If it takes 7 hours to do a join over 25GB of data, you have some other serious bottleneck. You should examine the web console and the logs to determine where all the time is going. Questions you might pursue:

- How long does each task take to complete?
- How many of those 195 partitions/tasks are processed at the same time? That is, how many slots are available? Maybe you need more nodes if the number of slots is too low. Based on your command arguments, you should be able to process 1/2 of them at a time, unless the cluster is busy.
- Is the cluster swamped with other work?
- How much data does each task process? Is the data roughly the same from one task to the next? If not, you might have serious key skew.

You may also need to research the details of how joins are implemented and some of the common tricks for organizing data to minimize having to shuffle all N by M records.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello Dean, If I don't use the Kryo serializer I get a serialization error, and hence am using it.
If I don't use partitionBy/repartition, then the simple join never completed even after 7 hours, and in fact as a next step I need to run it against 250G, as that is my full dataset size. Someone here suggested that I use repartition. Assuming repartition is mandatory, how do I decide what's the right number? When I am using 400, I do not get the NullPointerException that I talked about, which is strange. I never saw that exception against a small random dataset, but I see it with 25G; and again with 400 partitions, I do not see it.

On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com wrote: IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts.

Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.)

So, I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest code you can. Make it work first. Then, if it really isn't fast enough, look for actual evidence of bottlenecks and optimize those.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello Dean and others, Thanks for your suggestions. I have two data sets, and all I want to do is a simple equi-join. I have a 10G limit, and as my dataset_1 exceeded that, it was throwing an OOM error. Hence I switched back to using the .join() API instead of a map-side broadcast join.
I am repartitioning the data with 100, 200, and I see a NullPointerException now. When I run against 25G of each input and with .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a NullPointerException. The trace does not include a line from my code, and hence I do not know what is causing the error. I do have the Kryo serializer registered:

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
val sc = new SparkContext(conf)

I see the exception when this task runs:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

It's a simple mapping of input records to (itemId, record). I found this http
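Dean's two quantitative points earlier in this thread, the expected partition count and key skew, can be sanity-checked with plain arithmetic (a pure-Python sketch; `hdfs_partitions` and `skew_report` are illustrative helpers, not Spark APIs):

```python
import math
from collections import Counter

def hdfs_partitions(input_bytes, block_bytes=128 * 1024 * 1024):
    """Roughly one input split, hence one Spark partition, per HDFS block."""
    return max(1, math.ceil(input_bytes / block_bytes))

def skew_report(keys, top=3):
    """Fraction of records held by the heaviest keys; a large fraction
    means a few join/reduce tasks will do most of the work."""
    counts = Counter(keys)
    total = sum(counts.values())
    return [(k, c / total) for k, c in counts.most_common(top)]

# ~25 GB of input at the default 128 MB block size:
print(hdfs_partitions(25 * 1024**3))   # 200 partitions (~195 if "GB" is decimal)

# One hot key holding 97% of the records will stall a single reduce task:
print(skew_report(["a"] * 97 + ["b", "c", "d"]))
```

In practice you would compute the per-key counts with a cheap `countByKey`-style pass on a sample before deciding whether repartitioning can help at all: no partition count fixes a single key that dominates the data.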
Re: Questions about Accumulators
Yes, correct. However, note that when an accumulator operation is *idempotent*, meaning that repeated application for the same data behaves exactly like one application, then that accumulator can be safely called in transformation steps (non-actions), too. For example, max and min tracking. Just last week I wrote one that used a hash map to track the latest timestamps seen for specific keys.

dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 8:07 AM, xiazhuchang hk8...@163.com wrote: "For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed."

Does this mean the guarantee (that the accumulator will only be updated once) holds only in actions? That is to say, one should use accumulators only in actions, or else there may be some errors (updates applied more than once) if used in transformations? e.g. map(x => accumulator += x). After execution, the correct result of the accumulator should be 1. Unfortunately, if some error happens and the task restarts, the map() operation is re-executed (map(x => accumulator += x) is re-executed), and then the final result of the accumulator will be 2, twice the correct result?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746p22747.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
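The distinction in this thread can be seen with a pure-Python sketch (the helpers are illustrative, not Spark's accumulator API): a counting update applied again by a replayed task double-counts, while a max-style update is idempotent, so replays are harmless.

```python
def add_count(acc, x):
    """Non-idempotent: replaying this update double-counts."""
    return acc + x

def track_max(acc, key, value):
    """Idempotent: re-applying the same (key, value) changes nothing."""
    if value > acc.get(key, float("-inf")):
        acc[key] = value
    return acc

# A replayed task re-runs its updates:
count = add_count(add_count(0, 1), 1)  # should be 1, but the replay makes it 2
print(count)                           # 2: wrong for a counter

latest = {}
track_max(latest, "k1", 100)
track_max(latest, "k1", 100)           # replay: no effect
track_max(latest, "k1", 99)            # older value: no effect
print(latest)                          # {'k1': 100}: still correct
```

This is exactly why a summing accumulator belongs only in actions, while a latest-timestamp-per-key tracker like the one described above can tolerate transformation re-execution.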
Re: Spark - Timeout Issues - OutOfMemoryError
How big is the data you're returning to the driver with collectAsMap? You are probably running out of memory trying to copy too much data back to it. If you're trying to force a map-side join, Spark can do that for you in some cases within the regular DataFrame/RDD context. See http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning and this talk by Michael Armbrust for example, http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Full Exception *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s* *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s* 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)] org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)] at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]) *Code at line 37* val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap Listing data set size is 26G (10 files) and my driver memory is 12G (I can't go beyond it). The reason I do collectAsMap is to broadcast it and do a map-side join instead of a regular join. Please suggest?
On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: My Spark Job is failing and i see == 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)] org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)] at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] I see
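For completeness, when the small side really does fit in driver memory, the map-side join being attempted here is normally finished off with an explicit broadcast variable. A sketch under that assumption (sc, listings, and viEvents are the names used in this thread; everything else is illustrative):

```scala
// Collect the small side to the driver, broadcast it, then join in a map
// over the large side. Only safe if lstgItemMap fits comfortably in memory
// on the driver and on every executor.
val lstgItemMap = listings.map(lstg => (lstg.getItemId().toLong, lstg)).collectAsMap()
val broadcastMap = sc.broadcast(lstgItemMap)

val joined = viEvents.flatMap { case (itemId, event) =>
  // Emit a pair only when the item exists on the broadcast side (inner join).
  broadcastMap.value.get(itemId).map(lstg => (itemId, (event, lstg)))
}
```

If the collected map is too large for the 12G driver, as in this thread, there is no way around that limit: either shrink the small side (project out unneeded fields before collecting) or fall back to a regular join.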
Re: Spark - Timeout Issues - OutOfMemoryError
IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts. Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.) So, I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest code you can. Make it work first. Then, if it really isn't fast enough, look for actual evidence of bottlenecks and optimize those. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello Dean and others, Thanks for your suggestions. I have two data sets and all I want to do is a simple equi-join. I have a 10G limit and as my dataset_1 exceeded that, it was throwing an OOM error. Hence I switched back to using the .join() API instead of a map-side broadcast join. I am repartitioning the data with 100, 200 partitions and I see a NullPointerException now. When I run against 25G of each input and with .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a NullPointerException. The trace does not include a line from my code, hence I do not know what is causing the error. I do have a registered Kryo serializer.
val conf = new SparkConf()
  .setAppName(detail)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
val sc = new SparkContext(conf)
I see the exception when this task runs: val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) } It's a simple mapping of input records to (itemId, record). I found this http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist and http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html Looks like Kryo (2.21) changed something to stop using default constructors: ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); Please suggest. Trace: 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774) com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: values (org.apache.avro.generic.GenericData$Record) datum (org.apache.avro.mapred.AvroKey) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41) at
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) Regards, Any suggestions. I am not able to get this thing to work over a month now, its kind of getting frustrating. On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com wrote: How big is the data you're returning to the driver with collectAsMap? You are probably running out of memory trying to copy too much data back to it. If you're trying to force a map-side join, Spark can do that for you in some cases within the regular DataFrame/RDD context. See http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning and this talk by Michael Armbrust for example, http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ
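Following Dean's "make it work first" advice, the un-optimized version of the equi-join in this thread reduces to a few lines (the names and accessors follow the thread; everything else is a sketch):

```scala
// Plain equi-join on itemId: no Kryo, no explicit HashPartitioner.
// Let Spark pick the partitioning; optimize later only if profiling demands it.
val viEvents  = details.map  { vi   => (vi.get(14).asInstanceOf[Long], vi) }
val lstgItems = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
val joined    = viEvents.join(lstgItems) // RDD[(Long, (viEvent, listing))]
```

If this baseline works but is slow, the web UI's stage timings will show where the time actually goes, which is far more reliable than guessing at serializer and partitioner settings.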
Re: Spark distributed SQL: JSON Data set on all worker node
Note that each JSON object has to be on a single line in the files. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sun, May 3, 2015 at 4:14 AM, ayan guha guha.a...@gmail.com wrote: Yes, it is possible. You need to use the jsonFile method on the SQLContext to create a DataFrame, then register it as a table. Should be 3 lines of code, thanks to Spark. You may also see a few YouTube videos, esp. on unifying pipelines. On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote: Hi, I am a noob to Spark and related technology. I have JSON stored at the same location on all worker nodes of the Spark cluster. I am looking to load the JSON data set on these nodes and run SQL queries, like distributed SQL. Is this possible to achieve? Right now, the master submits the task to one node only. Thanks and regards Mrityunjay
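Ayan's "3 lines of code" look roughly like this in the Spark 1.3-era API (the path and table name are placeholders; note Dean's caveat that each JSON object must sit on a single line):

```scala
// Load line-delimited JSON into a DataFrame and query it with SQL.
// Assumes /data/events.json exists at the same path on every worker node.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("/data/events.json")  // one JSON object per line
df.registerTempTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").show()
```

Spark infers the schema by sampling the JSON, so the temp table is immediately queryable without declaring columns.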
Re: java.io.IOException: No space left on device
Or multiple volumes. The LOCAL_DIRS (YARN) and SPARK_LOCAL_DIRS (Mesos, Standalone) environment variables and the spark.local.dir property control where temporary data is written. The default is /tmp. See http://spark.apache.org/docs/latest/configuration.html#runtime-environment for more details. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Apr 29, 2015 at 6:19 AM, Anshul Singhle ans...@betaglide.com wrote: Do you have multiple disks? Maybe your work directory is not on the right disk? On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com wrote: Hi, I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf output; the training data is a file containing 156060 records (size 8.1M). The problem is that when trying to persist a partition into memory and there is not enough memory, the partition is persisted on disk, and despite having 229G of free disk space, I got "No space left on device". This is how I'm running the program: ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv testData.tsv And this is a part of the log: If you need more information, please let me know. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
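As a concrete example, pointing temporary data at a bigger volume (or spreading it across several) is a one-property change; the paths below are placeholders:

```scala
// Write shuffle/spill temp data to these directories instead of /tmp.
// Note: in cluster modes, the SPARK_LOCAL_DIRS / LOCAL_DIRS environment
// variables on the workers take precedence over this property.
val conf = new org.apache.spark.SparkConf()
  .setAppName("my-app")
  .set("spark.local.dir", "/mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp")
val sc = new org.apache.spark.SparkContext(conf)
```

Listing multiple comma-separated directories on different physical disks also spreads shuffle I/O, which can help throughput as well as capacity.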
Re: java.io.IOException: No space left on device
Makes sense. / is where /tmp would be. However, 230G should be plenty of space. If you have INFO logging turned on (set in $SPARK_HOME/conf/log4j.properties), you'll see messages about saving data to disk that will list sizes. The web console also has some summary information about this. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Apr 29, 2015 at 6:25 AM, selim namsi selim.na...@gmail.com wrote: This is the output of df -h, so as you can see I'm using only one disk, mounted on /:

Filesystem      Size  Used Avail Use% Mounted on
/dev/sda8       276G   34G  229G  13% /
none            4.0K     0  4.0K   0% /sys/fs/cgroup
udev            7.8G  4.0K  7.8G   1% /dev
tmpfs           1.6G  1.4M  1.6G   1% /run
none            5.0M     0  5.0M   0% /run/lock
none            7.8G   37M  7.8G   1% /run/shm
none            100M   40K  100M   1% /run/user
/dev/sda1       496M   55M  442M  11% /boot/efi

Also, when running the program, I noticed that the Use% of disk space for the partition mounted on / was growing very fast. On Wed, Apr 29, 2015 at 12:19 PM Anshul Singhle ans...@betaglide.com wrote: Do you have multiple disks? Maybe your work directory is not on the right disk? On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com wrote: Hi, I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf output; the training data is a file containing 156060 records (size 8.1M). The problem is that when trying to persist a partition into memory and there is not enough memory, the partition is persisted on disk, and despite having 229G of free disk space, I got "No space left on device". This is how I'm running the program: ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv testData.tsv And this is a part of the log: If you need more information, please let me know.
Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: A problem of using spark streaming to capture network packets
I would use the ps command on each machine while the job is running to confirm that every process involved is running as root. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 28, 2015 at 8:58 PM, Lin Hao Xu xulin...@cn.ibm.com wrote: btw, from spark web ui, the acl is marked with *root* Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets From: Dean Wampler deanwamp...@gmail.com To: Lin Hao Xu/China/IBM@IBMCN Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org Date: 2015/04/29 09:40 Subject: Re: A problem of using spark streaming to capture network packets -- Are the tasks on the slaves also running as root? If not, that might explain the problem. dean Dean Wampler, Ph.D. Author: *Programming Scala, 2nd Edition* http://shop.oreilly.com/product/0636920033073.do (O'Reilly) *Typesafe* http://typesafe.com/ *@deanwampler* http://twitter.com/deanwampler *http://polyglotprogramming.com* http://polyglotprogramming.com/ On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu *xulin...@cn.ibm.com* xulin...@cn.ibm.com wrote: 1. 
The full command line is written in a shell script: LIB=/home/spark/.m2/repository /opt/spark/bin/spark-submit \ --class spark.pcap.run.TestPcapSpark \ --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \ /home/spark/napa/napa.jar 2. And we run this script with *sudo*; if you do not use sudo, then you cannot access the network interface. 3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs() in a standard Java program, and it really worked like a champion. Best regards, Lin Hao XU IBM Research China Email: *xulin...@cn.ibm.com* xulin...@cn.ibm.com My Flickr: *http://www.flickr.com/photos/xulinhao/sets* http://www.flickr.com/photos/xulinhao/sets From: Dean Wampler *deanwamp...@gmail.com* deanwamp...@gmail.com To: Hai Shan Wu/China/IBM@IBMCN Cc: user *user@spark.apache.org* user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN Date: 2015/04/28 20:07 Subject: Re: A problem of using spark streaming to capture network packets -- It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: List<PcapNetworkInterface> nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. 
Author: *Programming Scala, 2nd Edition* http://shop.oreilly.com/product/0636920033073.do (O'Reilly) *Typesafe* http://typesafe.com/ *@deanwampler* http://twitter.com/deanwampler *http://polyglotprogramming.com* http://polyglotprogramming.com/ On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu *wuh...@cn.ibm.com* wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on spark standalone cluster, then the program will tell us that NO NIFs found. I also attach two test files for clarification. So anyone can help on this? Thanks in advance! * (See attached file: PcapReceiver.java)(See attached file: TestPcapSpark.java)* Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: *wuh...@cn.ibm.com* wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: *user-unsubscr...@spark.apache.org
Re: A problem of using spark streaming to capture network packets
Are the tasks on the slaves also running as root? If not, that might explain the problem. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote: 1. The full command line is written in a shell script: LIB=/home/spark/.m2/repository /opt/spark/bin/spark-submit \ --class spark.pcap.run.TestPcapSpark \ --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \ /home/spark/napa/napa.jar 2. And we run this script with *sudo*; if you do not use sudo, then you cannot access the network interface. 3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs() in a standard Java program, and it really worked like a champion. Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets From: Dean Wampler deanwamp...@gmail.com To: Hai Shan Wu/China/IBM@IBMCN Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN Date: 2015/04/28 20:07 Subject: Re: A problem of using spark streaming to capture network packets -- It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? 
For example, does this line work as expected: List<PcapNetworkInterface> nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: *Programming Scala, 2nd Edition* http://shop.oreilly.com/product/0636920033073.do (O'Reilly) *Typesafe* http://typesafe.com/ *@deanwampler* http://twitter.com/deanwampler *http://polyglotprogramming.com* http://polyglotprogramming.com/ On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu *wuh...@cn.ibm.com* wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze the captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on a spark standalone cluster, then the program will tell us that NO NIFs were found. I also attach two test files for clarification. So can anyone help on this? Thanks in advance! * (See attached file: PcapReceiver.java)(See attached file: TestPcapSpark.java)* Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: *wuh...@cn.ibm.com* wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: *user-unsubscr...@spark.apache.org* user-unsubscr...@spark.apache.org For additional commands, e-mail: *user-h...@spark.apache.org* user-h...@spark.apache.org
Re: A problem of using spark streaming to capture network packets
It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected: List<PcapNetworkInterface> nifs = Pcaps.findAllDevs(); dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote: Hi Everyone We use pcap4j to capture network packets and then use spark streaming to analyze the captured packets. However, we met a strange problem. If we run our application on spark locally (for example, spark-submit --master local[2]), then the program runs successfully. If we run our application on a spark standalone cluster, then the program will tell us that NO NIFs were found. I also attach two test files for clarification. So can anyone help on this? Thanks in advance! *(See attached file: PcapReceiver.java)**(See attached file: TestPcapSpark.java)* Best regards, - Haishan Haishan Wu (吴海珊) IBM Research - China Tel: 86-10-58748508 Fax: 86-10-58748330 Email: wuh...@cn.ibm.com Lotus Notes: Hai Shan Wu/China/IBM - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Cluster Setup
It's mostly manual. You could try automating with something like Chef, of course, but there's nothing already available in terms of automation. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Apr 24, 2015 at 10:33 AM, James King jakwebin...@gmail.com wrote: Thanks Dean, Sure I have that setup locally and testing it with ZK. But to start my multiple Masters do I need to go to each host and start there or is there a better way to do this. Regards jk On Fri, Apr 24, 2015 at 5:23 PM, Dean Wampler deanwamp...@gmail.com wrote: The convention for standalone cluster is to use Zookeeper to manage master failover. http://spark.apache.org/docs/latest/spark-standalone.html Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Apr 24, 2015 at 5:01 AM, James King jakwebin...@gmail.com wrote: I'm trying to find out how to setup a resilient Spark cluster. Things I'm thinking about include: - How to start multiple masters on different hosts? - there isn't a conf/masters file from what I can see Thank you.
Re: Spark Cluster Setup
The convention for standalone cluster is to use Zookeeper to manage master failover. http://spark.apache.org/docs/latest/spark-standalone.html Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Apr 24, 2015 at 5:01 AM, James King jakwebin...@gmail.com wrote: I'm trying to find out how to setup a resilient Spark cluster. Things I'm thinking about include: - How to start multiple masters on different hosts? - there isn't a conf/masters file from what I can see Thank you.
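Concretely, per the standalone docs Dean links, each master host gets the same recovery settings in conf/spark-env.sh and is then started with the usual sbin/start-master.sh on its own host; ZooKeeper elects the leader among them. The quorum address below is a placeholder:

```
# conf/spark-env.sh on every master host (answering James's question:
# yes, start a master on each host; ZooKeeper handles leader election).
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"
```

Applications then list all masters in the URL, e.g. spark://host1:7077,host2:7077, so they can fail over to whichever master is currently the leader.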
Re: Instantiating/starting Spark jobs programmatically
I strongly recommend spawning a new process for the Spark jobs. Much cleaner separation. Your driver program won't be clobbered if the Spark job dies, etc. It can even watch for failures and restart. In the Scala standard library, the sys.process package has classes for constructing and interoperating with external processes. Perhaps Java has something similar these days? dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran ste...@hortonworks.com wrote: On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com wrote: - There are System.exit calls built into Spark as of now that could kill your running JVM. We have shadowed some of the most offensive bits within our own application to work around this. You'd likely want to do that or to do your own Spark fork. For example, if the SparkContext can't connect to your cluster master node when it is created, it will System.exit. people can block errant System.exit calls by running under a SecurityManager. Less than ideal (and there's a small performance hit) -but possible
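A minimal sketch of Dean's suggestion using scala.sys.process (the spark-submit path, class, and jar are placeholders):

```scala
import scala.sys.process._

// Launch a Spark job as a separate OS process so a job crash cannot take
// down the driver application, then inspect the exit code to decide on restart.
object SubmitJob {
  def main(args: Array[String]): Unit = {
    val cmd = Seq(
      "/opt/spark/bin/spark-submit",
      "--class", "com.example.MyJob",   // placeholder class and jar
      "/opt/jobs/my-job.jar")

    val exitCode = cmd.!   // blocks until the spawned process exits
    if (exitCode != 0) {
      println(s"Spark job failed with exit code $exitCode; scheduling a restart...")
    }
  }
}
```

On the Java side, java.lang.ProcessBuilder (since Java 5) gives the equivalent control over the spawned process, its environment, and its I/O streams.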
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
I wasn't involved in this decision (I just make the fries), but CompactBuffer is designed for relatively small data sets that at least fit in memory. It's more or less an Array. In principle, returning an iterator could hide the actual data structure that might be needed to hold a much bigger data set, if necessary. HOWEVER, it actually returns a CompactBuffer. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
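In practice, then, callers should depend only on the declared Iterable[V] rather than the concrete CompactBuffer; a small sketch (pairs stands in for any RDD of key/value pairs):

```scala
import org.apache.spark.rdd.RDD

// The signature promises Iterable[V]; CompactBuffer is an implementation
// detail that could change, so program against the interface.
def groupSizes(pairs: RDD[(Int, String)]): RDD[(Int, Int)] = {
  val grouped: RDD[(Int, Iterable[String])] = pairs.groupByKey()
  grouped.mapValues(_.size)
}
```

This keeps user code insulated if a future Spark version swaps in a spillable or lazier collection behind the same Iterable type.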
Re: Spark Scala Version?
Without the rest of your code it's hard to make sense of the errors. Why do you need to use reflection? Make sure you use the same Scala version throughout; 2.10.4 is recommended. That's still the official version for Spark, even though provisional support for 2.11 exists. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 21, 2015 at 12:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: While running my Spark application on 1.3.0 with Scala 2.10.0 I encountered 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage 2.0 (TID 28) java.lang.UnsupportedOperationException: tail of empty list at scala.collection.immutable.Nil$.tail(List.scala:339) at scala.collection.immutable.Nil$.tail(List.scala:334) at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) My app uses quite a lot of reflection as I need to build an Avro schema. I was told on Stack Overflow that this error occurs because reflection is not thread safe in Scala 2.10 and was advised to use Scala 2.11. Hence I moved to Scala 2.11 and moved all Spark Maven dependencies to 2.11 (spark-avro is still 2.10 as 2.11 is not available). I ran into this: 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage 2.0 (TID 28) java.lang.UnsupportedOperationException: tail of empty list at scala.collection.immutable.Nil$.tail(List.scala:339) at scala.collection.immutable.Nil$.tail(List.scala:334) at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172) Code: args.map { option => { val pairs = option.split("=") arguments += pairs(0) -> pairs(1) } } The error is at ->. Does Spark use Scala 2.10 libs at runtime? Any suggestions for a fix? -- Deepak
Re: Spark on Windows
If you're running Hadoop, too, now that Hortonworks supports Spark, you might be able to use their distribution. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 16, 2015 at 2:19 PM, Arun Lists lists.a...@gmail.com wrote: Thanks, Matei! We'll try that and let you know if it works. You are correct in inferring that some of the problems we had were with dependencies. We also had problems with the spark-submit scripts. I will get the details from the engineer who worked on the Windows builds and provide them to you. arun On Thu, Apr 16, 2015 at 10:44 AM, Matei Zaharia matei.zaha...@gmail.com wrote: You could build Spark with Scala 2.11 on Mac / Linux and transfer it over to Windows. AFAIK it should build on Windows too, the only problem is that Maven might take a long time to download dependencies. What errors are you seeing? Matei On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote: We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. However, on Mac/Linux, we are on Scala 2.11.6 (we built Spark from the sources). On Windows, however despite our best efforts we cannot get Spark 1.3.0 as built from sources working for Scala 2.11.6. Spark has too many moving parts and dependencies! When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason that Spark 1.3.0 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.10 bundle. If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share with the group? Thanks, arun
Re: Need some guidance
That appears to work, with a few changes to get the types correct: input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) => agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2) dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote: How about this? input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2).collect() On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com wrote: The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python): scala> val i1 = input.distinct.groupByKey scala> i1.foreach(println) (1,CompactBuffer(beta, alpha, foo)) (3,CompactBuffer(foo)) (2,CompactBuffer(alpha, bar)) scala> val i2 = i1.map(tup => (tup._1, tup._2.size)) scala> i2.foreach(println) (1,3) (3,1) (2,2) The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern match syntax would be: scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) } This should work as long as none of the CompactBuffers are too large, which could happen for extremely large data sets. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote: **Learning the ropes** I'm trying to grasp the concept of using the pipeline in pySpark...
Simplified example: list=[(1,'alpha'),(1,'beta'),(1,'foo'),(1,'alpha'),(2,'alpha'),(2,'alpha'),(2,'bar'),(3,'foo')] Desired outcome: [(1,3),(2,2),(3,1)] Basically, for each key, I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I would do something like: input=sc.parallelize(list) input.groupByKey().collect() Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them. Pseudo-code: input.groupbykey() .keys .foreach(_.values .map(lambda x: (x,1)) .reducebykey(lambda a,b: a+b) .count() ) I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this? Marco
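The distinct-then-count-per-key logic discussed in this thread can be checked without a cluster. The sketch below is plain Python, not PySpark; it models what distinct().combineByKey(...) computes for each key, and the helper name count_distinct_per_key is my own, not a Spark API.

```python
# Plain-Python model of distinct() followed by a per-key count:
# for each key, count the number of *unique* values.
from collections import defaultdict

def count_distinct_per_key(pairs):
    seen = defaultdict(set)
    for key, value in pairs:
        seen[key].add(value)          # the set plays the role of distinct()
    return {key: len(values) for key, values in seen.items()}

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]
print(sorted(count_distinct_per_key(data).items()))  # [(1, 3), (2, 2), (3, 1)]
```

In Spark the same shape distributes: distinct() removes duplicate pairs first, then combineByKey accumulates a count per key without materializing the value groups, which is why it scales better than groupByKey followed by size.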
Re: Need some guidance
The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python): scala> val i1 = input.distinct.groupByKey scala> i1.foreach(println) (1,CompactBuffer(beta, alpha, foo)) (3,CompactBuffer(foo)) (2,CompactBuffer(alpha, bar)) scala> val i2 = i1.map(tup => (tup._1, tup._2.size)) scala> i2.foreach(println) (1,3) (3,1) (2,2) The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern match syntax would be: scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) } This should work as long as none of the CompactBuffers are too large, which could happen for extremely large data sets. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote: **Learning the ropes** I'm trying to grasp the concept of using the pipeline in pySpark... Simplified example: list=[(1,'alpha'),(1,'beta'),(1,'foo'),(1,'alpha'),(2,'alpha'),(2,'alpha'),(2,'bar'),(3,'foo')] Desired outcome: [(1,3),(2,2),(3,1)] Basically, for each key, I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I would do something like: input=sc.parallelize(list) input.groupByKey().collect() Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them.
Pseudo-code: input.groupbykey() .keys .foreach(_.values .map(lambda x: (x,1)) .reducebykey(lambda a,b: a+b) .count() ) I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this? Marco
Re: task not serialize
foreach() runs in parallel across the cluster, like map, flatMap, etc. You'll only run into problems if you call collect(), which brings the entire RDD into memory in the driver program. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 7, 2015 at 3:50 AM, Jeetendra Gangele gangele...@gmail.com wrote: Let's say I follow the approach below and I get an RddPair with a huge size that cannot fit into one machine ... how do I run foreach on this RDD? On 7 April 2015 at 04:25, Jeetendra Gangele gangele...@gmail.com wrote: On 7 April 2015 at 04:03, Dean Wampler deanwamp...@gmail.com wrote: On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot. That means Spark does not support nested RDDs? If I pass the JavaSparkContext that also won't work. I mean passing the SparkContext is not possible since it's not serializable That's right. RDDs don't nest and SparkContexts aren't serializable. I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd and I need to return the potential matches for this record from HBase. So for each field of VendorRecord I have to do the following 1. query HBase to get the list of potential records in an RDD 2. run logistic regression on the RDD returned from step 1 and each element of the passed matchRdd. If I understand you correctly, each VendorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? yes that's correct, each VendorRecord corresponds to 0 to N matches If so, you could use the RDD flatMap method, which takes a function that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help.
I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? Join will give too big a result. The RDD of query results is returning around 1 for each record, and I have 2 million to process, so it will be huge to have this. 2 m * 1 is a big number. For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code, in foreach I am getting a task not serialized exception @SuppressWarnings("serial") public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException { log.info("Company matcher started"); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunction<VendorRecord>() { @Override public void call(VendorRecord t) throws Exception { if (t != null) { try { CompanyMatcherHelper.UpdateMatchedRecord(jsc, t); } catch (Exception e) { log.error("ERROR while running Matcher for company " + t.getCompanyId(), e); } } } }); }
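Dean's diagnosis (the captured log handle sinks serialization) can be demonstrated without Spark. The sketch below is a plain-Python analogue using pickle, which is roughly what PySpark does to ship a task; the names log_handle and is_picklable are mine, not Spark's, and the "fix" mirrors the thread's advice to acquire resources inside the task rather than capture them.

```python
# Why "task not serializable" happens: the function shipped to executors is
# serialized together with everything it captures. An open file handle
# (like a logger's) cannot be pickled.
import os
import pickle

log_handle = open(os.devnull, "w")   # stands in for the log instance's file

def is_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(is_picklable(log_handle))    # False: the handle can't cross the wire
print(is_picklable("plain data"))  # True: simple values serialize fine

# The fix mirrored from the thread: don't capture the resource; create it
# inside the task (or behind a static method) so only picklable data ships.
def task(record):
    with open(os.devnull, "w") as local_log:  # acquired on the "executor"
        local_log.write(record)

task("VendorRecord-42")
```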
Re: FlatMapPair run for longer time
It's hard for us to diagnose your performance problems, because we don't have your environment, and fixing one problem will simply reveal the next one to be fixed. So, I suggest you use the following strategy to figure out what takes the most time and hence what you might try to optimize. Try replacing expressions that might be expensive with stubs. Your calls to HBase, for example. What happens if you replace the call with fake, hard-coded data? Does performance improve dramatically? If so, then optimize how you query HBase. If it makes no significant difference, try something else. Also try looking at the Spark source code to understand what happens under the hood. At this point, your best bet is to develop your intuition about the performance overhead of various constructs in real-world scenarios and how Spark distributes computation. Then you'll find it easier to know what to optimize. You'll want to understand what happens in flatMap, filter, join, groupBy, reduce, etc. Don't forget this guide, too: https://spark.apache.org/docs/latest/tuning.html. The Learning Spark book from O'Reilly is also really helpful. I also recommend that you switch to Java 8 and lambdas, or go all the way to Scala, so all that noisy code shrinks down to simpler expressions. You'll be surprised how helpful that is for comprehending your code and reasoning about it. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 7, 2015 at 12:54 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, I am running the below code and it's running for a very long time, where the input to flatMapToPair is 50K records, and I am calling HBase 50K times with just a range scan query, which should not take much time. Can anybody guide me on what is wrong here?
JavaPairRDD<VendorRecord, Iterable<VendorRecord>> pairvendorData = matchRdd.flatMapToPair( new PairFlatMapFunction<VendorRecord, VendorRecord, VendorRecord>() { @Override public Iterable<Tuple2<VendorRecord, VendorRecord>> call( VendorRecord t) throws Exception { List<Tuple2<VendorRecord, VendorRecord>> pairs = new LinkedList<Tuple2<VendorRecord, VendorRecord>>(); MatcherKeys matchkeys = CompanyMatcherHelper.getBlockinkeys(t); List<VendorRecord> Matchedrecords = ckdao.getMatchingRecordsWithscan(matchkeys); for (int i = 0; i < Matchedrecords.size(); i++) { pairs.add( new Tuple2<VendorRecord, VendorRecord>(t, Matchedrecords.get(i))); } return pairs; } } ).groupByKey(200).persist(StorageLevel.DISK_ONLY_2());
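The shape of that pipeline (expand each record into candidate pairs, then group by the record) can be sketched without Spark or HBase. This is plain Python; fetch_candidates is a hypothetical stand-in for the ckdao.getMatchingRecordsWithscan range-scan call, which is where the thread suggests the real cost lives.

```python
# Plain-Python model of flatMapToPair + groupByKey: each input record
# yields (record, candidate) pairs, which are then grouped per record.
from collections import defaultdict

def fetch_candidates(record):
    # hypothetical stand-in for the HBase range scan per record
    return [record + "-m1", record + "-m2"]

def flat_map_to_pair(records):
    for r in records:
        for match in fetch_candidates(r):
            yield (r, match)

def group_by_key(pairs):
    grouped = defaultdict(list)
    for k, v in pairs:
        grouped[k].append(v)
    return dict(grouped)

result = group_by_key(flat_map_to_pair(["a", "b"]))
print(result)  # {'a': ['a-m1', 'a-m2'], 'b': ['b-m1', 'b-m2']}
```

The stubbing advice above applies directly: swap the real scan for a fake like fetch_candidates here; if the job suddenly runs fast, the per-record HBase calls are the bottleneck, not the Spark plumbing.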
Re: task not serialize
On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot. That means Spark does not support nested RDDs? If I pass the JavaSparkContext that also won't work. I mean passing the SparkContext is not possible since it's not serializable That's right. RDDs don't nest and SparkContexts aren't serializable. I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd and I need to return the potential matches for this record from HBase. So for each field of VendorRecord I have to do the following 1. query HBase to get the list of potential records in an RDD 2. run logistic regression on the RDD returned from step 1 and each element of the passed matchRdd. If I understand you correctly, each VendorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? If so, you could use the RDD flatMap method, which takes a function that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help. I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem.
You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code, in foreach I am getting a task not serialized exception @SuppressWarnings("serial") public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException { log.info("Company matcher started"); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunction<VendorRecord>() { @Override public void call(VendorRecord t) throws Exception { if (t != null) { try { CompanyMatcherHelper.UpdateMatchedRecord(jsc, t); } catch (Exception e) { log.error("ERROR while running Matcher for company " + t.getCompanyId(), e); } } } }); }
Re: task not serialize
The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code, in foreach I am getting a task not serialized exception @SuppressWarnings("serial") public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException { log.info("Company matcher started"); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunction<VendorRecord>() { @Override public void call(VendorRecord t) throws Exception { if (t != null) { try { CompanyMatcherHelper.UpdateMatchedRecord(jsc, t); } catch (Exception e) { log.error("ERROR while running Matcher for company " + t.getCompanyId(), e); } } } }); }
Re: conversion from java collection type to scala JavaRDD<Object>
The runtime attempts to serialize everything required by records, and also any lambdas/closures you use. Small, simple types are less likely to run into this problem. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sun, Apr 5, 2015 at 4:21 PM, Jeetendra Gangele gangele...@gmail.com wrote: You are right, I have a class called VendorRecord which is not serializable; this class also has many nested classes (maybe 30 or more). Do I need to recursively serialize all of them? On 4 April 2015 at 18:14, Dean Wampler deanwamp...@gmail.com wrote: Without the rest of your code, it's hard to know what might be unserializable. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sat, Apr 4, 2015 at 7:56 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi, I have tried with parallelize but I got the below exception java.io.NotSerializableException: pacific.dr.VendorRecord Here is my code List<VendorRecord> vendorRecords = blockingKeys.getMatchingRecordsWithscan(matchKeysOutput); JavaRDD<VendorRecord> lines = sc.parallelize(vendorRecords); On 2 April 2015 at 21:11, Dean Wampler deanwamp...@gmail.com wrote: Use JavaSparkContext.parallelize. http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List) Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, Is there a way to make a JavaRDD<Object> from an existing java collection type List<Object>?
I know this can be done using Scala, but I am looking for how to do this using Java. Regards Jeetendra
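The "do I need to recursively serialize all?" question above has a concrete answer: serialization walks the entire object graph, so one unserializable nested field fails the whole record. A plain-Python sketch with pickle (the class names VendorRecord and Address here are illustrative stand-ins, not the poster's actual classes):

```python
# Serialization is recursive: pickling the outer record pickles every
# nested field, so a single unpicklable member (here, a lock) fails it all.
import pickle
import threading

class Address:
    def __init__(self):
        self.lock = threading.Lock()   # unpicklable nested field

class VendorRecord:
    def __init__(self):
        self.name = "acme"
        self.address = Address()       # drags Address (and its lock) along

try:
    pickle.dumps(VendorRecord())
    ok = True
except Exception:
    ok = False
print(ok)  # False: the nested lock sinks the whole record
```

So you don't hand-serialize each of the 30 nested classes; you make sure each one holds only serializable state (or mark the offending fields transient in the Java case), and the recursive machinery handles the rest.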
Re: conversion from java collection type to scala JavaRDD<Object>
Without the rest of your code, it's hard to know what might be unserializable. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sat, Apr 4, 2015 at 7:56 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi, I have tried with parallelize but I got the below exception java.io.NotSerializableException: pacific.dr.VendorRecord Here is my code List<VendorRecord> vendorRecords = blockingKeys.getMatchingRecordsWithscan(matchKeysOutput); JavaRDD<VendorRecord> lines = sc.parallelize(vendorRecords); On 2 April 2015 at 21:11, Dean Wampler deanwamp...@gmail.com wrote: Use JavaSparkContext.parallelize. http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List) Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, Is there a way to make a JavaRDD<Object> from an existing java collection type List<Object>? I know this can be done using Scala, but I am looking for how to do this using Java. Regards Jeetendra
Re: UNRESOLVED DEPENDENCIES while building Spark 1.3.0
Use the Maven build instead. From the README in the git repo ( https://github.com/apache/spark ): mvn -DskipTests clean package Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sat, Apr 4, 2015 at 4:39 PM, mas mas.ha...@gmail.com wrote: Hi All, I am trying to build Spark 1.3.0 on standalone Ubuntu 14.04. I am using the sbt command, i.e. sbt/sbt assembly, to build it. This command works pretty well with Spark version 1.1; however, it gives the following error with Spark 1.3.0. Any help or suggestions to resolve this would be highly appreciated. [info] Done updating. [info] Updating {file:/home/roott/aamirTest/spark/}network-shuffle... [info] Resolving org.fusesource.jansi#jansi;1.4 ... [warn] :: [warn] :: UNRESOLVED DEPENDENCIES :: [warn] :: [warn] :: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test [warn] :: [warn] [warn] Note: Unresolved dependencies path: [warn] org.apache.spark:spark-network-common_2.10:1.3.0 ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76) [warn] +- org.apache.spark:spark-network-shuffle_2.10:1.3.0 sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'.
It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:278) at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:175) at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:157) at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151) at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151) at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:128) at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56) at sbt.IvySbt$$anon$4.call(Ivy.scala:64) at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93) at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78) at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97) at xsbt.boot.Using$.withResource(Using.scala:10) at xsbt.boot.Using$.apply(Using.scala:9) at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58) at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48) at xsbt.boot.Locks$.apply0(Locks.scala:31) at xsbt.boot.Locks$.apply(Locks.scala:28) at sbt.IvySbt.withDefaultLogger(Ivy.scala:64) at sbt.IvySbt.withIvy(Ivy.scala:123) at sbt.IvySbt.withIvy(Ivy.scala:120) at sbt.IvySbt$Module.withModule(Ivy.scala:151) at sbt.IvyActions$.updateEither(IvyActions.scala:157) at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1318) at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1315) at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1345) at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1343) at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35) at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1348) at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1342) at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45) at sbt.Classpaths$.cachedUpdate(Defaults.scala:1360) at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1300)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275) at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47) at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40) at sbt.std.Transform$$anon$4.work(System.scala:63) at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226) at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226) at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17) at sbt.Execute.work(Execute.scala:235) at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226) at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226) at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159) at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
Re: ArrayBuffer within a DataFrame
A hack workaround is to use flatMap: rdd.flatMap { case (date, array) => for (x <- array) yield (date, x) } For those of you who don't know Scala, the for comprehension iterates through the ArrayBuffer, named array, and yields new tuples with the date and each element. The case expression to the left of the => pattern matches on the input tuples. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 10:45 PM, Denny Lee denny.g@gmail.com wrote: Thanks Michael - that was it! I was drawing a blank on this one for some reason - much appreciated! On Thu, Apr 2, 2015 at 8:27 PM Michael Armbrust mich...@databricks.com wrote: A lateral view explode using HiveQL. I'm hoping to add explode shorthand directly to the df API in 1.4. On Thu, Apr 2, 2015 at 7:10 PM, Denny Lee denny.g@gmail.com wrote: Quick question - the output of a dataframe is in the format of: [2015-04, ArrayBuffer(A, B, C, D)] and I'd like to return it as: 2015-04, A 2015-04, B 2015-04, C 2015-04, D What's the best way to do this? Thanks in advance!
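The explode/flatMap transformation above is easy to sanity-check on plain data. This is a plain-Python sketch of the same idea, not PySpark: each (date, list) row fans out into one (date, element) row per element.

```python
# Explode a (key, collection) row into one row per collection element,
# mirroring the flatMap / lateral-view-explode pattern from the thread.
rows = [("2015-04", ["A", "B", "C", "D"])]

exploded = [(date, x) for date, array in rows for x in array]
print(exploded)
# [('2015-04', 'A'), ('2015-04', 'B'), ('2015-04', 'C'), ('2015-04', 'D')]
```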
Re: Reading a large file (binary) into RDD
This might be overkill for your needs, but the scodec parser combinator library might be useful for creating a parser. https://github.com/scodec/scodec Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 6:53 PM, java8964 java8...@hotmail.com wrote: I think implementing your own InputFormat and using SparkContext.hadoopFile() is the best option for your case. Yong -- From: kvi...@vt.edu Date: Thu, 2 Apr 2015 17:31:30 -0400 Subject: Re: Reading a large file (binary) into RDD To: freeman.jer...@gmail.com CC: user@spark.apache.org The file has a specific structure. I outline it below. The input file is basically a representation of a graph. INT INT(A) LONG (B) A INTs(Degrees) A SHORTINTs (Vertex_Attribute) B INTs B INTs B SHORTINTs B SHORTINTs A - number of vertices B - number of edges (note that the INTs/SHORTINTs associated with this are edge attributes) After reading in the file, I need to create two RDDs (one with vertices and the other with edges) On Thu, Apr 2, 2015 at 4:46 PM, Jeremy Freeman freeman.jer...@gmail.com wrote: Hm, that will indeed be trickier because this method assumes records are the same byte size. Is the file an arbitrary sequence of mixed types, or is there structure, e.g. short, long, short, long, etc.? If you could post a gist with an example of the kind of file and how it should look once read in that would be useful! - jeremyfreeman.net @thefreemanlab On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: Thanks for the reply. Unfortunately, in my case, the binary file is a mix of short and long integers. Is there any other way that could of use here? My current method happens to have a large overhead (much more than actual computation time). Also, I am short of memory at the driver when it has to read the entire file. 
On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com wrote: If it’s a flat binary file and each record is the same length (in bytes), you can use Spark’s binaryRecords method (defined on the SparkContext), which loads records from one or more large flat binary files into an RDD. Here’s an example in python to show how it works: # write data from an array from numpy import random dat = random.randn(100,5) f = open('test.bin', 'wb') f.write(dat) f.close() # load the data back in from numpy import frombuffer nrecords = 5 bytesize = 8 recordsize = nrecords * bytesize data = sc.binaryRecords('test.bin', recordsize) parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float')) # these should be equal parsed.first() dat[0,:] Does that help? - jeremyfreeman.net @thefreemanlab On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: What are some efficient ways to read a large file into RDDs? For example, have several executors read a specific/unique portion of the file and construct RDDs. Is this possible to do in Spark? Currently, I am doing a line-by-line read of the file at the driver and constructing the RDD.
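For the mixed short/long case in this thread, the per-record parsing that a custom InputFormat or parser would do can be sketched with the standard struct module. The record layout below is hypothetical (one 4-byte int, one 8-byte long, one 2-byte short), chosen only to show mixed field widths; the hard distributed part, finding record boundaries across splits, is not addressed here.

```python
# Parse fixed-layout binary records with mixed field widths using struct.
import struct

# hypothetical layout: little-endian int32, int64, int16, no padding
RECORD_FMT = "<iqh"
RECORD_SIZE = struct.calcsize(RECORD_FMT)   # 4 + 8 + 2 = 14 bytes

def parse_records(blob):
    """Yield one tuple per fixed-size record in the byte string."""
    for off in range(0, len(blob), RECORD_SIZE):
        yield struct.unpack_from(RECORD_FMT, blob, off)

# round-trip two records to check the layout
blob = struct.pack(RECORD_FMT, 7, 2**40, 3) + struct.pack(RECORD_FMT, -1, 0, 9)
print(list(parse_records(blob)))  # [(7, 1099511627776, 3), (-1, 0, 9)]
```

Once records are fixed-size like this, sc.binaryRecords(path, RECORD_SIZE) plus a map over struct.unpack is exactly Jeremy's recipe; variable-size sections (the header-driven A/B counts described above) are what push you toward a custom InputFormat.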
Re: Error in SparkSQL/Scala IDE
It failed to find the class org.apache.spark.sql.catalyst.ScalaReflection in the Spark SQL library. Make sure it's in the classpath and the version is correct, too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 8:39 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Hi Everyone, I am getting the following error while registering a table using Scala IDE. Please let me know how to resolve this error. I am using Spark 1.2.1 import sqlContext.createSchemaRDD val empFile = sc.textFile("/tmp/emp.csv", 4) .map(_.split(",")) .map(row => Employee(row(0), row(1), row(2), row(3), row(4))) empFile.registerTempTable("Employees") Thanks Sathish Exception in thread "main" scala.reflect.internal.MissingRequirementError: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath
[/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-library.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-reflect.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-actor.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-swing.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-compiler.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/classes] not found. 
at scala.reflect.internal.MissingRequirementError$.signal( MissingRequirementError.scala:16) at scala.reflect.internal.MissingRequirementError$.notFound( MissingRequirementError.scala:17) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass( Mirrors.scala:48) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass( Mirrors.scala:61) at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass( Mirrors.scala:72) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21) at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply( ScalaReflection.scala:115) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute( TypeTags.scala:231) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:335) at scala.reflect.api.Universe.typeOf(Universe.scala:59) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor( ScalaReflection.scala:115) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor( ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor( ScalaReflection.scala:100) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor( ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor( ScalaReflection.scala:94) at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor( ScalaReflection.scala:33) at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111) at com.svairavelu.examples.QueryCSV$.main(QueryCSV.scala:24) at com.svairavelu.examples.QueryCSV.main(QueryCSV.scala)
Re: How to learn Spark ?
I have a self-study workshop here: https://github.com/deanwampler/spark-workshop dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 8:33 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: You can start with http://spark.apache.org/docs/1.3.0/index.html Also get the Learning Spark book http://amzn.to/1NDFI5x. It's great. Enjoy! Vadim ᐧ On Thu, Apr 2, 2015 at 4:19 AM, Star Guo st...@ceph.me wrote: Hi, all I am new to here. Could you give me some suggestion to learn Spark ? Thanks. Best Regards, Star Guo
Re: workers no route to host
It appears you are using a Cloudera Spark build, 1.3.0-cdh5.4.0-SNAPSHOT, which expects to find the hadoop command: /data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop: command not found If you don't want to use Hadoop, download one of the pre-built Spark releases from spark.apache.org. Even the Hadoop builds there will work okay, as they don't actually attempt to run Hadoop commands. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Mar 31, 2015 at 3:12 AM, ZhuGe t...@outlook.com wrote: Hi, I set up a standalone cluster of 5 machines (tmaster, tslave1,2,3,4) with spark-1.3.0-cdh5.4.0-SNAPSHOT. When I execute sbin/start-all.sh, the master is OK, but I can't see the web UI. Moreover, the worker log looks something like this: Spark assembly has been built with Hive, including Datanucleus jars on classpath /data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop: command not found Spark Command: java -cp :/data/PlatformDep/cdh5/dist/sbin/../conf:/data/PlatformDep/cdh5/dist/lib/spark-assembly-1.3.0-cdh5.4.0-SNAPSHOT-hadoop2.6.0-cdh5.4.0-SNAPSHOT.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-rdbms-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-api-jdo-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-core-3.2.2.jar: -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.128.16:7071 --webui-port 8081 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/31 06:47:22 INFO Worker: Registered signal handlers for [TERM, HUP, INT] 15/03/31 06:47:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/03/31 06:47:23 INFO SecurityManager: Changing view acls to: dcadmin 15/03/31 06:47:23 INFO SecurityManager: Changing modify acls to: dcadmin 15/03/31 06:47:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dcadmin); users with modify permissions: Set(dcadmin) 15/03/31 06:47:23 INFO Slf4jLogger: Slf4jLogger started 15/03/31 06:47:23 INFO Remoting: Starting remoting 15/03/31 06:47:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@tslave2:60815] 15/03/31 06:47:24 INFO Utils: Successfully started service 'sparkWorker' on port 60815. 15/03/31 06:47:24 INFO Worker: Starting Spark worker tslave2:60815 with 2 cores, 3.0 GB RAM 15/03/31 06:47:24 INFO Worker: Running Spark version 1.3.0 15/03/31 06:47:24 INFO Worker: Spark home: /data/PlatformDep/cdh5/dist 15/03/31 06:47:24 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/31 06:47:24 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081 15/03/31 06:47:24 INFO Utils: Successfully started service 'WorkerUI' on port 8081. 15/03/31 06:47:24 INFO WorkerWebUI: Started WorkerWebUI at http://tslave2:8081 15/03/31 06:47:24 INFO Worker: Connecting to master akka.tcp:// sparkMaster@192.168.128.16:7071/user/Master... 
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp:// sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host ] 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp:// sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host ] 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp:// sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host ] 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp:// sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] the worker machines
Re: conversion from java collection type to scala JavaRDDObject
Use JavaSparkContext.parallelize. http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List) Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All Is there a way to make a JavaRDD<Object> from an existing Java collection of type List<Object>? I know this can be done using Scala, but I am looking for how to do this using Java. Regards Jeetendra
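For reference, a minimal sketch in Scala (matching the other examples in this archive); from Java, JavaSparkContext.parallelize accepts a java.util.List directly, as Dean's link shows. The app name and master here are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

object ParallelizeExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup, not from the thread.
    val sc = new SparkContext(new SparkConf().setAppName("parallelize-example").setMaster("local[2]"))

    // From a Scala collection:
    val rdd = sc.parallelize(Seq("a", "b", "c"))

    // From an existing java.util.List, convert to a Scala Seq first.
    // (The Java API's JavaSparkContext.parallelize takes the List as-is.)
    val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b", "c")
    val rdd2 = sc.parallelize(javaList.asScala)

    println(rdd.count() + rdd2.count())
    sc.stop()
  }
}
```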
Re: Spark Streaming Error in block pushing thread
Are you allocating 1 core per input stream plus additional cores for the rest of the processing? Each input stream Reader requires a dedicated core. So, if you have two input streams, you'll need local[3] at least. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 11:45 AM, byoung bill.yo...@threatstack.com wrote: I am running a spark streaming stand-alone cluster, connected to rabbitmq endpoint(s). The application will run for 20-30 minutes before failing with the following error: WARN 2015-04-01 21:00:53,944 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 22 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,944 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 23 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,951 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 20 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,951 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 19 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,952 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 18 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,952 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 17 - Ask timed out on 
[Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,952 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 16 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:54,151 org.apache.spark.streaming.scheduler.ReceiverTracker.logWarning.71: Error reported by receiver for stream 0: Error in block pushing thread - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:166) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:112) at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:182) at org.apache.spark.streaming.receiver.BlockGenerator.org $apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:155) at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:87) Has anyone run into this before? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Error-in-block-pushing-thread-tp22356.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
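As an aside, the core-budget point from Dean's reply can be sketched as follows; the receiver count, master URL, and batch interval are illustrative assumptions, not values from this thread:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingCoreBudget {
  def main(args: Array[String]): Unit = {
    // Each receiver-based input stream occupies one core for its lifetime,
    // so the total cores given to the app must exceed the receiver count.
    // With two receivers, local[3] leaves one core for processing; on a
    // standalone cluster, spark.cores.max plays the same role.
    val conf = new SparkConf()
      .setAppName("streaming-core-budget")
      .setMaster("local[3]")  // 2 receivers + at least 1 processing core

    // Batch interval: tune so each batch finishes within the interval,
    // otherwise work queues up and timeouts like the above can follow.
    val ssc = new StreamingContext(conf, Seconds(10))
  }
}
```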
Re: Spark 1.3.0 DataFrame count() method throwing java.io.EOFException
To clarify one thing, is count() the first action ( http://spark.apache.org/docs/latest/programming-guide.html#actions) you're attempting? As defined in the programming guide, an action forces evaluation of the pipeline of RDDs; only then does reading the data actually occur. So, count() might not be the issue; the problem may be some upstream step that attempted to read the file. As a sanity check, if you just read the text file and call count() without converting the strings, does that work? If so, it might be something about your JavaBean BERecord after all. Can you post its definition? Also, calling take(1) to grab the first element should work even if the RDD is empty. (It will return an empty array in that case rather than throw an exception.) dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 10:16 AM, Ashley Rose ashley.r...@telarix.com wrote: That’s precisely what I was trying to check. It should have 42577 records in it, because that’s how many there were in the text file I read in. // Load a text file and convert each line to a JavaBean. JavaRDD<String> lines = sc.textFile("file.txt"); JavaRDD<BERecord> tbBER = lines.map(s -> convertToBER(s)); // Apply a schema to an RDD of JavaBeans and register it as a table. schemaBERecords = sqlContext.createDataFrame(tbBER, BERecord.class); schemaBERecords.registerTempTable("tbBER"); The BERecord class is a standard JavaBean that implements Serializable, so that shouldn’t be the issue. As you said, count() shouldn’t fail like this even if the table were empty. I was able to print out the schema of the DataFrame just fine with df.printSchema(), and I just wanted to see if the data was populated correctly. 
*From:* Dean Wampler [mailto:deanwamp...@gmail.com] *Sent:* Wednesday, April 01, 2015 6:05 PM *To:* Ashley Rose *Cc:* user@spark.apache.org *Subject:* Re: Spark 1.3.0 DataFrame count() method throwing java.io.EOFException Is it possible tbBER is empty? If so, it shouldn't fail like this, of course. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Apr 1, 2015 at 5:57 PM, ARose ashley.r...@telarix.com wrote: Note: I am running Spark on Windows 7 in standalone mode. In my app, I run the following: DataFrame df = sqlContext.sql(SELECT * FROM tbBER); System.out.println(Count: + df.count()); tbBER is registered as a temp table in my SQLContext. When I try to print the number of rows in the DataFrame, the job fails and I get the following error message: java.io.EOFException at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351
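The sanity check Dean suggests can be sketched like this, in Scala for brevity (the Java API is analogous); the file path and the parseRecord stand-in are illustrative, not taken from the thread:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CountSanityCheck {
  // Hypothetical stand-in for the real convertToBER conversion.
  def parseRecord(line: String): Array[String] = line.split(",")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("count-sanity-check").setMaster("local[2]"))

    // Step 1: count the raw lines. If this fails, the problem is reading
    // the file, not the step that builds the beans.
    val lines = sc.textFile("file.txt")  // illustrative path
    println(s"raw line count: ${lines.count()}")

    // Step 2: reintroduce the conversion. If only this count fails,
    // suspect the record class (e.g., a serialization problem).
    val records = lines.map(parseRecord)
    println(s"record count: ${records.count()}")

    // take(1) on an empty RDD returns an empty array rather than throwing.
    println(records.take(1).length)

    sc.stop()
  }
}
```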
Re: How to learn Spark ?
You're welcome. Two limitations to know about: 1. I haven't updated it to 1.3 2. It uses Scala for all examples (my bias ;), so less useful if you don't want to use Scala. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 12:54 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Dean. This is great. On Thu, Apr 2, 2015 at 9:01 AM, Dean Wampler deanwamp...@gmail.com wrote: I have a self-study workshop here: https://github.com/deanwampler/spark-workshop dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 8:33 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: You can start with http://spark.apache.org/docs/1.3.0/index.html Also get the Learning Spark book http://amzn.to/1NDFI5x. It's great. Enjoy! Vadim On Thu, Apr 2, 2015 at 4:19 AM, Star Guo st...@ceph.me wrote: Hi, all I am new to here. Could you give me some suggestion to learn Spark ? Thanks. Best Regards, Star Guo
Re: Spark Streaming Error in block pushing thread
I misread that you're running in standalone mode, so ignore the local[3] example ;) How many separate readers are listening to rabbitmq topics? This might not be the problem, but I'm just eliminating possibilities. Another possibility is that the in-bound data rate exceeds your ability to process it. What's your streaming batch window size? See also here for ideas: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 10:57 AM, Bill Young bill.yo...@threatstack.com wrote: Sorry for the obvious typo, I have 4 workers with 16 cores total* On Thu, Apr 2, 2015 at 11:56 AM, Bill Young bill.yo...@threatstack.com wrote: Thank you for the response, Dean. There are 2 worker nodes, with 8 cores total, attached to the stream. I have the following settings applied: spark.executor.memory 21475m spark.cores.max 16 spark.driver.memory 5235m On Thu, Apr 2, 2015 at 11:50 AM, Dean Wampler deanwamp...@gmail.com wrote: Are you allocating 1 core per input stream plus additional cores for the rest of the processing? Each input stream Reader requires a dedicated core. So, if you have two input streams, you'll need local[3] at least. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 2, 2015 at 11:45 AM, byoung bill.yo...@threatstack.com wrote: I am running a spark streaming stand-alone cluster, connected to rabbitmq endpoint(s). 
The application will run for 20-30 minutes before failing with the following error: WARN 2015-04-01 21:00:53,944 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 22 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,944 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 23 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,951 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 20 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,951 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 19 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,952 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 18 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,952 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 17 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:53,952 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove RDD 16 - Ask timed out on [Actor[akka.tcp:// sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]] after [3 ms]} WARN 2015-04-01 21:00:54,151 org.apache.spark.streaming.scheduler.ReceiverTracker.logWarning.71: Error reported by receiver for stream 0: Error in block pushing thread - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:166) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127) at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:112) at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:182) at org.apache.spark.streaming.receiver.BlockGenerator.org $apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:155) at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:87) Has
Re: Spark 1.3.0 DataFrame count() method throwing java.io.EOFException
Is it possible tbBER is empty? If so, it shouldn't fail like this, of course. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Apr 1, 2015 at 5:57 PM, ARose ashley.r...@telarix.com wrote: Note: I am running Spark on Windows 7 in standalone mode. In my app, I run the following: DataFrame df = sqlContext.sql("SELECT * FROM tbBER"); System.out.println("Count: " + df.count()); tbBER is registered as a temp table in my SQLContext. When I try to print the number of rows in the DataFrame, the job fails and I get the following error message: java.io.EOFException at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) This only happens when I try to call df.count(). The rest runs fine. Is the count() function not supported in standalone mode? The stack trace makes it appear to be Hadoop functionality... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-DataFrame-count-method-throwing-java-io-EOFException-tp22344.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
For the Spark SQL parts, 1.3 breaks backwards compatibility, because before 1.3, Spark SQL was considered experimental, where API changes were allowed. So, H2O and ADAM versions compatible with 1.2.x might not work with 1.3. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Mar 25, 2015 at 9:39 AM, roni roni.epi...@gmail.com wrote: Even if H2O and ADAM are dependent on 1.2.1, it should be backward compatible, right? So using 1.3 should not break them. And the code is not using the classes from those libs. I tried sbt clean compile .. same error Thanks _R On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath nick.pentre...@gmail.com wrote: What version of Spark do the other dependencies rely on (ADAM and H2O?) - that could be it Or try sbt clean compile — Sent from Mailbox https://www.dropbox.com/mailbox On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote: I have an EC2 cluster created using Spark version 1.2.1, and I have an SBT project. Now I want to upgrade to Spark 1.3 and use the new features. Below are the issues. Sorry for the long post. Appreciate your help. Thanks -Roni Question - Do I have to create a new cluster using Spark 1.3? Here is what I did - In my SBT file I changed to - libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" But then I started getting compilation errors. 
along with:

Here are some of the libraries that were evicted:
[warn] * org.apache.spark:spark-core_2.10:1.2.0 -> 1.3.0
[warn] * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) -> 2.6.0
[warn] Run 'evicted' to see detailed eviction warnings

constructor cannot be instantiated to expected type;
[error] found : (T1, T2)
[error] required: org.apache.spark.sql.catalyst.expressions.Row
[error] val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) => KmerIntesect(word, file1Counts, "xyz")}
[error] ^

Here is my SBT and code --

SBT -
version := "1.0"
scalaVersion := "2.10.4"
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
resolvers += "Maven Repo1" at "https://repo1.maven.org/maven2"
resolvers += "Maven Repo" at "https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/"
/* Dependencies - %% appends Scala version to artifactId */
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
libraryDependencies += "org.bdgenomics.adam" % "adam-core" % "0.16.0"
libraryDependencies += "ai.h2o" % "sparkling-water-core_2.10" % "0.2.10"

CODE --
import org.apache.spark.{SparkConf, SparkContext}
case class KmerIntesect(kmer: String, kCount: Int, fileName: String)
object preDefKmerIntersection {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("preDefKmer-intersect")
    val sc = new SparkContext(sparkConf)
    import sqlContext.createSchemaRDD
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val bedFile = sc.textFile("s3n://a/b/c", 40)
    val hgfasta = sc.textFile("hdfs://a/b/c", 40)
    val hgPair = hgfasta.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
    val filtered = hgPair.filter(kv => kv._2 == 1)
    val bedPair = bedFile.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
    val joinRDD = bedPair.join(filtered)
    val ty = joinRDD.map{ case (word, (file1Counts, file2Counts)) => KmerIntesect(word, file1Counts, "xyz") }
    ty.registerTempTable("KmerIntesect")
    ty.saveAsParquetFile("hdfs://x/y/z/kmerIntersect.parquet")
  }
}
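For what it's worth, under the 1.3 API the `createSchemaRDD` import is gone; a rough sketch of the equivalent using the 1.3 `toDF()` conversion, keeping the same case class but with illustrative paths and a simplified pipeline:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

object PreDefKmerIntersection13 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("preDefKmer-intersect"))
    val sqlContext = new SQLContext(sc)
    // In 1.3, implicit RDD-to-DataFrame conversion replaces createSchemaRDD.
    import sqlContext.implicits._

    // Simplified stand-in for the join pipeline above.
    val pairs = sc.textFile("hdfs://a/b/c")
      .map(_.split(","))
      .map(a => (a(0), a(1).trim.toInt))
    val records = pairs.map { case (word, count) => KmerIntesect(word, count, "xyz") }

    // toDF() is provided by sqlContext.implicits._
    val df = records.toDF()
    df.registerTempTable("KmerIntesect")
    df.saveAsParquetFile("hdfs://x/y/z/kmerIntersect.parquet")
  }
}
```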
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
Weird. Are you running using the SBT console? It should have the spark-core jar on the classpath. Similarly, spark-shell or spark-submit should work, but be sure you're using the same version of Spark when running as when compiling. Also, you might need to add spark-sql to your SBT dependencies, but that shouldn't be this issue. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Mar 25, 2015 at 12:09 PM, roni roni.epi...@gmail.com wrote: Thanks Dean and Nick. So, I removed ADAM and H2O from my SBT as I was not using them. I got the code to compile - only to fail while running with: SparkContext: Created broadcast 1 from textFile at kmerIntersetion.scala:21 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/rdd/RDD$ at preDefKmerIntersection$.main(kmerIntersetion.scala:26) This line is where I do a JOIN operation: val hgPair = hgfasta.map(_.split(",")).map(a => (a(0), a(1).trim().toInt)) val filtered = hgPair.filter(kv => kv._2 == 1) val bedPair = bedFile.map(_.split(",")).map(a => (a(0), a(1).trim().toInt)) val joinRDD = bedPair.join(filtered) Any idea what's going on? I have data on EC2, so I am avoiding creating a new cluster; I am just upgrading and changing the code to use 1.3 and Spark SQL. Thanks Roni On Wed, Mar 25, 2015 at 9:50 AM, Dean Wampler deanwamp...@gmail.com wrote: For the Spark SQL parts, 1.3 breaks backwards compatibility, because before 1.3, Spark SQL was considered experimental where API changes were allowed. So, H2O and ADA compatible with 1.2.X might not work with 1.3. dean Dean Wampler, Ph.D. 
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Mar 25, 2015 at 9:39 AM, roni roni.epi...@gmail.com wrote: Even if H2o and ADA are dependent on 1.2.1 , it should be backword compatible, right? So using 1.3 should not break them. And the code is not using the classes from those libs. I tried sbt clean compile .. same errror Thanks _R On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath nick.pentre...@gmail.com wrote: What version of Spark do the other dependencies rely on (Adam and H2O?) - that could be it Or try sbt clean compile — Sent from Mailbox https://www.dropbox.com/mailbox On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote: I have a EC2 cluster created using spark version 1.2.1. And I have a SBT project . Now I want to upgrade to spark 1.3 and use the new features. Below are issues . Sorry for the long post. Appreciate your help. Thanks -Roni Question - Do I have to create a new cluster using spark 1.3? Here is what I did - In my SBT file I changed to - libraryDependencies += org.apache.spark %% spark-core % 1.3.0 But then I started getting compilation error. 
along with:

Here are some of the libraries that were evicted:
[warn] * org.apache.spark:spark-core_2.10:1.2.0 -> 1.3.0
[warn] * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) -> 2.6.0
[warn] Run 'evicted' to see detailed eviction warnings

constructor cannot be instantiated to expected type;
[error] found : (T1, T2)
[error] required: org.apache.spark.sql.catalyst.expressions.Row
[error] val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) => KmerIntesect(word, file1Counts, "xyz")}
[error] ^

Here is my SBT and code --

SBT:

version := "1.0"
scalaVersion := "2.10.4"
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
resolvers += "Maven Repo1" at "https://repo1.maven.org/maven2"
resolvers += "Maven Repo" at "https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/"
/* Dependencies - %% appends Scala version to artifactId */
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
libraryDependencies += "org.bdgenomics.adam" % "adam-core" % "0.16.0"
libraryDependencies += "ai.h2o" % "sparkling-water-core_2.10" % "0.2.10"

CODE:

import org.apache.spark.{SparkConf, SparkContext}
case class KmerIntesect(kmer: String, kCount: Int, fileName: String)
object preDefKmerIntersection {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("preDefKmer-intersect")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD
    val bedFile = sc.textFile("s3n://a/b/c", 40)
    val hgfasta = sc.textFile("hdfs://a/b/c", 40)
    val hgPair
Re: newbie question - spark with mesos
I think the problem is the use of the loopback address: export SPARK_LOCAL_IP=127.0.0.1. In the stack trace from the slave, you see this:

... Reason: Connection refused: localhost/127.0.0.1:51849
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@localhost:51849/), Path(/user/MapOutputTracker)]

It's trying to connect to an Akka actor on itself, using the loopback address. Try changing SPARK_LOCAL_IP to the publicly routable IP address.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 7:37 PM, Anirudha Jadhav anirudh...@gmail.com wrote:

My bad there, I was using the correct link for docs. The spark shell runs correctly, and the framework is registered fine on Mesos. Is there some setting I am missing? This is my spark-env.sh:

export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=http://100.125.5.93/sparkx.tgz
export SPARK_LOCAL_IP=127.0.0.1

Here is what I see on the slave node:
less 20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/stderr WARNING: Logging before InitGoogleLogging() is written to STDERR I0324 02:30:29.389225 27755 fetcher.cpp:76] Fetching URI ' http://100.125.5.93/sparkx.tgz' I0324 02:30:29.389361 27755 fetcher.cpp:126] Downloading ' http://100.125.5.93/sparkx.tgz' to '/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/sparkx.tgz' I0324 02:30:35.353446 27755 fetcher.cpp:64] Extracted resource '/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/sparkx.tgz' into '/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56' Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/24 02:30:37 INFO MesosExecutorBackend: Registered signal handlers for [TERM, HUP, INT] I0324 02:30:37.071077 27863 exec.cpp:132] Version: 0.21.1 I0324 02:30:37.080971 27885 exec.cpp:206] Executor registered on slave 20150226-160708-78932-5050-8971-S0 15/03/24 02:30:37 INFO MesosExecutorBackend: Registered with Mesos as executor ID 20150226-160708-78932-5050-8971-S0 with 1 cpus 15/03/24 02:30:37 INFO SecurityManager: Changing view acls to: ubuntu 15/03/24 02:30:37 INFO SecurityManager: Changing modify acls to: ubuntu 15/03/24 02:30:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); users with modify permissions: Set(ubuntu) 15/03/24 02:30:37 INFO 
Slf4jLogger: Slf4jLogger started 15/03/24 02:30:37 INFO Remoting: Starting remoting 15/03/24 02:30:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@mesos-si2.dny1.bcpc.bloomberg.com:50542] 15/03/24 02:30:38 INFO Utils: Successfully started service 'sparkExecutor' on port 50542. 15/03/24 02:30:38 INFO AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@localhost:51849/user/MapOutputTracker 15/03/24 02:30:38 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkDriver@localhost:51849]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: localhost/127.0.0.1:51849 akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@localhost:51849/), Path(/user/MapOutputTracker)] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58
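The change Dean suggests amounts to one edit in the spark-env.sh shown above. A sketch, assuming the slave-reachable address of the driver host is 100.125.5.93 (borrowed from the executor URI in this thread; substitute your own routable address):

```shell
# spark-env.sh (sketch) -- same file as above, with SPARK_LOCAL_IP changed
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=http://100.125.5.93/sparkx.tgz
# A routable address instead of 127.0.0.1, so executors can reach the driver:
export SPARK_LOCAL_IP=100.125.5.93
```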
Re: Date and decimal datatype not working
Recall that the input isn't actually read until you do something that forces evaluation, like calling saveAsTextFile. You didn't show the whole stack trace here, but it probably occurred while parsing an input line where one of your long fields is actually an empty string. Because this is such a common problem, I usually define a parse method that converts input text to the desired schema. It catches parse exceptions like this and at least reports the bad line. If you can return a default Long in this case, say 0, the method can still return something useful.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 11:48 AM, BASAK, ANANDA ab9...@att.com wrote:

Thanks. This library is only available with Spark 1.3. I am using version 1.2.1. Before I upgrade to 1.3, I want to try what can be done in 1.2.1. So I am using the following:

val MyDataset = sqlContext.sql("my select query")
MyDataset.map(t => t(0)+"|"+t(1)+"|"+t(2)+"|"+t(3)+"|"+t(4)+"|"+t(5)).saveAsTextFile("/my_destination_path")

But it is giving the following error:

15/03/24 17:05:51 ERROR Executor: Exception in task 1.0 in stage 13.0 (TID 106)
java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:453)
at java.lang.Long.parseLong(Long.java:483)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)

Is there something wrong with the TSTAMP field, which is Long datatype?

Thanks & Regards
---
Ananda Basak
Ph: 425-213-7092

*From:* Yin Huai [mailto:yh...@databricks.com]
*Sent:* Monday, March 23, 2015 8:55 PM
*To:* BASAK, ANANDA
*Cc:* user@spark.apache.org
*Subject:* Re: Date and decimal datatype not working

To store to a csv file, you can use the Spark-CSV https://github.com/databricks/spark-csv library.
On Mon, Mar 23, 2015 at 5:35 PM, BASAK, ANANDA ab9...@att.com wrote:

Thanks. This worked well as per your suggestions. I had to run the following:

val TABLE_A = sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p => ROW_A(p(0).trim.toLong, p(1), p(2).trim.toInt, p(3), BigDecimal(p(4)), BigDecimal(p(5)), BigDecimal(p(6))))

Now I am stuck at another step. I have run a SQL query, where I am selecting all the fields with some where clause, TSTAMP filtered with a date range and an order by TSTAMP clause. That is running fine. Then I am trying to store the output in a CSV file. I am using the saveAsTextFile("filename") function. But it is giving an error. Can you please help me write the proper syntax to store output in a CSV file?

Thanks & Regards
---
Ananda Basak
Ph: 425-213-7092

*From:* BASAK, ANANDA
*Sent:* Tuesday, March 17, 2015 3:08 PM
*To:* Yin Huai
*Cc:* user@spark.apache.org
*Subject:* RE: Date and decimal datatype not working

Ok, thanks for the suggestions. Let me try and will confirm all.
Regards
Ananda

*From:* Yin Huai [mailto:yh...@databricks.com]
*Sent:* Tuesday, March 17, 2015 3:04 PM
*To:* BASAK, ANANDA
*Cc:* user@spark.apache.org
*Subject:* Re: Date and decimal datatype not working

p(0) is a String. So, you need to explicitly convert it to a Long, e.g. p(0).trim.toLong. You also need to do it for p(2). For the BigDecimal values, you need to create BigDecimal objects from your String values.

On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA ab9...@att.com wrote:

Hi All, I am very new in the Spark world. Just started some test coding last week. I am using spark-1.2.1-bin-hadoop2.4 and Scala coding. I am having issues while using Date and decimal data types. Following is my code that I am simply running on the scala prompt. I am trying to define a table and point it to my flat file containing raw data (pipe delimited format). Once that is done, I will run some SQL queries and put the output data into another flat file with pipe delimited format.
***
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

// Define row and table
case class ROW_A(
  TSTAMP: Long,
  USIDAN: String,
  SECNT: Int,
  SECT: String,
  BLOCK_NUM: BigDecimal,
  BLOCK_DEN: BigDecimal,
  BLOCK_PCT: BigDecimal)

val TABLE_A = sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p => ROW_A(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))

TABLE_A.registerTempTable("TABLE_A")
***

The second last command is giving an error like the following:

console:17: error: type mismatch;
found : String
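The parse method Dean recommends in his reply can be sketched without any Spark dependency, so the logic is testable locally; the object and method names, and the default of 0L, are my own assumptions. The same idea extends to the pipe-delimited output step, where mkString("|") replaces manual `+"|"+` chains:

```scala
object RowParsing {
  // Sketch: convert one field to Long, tolerating blanks/garbage by
  // reporting the bad line and substituting a default (0L is an assumption;
  // pick a sentinel that makes sense for your data).
  def toLongOrDefault(field: String, line: String, default: Long = 0L): Long =
    try field.trim.toLong
    catch {
      case _: NumberFormatException =>
        Console.err.println(s"Could not parse '$field' as Long in line: $line")
        default
    }

  // Sketch: build a pipe-delimited output line from a row's fields.
  def toPipeDelimited(fields: Seq[Any]): String = fields.mkString("|")
}
```

With Spark, these would be applied inside a map over the input lines, e.g. something like `lines.map(l => toLongOrDefault(l.split('|')(0), l))` (splitting on the character `'|'` avoids treating the pipe as a regex).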
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
Yes, that's the problem. The RDD class exists in both binary jar files, but the signatures probably don't match. The bottom line, as always for tools like this, is that you can't mix versions.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 3:13 PM, roni roni.epi...@gmail.com wrote:

My cluster is still on Spark 1.2 and in SBT I am using 1.3. So probably it is compiling with 1.3 but running with 1.2?
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
You could stop the running processes and start the same processes using the new version, starting with the master and then the slaves. You would have to snoop around a bit to get the command-line arguments right, but it's doable. Use `ps -efw` to find the command lines used, and be sure to rerun them as the same user. Or look at what the EC2 scripts do.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 4:54 PM, roni roni.epi...@gmail.com wrote:

Is there any way that I can install the new one and remove the previous version? I installed Spark 1.3 on my EC2 master and set the Spark home to the new one. But when I start the spark-shell I get:

java.lang.UnsatisfiedLinkError: org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
at org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native Method)

Is there no way to upgrade without creating a new cluster?
Thanks
Roni
Re: How to deploy binary dependencies to workers?
Both spark-submit and spark-shell have a --jars option for passing additional jars to the cluster. They will be added to the appropriate classpaths.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 4:13 AM, Xi Shen davidshe...@gmail.com wrote:

Hi, I am doing ML using Spark MLlib. However, I do not have full control of the cluster; I am using Microsoft Azure HDInsight. I want to deploy the BLAS or whatever required dependencies to accelerate the computation. But I don't know how to deploy those DLLs when I submit my JAR to the cluster. I know how to pack those DLLs into a jar. The real challenge is how to let the system find them...

Thanks,
David
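A sketch of what the invocation might look like; the class name, jar names, and paths here are placeholders, and --jars takes a comma-separated list with no spaces:

```shell
# Sketch: ship extra dependency jars to the executors' classpaths.
spark-submit \
  --class com.example.MyJob \
  --jars /path/to/dep1.jar,/path/to/dep2.jar \
  my-job-assembly.jar
```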
Re: Optimal solution for getting the header from CSV with Spark
Good point. There's no guarantee that you'll get the actual first partition. That's one reason why I wouldn't allow a CSV header line in a real data file, if I could avoid it. Back to Spark: a safer approach is RDD.foreachPartition, which takes a function expecting an iterator. You only need to grab the first element (being careful that the partition isn't empty!) and then determine which of those first lines has the header info.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 11:12 AM, Sean Owen so...@cloudera.com wrote:

I think this works in practice, but I don't know that the first block of the file is guaranteed to be in the first partition. Certainly later down the pipeline that won't be true, but presumably this is happening right after reading the file. I've always just written some filter that would only match the header, which assumes that this is possible to distinguish, but it usually is.

On Tue, Mar 24, 2015 at 2:41 PM, Dean Wampler deanwamp...@gmail.com wrote:

Instead of data.zipWithIndex().filter(_._2==0), which will cause Spark to read the whole file, use data.take(1), which is simpler. From the RDD.take documentation: it works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit. In this case, it will trivially stop at the first.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 7:12 AM, Spico Florin spicoflo...@gmail.com wrote:

Hello! I would like to know what is the optimal solution for getting the header from a CSV file with Spark?
My approach was:

def getHeader(data: RDD[String]): String = {
  data.zipWithIndex().filter(_._2 == 0).map(x => x._1).take(1).mkString()
}

Thanks.
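Dean's foreachPartition idea boils down to per-partition logic that can be written and unit-tested without a cluster. A sketch (the helper names and the prefix-based header test are my own assumptions; with Spark you might run the first function via rdd.mapPartitions and collect the candidate lines):

```scala
object HeaderUtil {
  // Sketch: yield at most the first line of one partition's iterator,
  // guarding against empty partitions.
  def firstLineOf(partition: Iterator[String]): Iterator[String] =
    if (partition.hasNext) Iterator(partition.next()) else Iterator.empty

  // Sketch: decide which candidate first-line is the real header,
  // assuming the header starts with a known column name.
  def pickHeader(candidates: Seq[String], knownFirstColumn: String): Option[String] =
    candidates.find(_.startsWith(knownFirstColumn))
}
```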
Re: Optimal solution for getting the header from CSV with Spark
Instead of data.zipWithIndex().filter(_._2==0), which will cause Spark to read the whole file, use data.take(1), which is simpler. From the RDD.take documentation: it works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit. In this case, it will trivially stop at the first.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 7:12 AM, Spico Florin spicoflo...@gmail.com wrote:

Hello! I would like to know what is the optimal solution for getting the header from a CSV file with Spark? My approach was:

def getHeader(data: RDD[String]): String = {
  data.zipWithIndex().filter(_._2 == 0).map(x => x._1).take(1).mkString()
}

Thanks.
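Dean's suggestion, applied to the getHeader method above, might look like this sketch (shown for reference only, since it needs a live RDD to run; headOption guards against an empty input):

```scala
import org.apache.spark.rdd.RDD

// Sketch: take(1) scans only as many partitions as needed (here, one),
// so the whole file is not read just to find the header line.
def getHeader(data: RDD[String]): String =
  data.take(1).headOption.getOrElse("")
```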
Re: Saving Dstream into a single file
You can use the coalesce method to reduce the number of partitions. You can reduce to one if the data is not too big. Then write the output.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 16, 2015 at 2:42 PM, Zhan Zhang zzh...@hortonworks.com wrote:

Each RDD has multiple partitions, and each of them will produce one HDFS file when saving output. I don't think you are allowed to have multiple file handles writing to the same HDFS file. You can still load multiple files into Hive tables, right?

Thanks.
Zhan Zhang

On Mar 15, 2015, at 7:31 AM, tarek_abouzeid tarek.abouzei...@yahoo.com wrote:

I am doing the word count example on a Flume stream and trying to save the output as text files in HDFS, but in the save directory I got multiple subdirectories, each having files with small sizes. I wonder if there is a way to append to one large file instead of saving in multiple files, as I intend to save the output in a Hive HDFS directory so I can query the result using Hive. Hope anyone has a workaround for this issue. Thanks in advance.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
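For the word-count DStream case in this thread, Dean's advice could look like the following sketch; the stream name `counts` and the output path are assumptions, and coalescing to 1 is only safe when each batch fits comfortably in a single task:

```scala
// Sketch (assumes a DStream[(String, Int)] named `counts` from the word count):
// collapse each batch to one partition so each batch writes one part file.
counts.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty())
    rdd.map { case (word, n) => s"$word\t$n" }
      .coalesce(1)
      .saveAsTextFile(s"hdfs:///user/hive/warehouse/wordcount/batch-${time.milliseconds}")
}
```

Note this still produces one directory per batch; appending every batch to a single HDFS file is not something saveAsTextFile supports, which matches Zhan's point above.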
Re: registerTempTable is not a member of RDD on spark 1.2?
In 1.2 it's a member of SchemaRDD, and it becomes available on RDD (through an implicit conversion) when you add a SQLContext, like so:

val sqlContext = new SQLContext(sc)
import sqlContext._

In 1.3, the method has moved to the new DataFrame type.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:

Hi, I am running Spark; when I use sc.version I get 1.2, but when I call registerTempTable("MyTable") I get an error saying registerTempTable is not a member of RDD. Why?
--
Eran | CTO
Re: newbie question - spark with mesos
That's a very old page; try this instead: http://spark.apache.org/docs/latest/running-on-mesos.html

When you run your Spark job on Mesos, tasks will be started on the slave nodes as needed, since fine-grained mode is the default. For a job like your example, very few tasks will be needed. Actually only one would be enough, but the default number of partitions will be used. I believe 8 is the default for Mesos. For local mode (local[*]), it's the number of cores. You can also set the property spark.default.parallelism.

HTH,
Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 11:46 AM, Anirudha Jadhav aniru...@nyu.edu wrote:

I have a Mesos cluster, which I deploy Spark to by using the instructions at http://spark.apache.org/docs/0.7.2/running-on-mesos.html. After that the spark shell starts up fine. Then I try the following on the shell:

val data = 1 to 1
val distData = sc.parallelize(data)
distData.filter(_ < 10).collect()

I open the Spark web UI at host:4040 and see an active job. NOW, how do I start workers or Spark workers on Mesos? Who completes my job?

thanks,
--
Ani
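For reference, the property Dean mentions can also be set at submit time; a sketch with placeholder values (the master host and the value 8 are just examples):

```shell
# Sketch: override the default partition count used when none is specified.
spark-submit \
  --master mesos://master-host:5050 \
  --conf spark.default.parallelism=8 \
  my-app.jar
```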
Re: Getting around Serializability issues for types not in my control
Well, it's complaining about the trait OptionInstances, which is defined in Option.scala in the std package. Use scalap or javap on the scalaz library to find out which member of the trait is the problem, but since it says $$anon$1, I suspect it's the first value member, implicit val optionInstance, which has a long list of mixin traits, one of which is probably at fault. OptionInstances is huge, so there might be other offenders.

Scalaz wasn't designed for distributed systems like this, so you'll probably find many examples of non-serializability. An alternative is to avoid using Scalaz in any closures passed to Spark methods, but that's probably not what you want.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote:

Hey all, I'd like to use the Scalaz library in some of my Spark jobs, but am running into issues where some of the things I use from Scalaz are not serializable.
For instance, in Scalaz there is a trait:

/** In Scalaz */
trait Applicative[F[_]] {
  def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
  def point[A](a: => A): F[A]
}

But when I try to use it in, say, an `RDD#aggregate` call I get:

Caused by: java.io.NotSerializableException: scalaz.std.OptionInstances$$anon$1
Serialization stack:
- object not serializable (class: scalaz.std.OptionInstances$$anon$1, value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
- field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type: interface scalaz.Applicative)
- object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
- field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, name: apConcat$1, type: interface scala.Function2)
- object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, function2)

Outside of submitting a PR to Scalaz to make things Serializable, what can I do to make things Serializable? I considered something like:

implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]): SomeSerializableType[F] =
  new SomeSerializableType { ... }

?? Not sure how to go about doing it - I looked at java.io.Externalizable, but given `scalaz.Applicative` has no value members I'm not sure how to implement the interface. Any guidance would be much appreciated - thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
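One common workaround, beyond what the thread suggests: ship a serializable recipe for the instance instead of the instance itself, rebuilding it lazily on the worker after deserialization. A self-contained sketch using a stand-in trait (all names here are my own; the same shape would apply to scalaz.Applicative, provided the factory closure captures nothing non-serializable):

```scala
import java.io._

// Stand-in for a non-serializable type class instance (e.g. an Applicative).
trait Combiner[A] { def combine(x: A, y: A): A }

// Sketch: only the factory function travels over the wire; the instance is
// marked @transient and rebuilt lazily on first use after deserialization.
class SerializableCombiner[A](factory: () => Combiner[A]) extends Serializable {
  @transient private lazy val underlying: Combiner[A] = factory()
  def combine(x: A, y: A): A = underlying.combine(x, y)
}
```

This works because Scala function literals are serializable as long as they close over nothing non-serializable, so the factory can be shipped to executors even when the instance it builds cannot.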