Re: Using functional programming rather than SQL

2016-02-22 Thread Dean Wampler
Kevin gave you the answer you need, but I'd like to comment on your subject
line. SQL is a limited form of FP. Sure, there are no anonymous functions
and other limitations, but it's declarative, like good FP programs should
be, and it offers an important subset of the operators ("combinators") you
want.

Also, on a practical note, use the DataFrame API whenever you can, rather
than dropping down to the RDD API, because the DataFrame API is far more
performant. It's a classic case where restricting your options enables more
aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark
Summit East makes this point nicely:
http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming
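
As a rough illustration (untested, and assuming the table and column names
from the message below), the same aggregation could be written directly
against the DataFrame API instead of SQL:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
import org.apache.spark.sql.functions._

hiveContext.sql("use oraclehadoop")
val sales    = hiveContext.table("smallsales")
val times    = hiveContext.table("times")
val channels = hiveContext.table("channels")

// The joins, grouping, and aggregation are composable combinators here,
// but Catalyst optimizes them much like the SQL version.
val rs = sales
  .join(times, sales("time_id") === times("time_id"))
  .join(channels, sales("channel_id") === channels("channel_id"))
  .groupBy(times("calendar_month_desc"), channels("channel_desc"))
  .agg(sum(sales("amount_sold")).as("TotalSales"))

rs.orderBy($"calendar_month_desc", $"channel_desc").show()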

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott <kevin.r.mell...@gmail.com>
wrote:

> In your example, the *rs* instance should be a DataFrame object. In other
> words, the result of *HiveContext.sql* is a DataFrame that you can
> manipulate using *filter, map, *etc.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>
>
> On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Hi,
>>
>> I have data stored in Hive tables on which I want to do some simple manipulation.
>>
>> Currently in Spark I do the following: I get the result set using SQL against
>> the Hive tables and register it as a temporary table in Spark.
>>
>> Now, ideally, I would like to get the result set into a DataFrame and work on the
>> DF to slice and dice the data using functional programming with filter, map, split, etc.
>>
>> I wanted to get some ideas on how to go about it.
>>
>> thanks
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> HiveContext.sql("use oraclehadoop")
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
>> SUM(s.amount_sold) AS TotalSales
>> FROM smallsales s, times t, channels c
>> WHERE s.time_id = t.time_id
>> AND   s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> *rs.registerTempTable("tmp")*
>>
>>
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL
>> """).collect.foreach(println)
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC
>> """).collect.foreach(println)
>>
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>
>


Re: [Spark Streaming] Spark Streaming dropping last lines

2016-02-10 Thread Dean Wampler
Here's a wild guess; it might be the fact that your first command uses tail
-f, so it doesn't close the input file handle when it hits the end of the
available bytes, while your second use of nc does close it. If so, the last
few lines might be stuck in a buffer waiting to be forwarded, and Spark would
never see those bytes.

You could test this by using nc or another program on the other end of the
socket and see if it receives all the bytes.
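
If it helps, here's a minimal, untested Scala sketch of such a test client: it
just connects to the port nc is listening on and counts/prints what it
receives (host and port are placeholders passed as arguments):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

object SocketLineCounter {
  def main(args: Array[String]): Unit = {
    val socket = new Socket(args(0), args(1).toInt)  // e.g. "localhost" and the nc port
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    var count = 0
    var line = reader.readLine()
    while (line != null) {
      count += 1
      println(s"$count: $line")   // echo every line so the tail of the file can be checked
      line = reader.readLine()
    }
    println(s"Total lines received: $count")
    socket.close()
  }
}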

What happens if you add the -q 10 option to your nc command in the first
case? That is, force it to close when no more bytes are seen for 10 seconds?

HTH,
dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Feb 10, 2016 at 3:51 PM, Nipun Arora <nipunarora2...@gmail.com>
wrote:

> Hi All,
>
> I apologize for reposting, but I wonder if anyone can explain this behavior,
> and what would be the best way to resolve it without introducing
> something like Kafka in between.
> I basically have a Logstash instance, and would like to stream the output of
> Logstash to Spark Streaming without introducing a new message-passing
> service like Kafka/Redis in between.
>
> We will eventually probably use kafka, but for now I need guaranteed
> delivery.
>
> For the tail -f  |nc -lk  command, I wait for a significant
> time after Spark stops receiving any data in its micro-batches. I confirm
> that it's not getting any data (i.e., the end of the file has probably been
> reached) by printing the first two lines of every micro-batch.
>
> Thanks
> Nipun
>
>
>
> On Mon, Feb 8, 2016 at 10:05 PM Nipun Arora <nipunarora2...@gmail.com>
> wrote:
>
>> I have a spark-streaming service, where I am processing and detecting
>> anomalies on the basis of some offline generated model. I feed data into
>> this service from a log file, which is streamed using the following command
>>
>> tail -f | nc -lk 
>>
>> Here the spark streaming service is taking data from port . Once
>> spark has finished processing, and is showing that it is processing empty
>> micro-batches, I kill both spark, and the netcat process above. However, I
>> observe that the last few lines are being dropped in some cases, i.e. spark
>> streaming does not receive those log lines or they are not processed.
>>
>> However, I also observed that if I simply take the logfile as standard
>> input instead of tailing it, the connection is closed at the end of the
>> file, and no lines are dropped:
>>
>> nc -q 10 -lk  < logfile
>>
>> Can anyone explain why this behavior is happening? And what could be a
>> better resolution to the problem of streaming log data to spark streaming
>> instance?
>>
>>
>> Thanks
>>
>> Nipun
>>
>


Re: Trying to understand dynamic resource allocation

2016-01-11 Thread Dean Wampler
It works on Mesos, too. I'm not sure about Standalone mode.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Jan 11, 2016 at 10:01 AM, Nick Peterson <nrpeter...@gmail.com>
wrote:

> My understanding is that dynamic allocation is only enabled for
> Spark-on-Yarn. Those settings likely have no impact in standalone mode.
>
> Nick
>
> On Mon, Jan 11, 2016, 5:10 AM Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am exploring a bit the dynamic resource allocation provided by the
>> Standalone Cluster Mode and I was wondering whether this behavior I am
>> experiencing is expected.
>> In my configuration I have 3 slaves with 24 cores each.
>> I have in my spark-defaults.conf:
>>
>> spark.shuffle.service.enabled true
>> spark.dynamicAllocation.enabled true
>> spark.dynamicAllocation.minExecutors 1
>> spark.dynamicAllocation.maxExecutors 6
>> spark.executor.cores 4
>>
>> When I submit a first Job it takes up all of the 72 cores of the cluster.
>> When I submit the second Job while the first one is running I get the
>> error:
>>
>> Initial job has not accepted any resources; check your cluster UI to
>> ensure that workers are registered and have sufficient resources
>>
>> Is this the expected behavior?
>>
>> Thanks a lot
>>
>>
>>


Re: Spark Context not getting initialized in local mode

2016-01-08 Thread Dean Wampler
ClassNotFoundException usually means one of a few problems:

1. Your app assembly is missing the jar files with those classes.
2. You mixed jar files from incompatible versions in your assembly.
3. You built with one version of Spark and deployed to another.
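
A common way to avoid cases 2 and 3 is to mark Spark itself as "provided" in
your build, so the assembly only bundles your own dependencies and always runs
against the cluster's Spark jars. A minimal sbt sketch (version numbers are
placeholders; match them to your cluster):

// build.sbt, assuming the sbt-assembly plugin
name := "my-spark-app"
scalaVersion := "2.10.5"   // must match the Scala version your Spark build uses
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",  // excluded from the assembly
  "org.apache.spark" %% "spark-sql"  % "1.6.0" % "provided"
)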


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Fri, Jan 8, 2016 at 1:24 AM, Rahul Kumar <rahul.kuma...@snapdeal.com>
wrote:

>
>
>
>
> Hi all,
> I am trying to start Solr with a custom plugin which uses the Spark library. I
> am trying to initialize a SparkContext in local mode. I have made a fat jar
> for this plugin using the Maven Shade plugin and put it in the lib directory. *While
> starting Solr it is not able to initialize the SparkContext.* It throws a
> ClassNotFoundException for AkkaRpcEnvFactory. Can anyone please help?
>
> *It gives the following error:*
>
> 3870 [coreLoadExecutor-4-thread-1] ERROR org.apache.spark.SparkContext  – 
> Error initializing SparkContext.
> java.lang.ClassNotFoundException:org.apache.spark.rpc.akka.AkkaRpcEnvFactory
>
> *Here is the detailed error*
>
> java -jar start.jar0[main] INFO  org.eclipse.jetty.server.Server  – 
> jetty-8.1.10.v2013031227   [main] INFO  
> org.eclipse.jetty.deploy.providers.ScanningAppProvider  – Deployment monitor 
> /home/rahul/solr-4.7.2/example/contexts at interval 040   [main] INFO  
> org.eclipse.jetty.deploy.DeploymentManager  – Deployable added: 
> /home/rahul/solr-4.7.2/example/contexts/solr-jetty-context.xml1095 [main] 
> INFO  org.eclipse.jetty.webapp.StandardDescriptorProcessor  – NO JSP Support 
> for /solr, did not find org.apache.jasper.servlet.JspServlet1155 [main] INFO  
> org.apache.solr.servlet.SolrDispatchFilter  – SolrDispatchFilter.init()1189 
> [main] INFO  org.apache.solr.core.SolrResourceLoader  – JNDI not configured 
> for solr (NoInitialContextEx)1190 [main] INFO  
> org.apache.solr.core.SolrResourceLoader  – solr home defaulted to 'solr/' 
> (could not find system property or JNDI)1190 [main] INFO  
> org.apache.solr.core.SolrResourceLoader  – new SolrResourceLoader for 
> directory: 'solr/'1280 [main] INFO  org.apache.solr.core.ConfigSolr  – 
> Loading container configuration from 
> /home/rahul/solr-4.7.2/example/solr/solr.xml1458 [main] INFO  
> org.apache.solr.core.CoresLocator  – Config-defined core root directory: 
> /home/rahul/solr-4.7.2/example/solr1465 [main] INFO  
> org.apache.solr.core.CoreContainer  – New CoreContainer 
> 602710225...
> 3870 [coreLoadExecutor-4-thread-1] ERROR org.apache.spark.SparkContext  – 
> Error initializing SparkContext.
> java.lang.ClassNotFoundException: org.apache.spark.rpc.akka.AkkaRpcEnvFactory
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at 
> org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:430)
> at 
> org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:383)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:274)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
> at org.apache.spark.rpc.RpcEnv$.getRpcEnvFactory(RpcEnv.scala:42)
> at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:252)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
> at org.apache.spark.SparkContext.(SparkContext.scala:441)
> at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
> at 
> com.snapdeal.search.spark.SparkLoadModel.loadModel(SparkLoadModel.java:11)
> at 
> com.snapdeal.search.valuesource.parser.RankingModelValueSourceParser.init(RankingModelValueSourceParser.java:29)
> at org.apache.solr.core.SolrCore.createInitInstance(SolrCore.java:591)
> at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2191)
> at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2185)
> at org.apache.solr.core.SolrCore.initPlugins(SolrCore.java:2218)
> at 
> org.apache.solr.core.SolrCore.initValueSourceParsers(SolrCore.java:2130)
> at org.apache.solr.core.SolrCore.(SolrCore.java:765)
> at org.apache.solr.core.SolrCore.(So

Re: Networking problems in Spark 1.6.0

2016-01-05 Thread Dean Wampler
Still, it would be good to know what happened exactly. Why did the netty
dependency expect Java 8? Did you build your app on a machine with Java 8
and deploy on a Java 7 machine?

Anyway, I played with the 1.6.0 spark-shell using Java 7 and it worked
fine. I also looked at the distribution's class files using e.g.,

$ cd $HOME/spark/spark-1.6.0-bin-hadoop2.6
$ jar xf lib/spark-assembly-1.6.0-hadoop2.6.0.jar org/apache/spark/rpc/netty/Dispatcher.class
$ javap -classpath . -verbose org.apache.spark.rpc.netty.Dispatcher | grep version
  minor version: 0
  major version: 50

So it was compiled with Java 6 (see
https://en.wikipedia.org/wiki/Java_class_file), which means it doesn't appear
to be a Spark build issue.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Jan 5, 2016 at 9:01 AM, Yiannis Gkoufas <johngou...@gmail.com>
wrote:

> Hi Dean,
>
> thanks so much for the response! It works without a problem now!
>
> On 5 January 2016 at 14:33, Dean Wampler <deanwamp...@gmail.com> wrote:
>
>> ConcurrentHashMap.keySet() returning a KeySetView is a Java 8 method. The
>> Java 7 method returns a Set. Are you running Java 7? What happens if you
>> run Java 8?
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Tue, Jan 5, 2016 at 8:29 AM, Yiannis Gkoufas <johngou...@gmail.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> I have been using Spark 1.5.2 on my cluster without a problem and wanted
>>> to try Spark 1.6.0.
>>> I have the exact same configuration on both clusters.
>>> I am able to start the Standalone Cluster but I fail to submit a job
>>> getting errors like the following:
>>>
>>> 16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master
>>> spark://my-ip:7077...
>>> 16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master
>>> spark://my-ip:7077...
>>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
>>> spark://my-ip:7077...
>>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
>>> spark://my-ip:7077...
>>> 16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection
>>> from my-ip/X.XXX.XX.XX:7077
>>> java.lang.NoSuchMethodError:
>>> java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
>>> at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106)
>>> at
>>> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586)
>>> at
>>> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
>>> at
>>> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
>>> at
>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at
>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>> at
>>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>> at
>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>> 

Re: Networking problems in Spark 1.6.0

2016-01-05 Thread Dean Wampler
ConcurrentHashMap.keySet() returning a KeySetView is a Java 8 method. The
Java 7 method returns a Set. Are you running Java 7? What happens if you
run Java 8?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Jan 5, 2016 at 8:29 AM, Yiannis Gkoufas <johngou...@gmail.com>
wrote:

> Hi there,
>
> I have been using Spark 1.5.2 on my cluster without a problem and wanted
> to try Spark 1.6.0.
> I have the exact same configuration on both clusters.
> I am able to start the Standalone Cluster but I fail to submit a job
> getting errors like the following:
>
> 16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master
> spark://my-ip:7077...
> 16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master
> spark://my-ip:7077...
> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
> spark://my-ip:7077...
> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
> spark://my-ip:7077...
> 16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection
> from my-ip/X.XXX.XX.XX:7077
> java.lang.NoSuchMethodError:
> java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
> at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106)
> at
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586)
> at
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
> at
> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> Has anyone else had similar problems?
>
> Thanks a lot
>
>


Re: Spark data frame

2015-12-22 Thread Dean Wampler
More specifically, you could have TBs of data across thousands of
partitions for a single RDD. If you call collect(), BOOM!

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Dec 22, 2015 at 4:20 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Michael,
>
> collect will bring down the results to the driver JVM, whereas the RDD or
> DataFrame would be cached on the executors (if it is cached). So, as Dean
> said, the driver JVM needs to have enough memory to store the results of
> collect.
>
> Thanks,
> Silvio
>
> From: Michael Segel <msegel_had...@hotmail.com>
> Date: Tuesday, December 22, 2015 at 4:26 PM
> To: Dean Wampler <deanwamp...@gmail.com>
> Cc: Gaurav Agarwal <gaurav130...@gmail.com>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: Spark data frame
>
> Dean,
>
> RDD in memory and then the collect() resulting in a collection, where both
> are alive at the same time.
> (Again not sure how Tungsten plays in to this… )
>
> So his collection can’t be larger than 1/2 of the memory allocated to the
> heap.
>
> (Unless you have allocated swap…, right?)
>
> On Dec 22, 2015, at 12:11 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>
> You can call the collect() method to return a collection, but be careful.
> If your data is too big to fit in the driver's memory, it will crash.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com/>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Tue, Dec 22, 2015 at 1:09 PM, Gaurav Agarwal <gaurav130...@gmail.com>
> wrote:
>
>> We are able to retrieve data frame by filtering the rdd object . I need
>> to convert that data frame into java pojo. Any idea how to do that
>>
>
>
>


Re: Spark data frame

2015-12-22 Thread Dean Wampler
You can call the collect() method to return a collection, but be careful.
If your data is too big to fit in the driver's memory, it will crash.
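
If you only need to inspect some of the data on the driver, a couple of safer
alternatives (a sketch; df here stands for whatever DataFrame or RDD you have):

df.take(20).foreach(println)             // brings back only the first 20 rows
df.rdd.toLocalIterator.foreach(println)  // streams one partition at a time,
                                         // though each partition must still fit in driver memory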

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Dec 22, 2015 at 1:09 PM, Gaurav Agarwal <gaurav130...@gmail.com>
wrote:

> We are able to retrieve data frame by filtering the rdd object . I need to
> convert that data frame into java pojo. Any idea how to do that
>


Re:

2015-11-19 Thread Dean Wampler
If you mean retaining data from past jobs, try running the history server,
documented here:

http://spark.apache.org/docs/latest/monitoring.html
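
For example, a minimal sketch of the relevant settings (the log directory is a
placeholder; see the monitoring docs above for the full list):

# spark-defaults.conf on the submitting machine
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-logs

# configuration read by the history server, which then serves the UI for past jobs
spark.history.fs.logDirectory    hdfs:///spark-logs

Then start the server with sbin/start-history-server.sh.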

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Thu, Nov 19, 2015 at 3:10 AM, 金国栋 <scu...@gmail.com> wrote:

> I don't really know what you mean by saying historical data, but if you
> have configured logs, you can always get historical data for jobs,
> stages, tasks, etc., unless you delete them.
>
> Best,
> Jelly
>
> 2015-11-19 16:53 GMT+08:00 aman solanki <youthindia.a...@gmail.com>:
>
>> Hi All,
>>
>> I want to know how one can get historical data of jobs,stages,tasks etc
>> of a running spark application.
>>
>> Please share the information regarding the same.
>>
>> Thanks,
>> Aman Solanki
>>
>
>


Re: getting different results from same line of code repeated

2015-11-18 Thread Dean Wampler
Methods like first() and take(n) can't guarantee to return the same result
in a distributed context, because Spark uses an algorithm to grab data from
one or more partitions that involves running a distributed job over the
cluster, with tasks on the nodes where the chosen partitions are located.
You can look at the logic in the Spark code base, RDD.scala (first method
calls the take method) and SparkContext.scala (runJob method, which take
calls).

However, the exceptions definitely look like bugs to me. There must be some
empty partitions.
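
If the goal is just "the pattern with the highest count," one way to sidestep
the sort + first round trip is to let Spark pick the top element directly.
A sketch, reusing the data and REGEX field from the code below (and swapping
the groupBy(identity) counting for reduceByKey):

val counts = data.map(r => (r.REGEX, 1)).reduceByKey(_ + _)
// top(1) with an ordering on the count returns the single most frequent pattern
val top = counts.top(1)(Ordering.by((p: (String, Int)) => p._2))
top.foreach(println)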

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com>
wrote:

> Hi,
>
> I'm launching a Spark cluster with the spark-ec2 script and playing around
> in spark-shell. I'm running the same line of code over and over again, and
> getting different results, and sometimes exceptions.  Towards the end,
> after I cache the first RDD, it gives me the correct result multiple times
> in a row before throwing an exception.  How can I get correct behavior out
> of these operations on these RDDs?
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116]
> at sortBy at :36
>
> scala> targets.first
> res26: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets = data map {_.REGEX} groupBy{identity} map {
> Function.tupled(_->_.size)} sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125]
> at sortBy at :36
>
> scala> targets.first
> res27: (String, Int) = (nika,7)
>
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134]
> at sortBy at :36
>
> scala> targets.first
> res28: (String, Int) = (\bcalientes?\b,6)
>
> scala> targets.sortBy(_._2,false).first
> java.lang.UnsupportedOperationException: empty collection
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283]
> at sortBy at :36
>
> scala> targets.first
> res46: (String, Int) = (\bhurting\ yous?\b,8)
>
> scala> val targets =
> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292]
> at sortBy at :36
>
> scala> targets.first
> java.lang.UnsupportedOperationException: empty collection
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301]
> at sortBy at :36
>
> scala> targets.first
> res48: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310]
> at sortBy at :36
>
> scala> targets.first
> res49: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319]
> at sortBy at :36
>
> scala> targets.first
> res50: (String, Int) = (\bguns?\b,1253)
>
> scala> val targets =
> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328]
> at sortBy at :36
>
> scala> targets.first
> java.lang.UnsupportedOperationException: empty collection
>
>
>
>
>


Re: How to create nested structure from RDD

2015-11-17 Thread Dean Wampler
Crap. Hit send accidentally...

In pseudocode, assuming comma-separated input data:

scala> case class Address(street: String, city: String)
scala> case class User (name: String, address: Address)

scala> val df = sc.textFile("/path/to/stuff").
  map { line =>
val array = line.split(",")   // assume: "name,street,city"
User(array(0), Address(array(1), array(2)))
  }.toDF()

scala> df.printSchema
root
 |-- name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Nov 17, 2015 at 11:16 AM, Dean Wampler <deanwamp...@gmail.com>
wrote:

> One way to do it, in the Scala API, is to use a tuple or case class
> with nested tuples or case classes and/or primitives. It works fine if you
> convert to a DataFrame, too; you can reference nested elements using dot
> notation. I think it would work similarly in Python.
>
> In pseudocode, assuming comma-separated input data:
>
> case class Address(street: String, city: String)
> case class User (name: String, address: Address)
>
> sc.textFile("/path/to/stuff").
>   map { line =>
> line.split(0)
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Tue, Nov 17, 2015 at 11:06 AM, fhussonnois <fhussonn...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I need to convert an rdd of RDD[User] to a DataFrame containing a single
>> column named "user". The column "user" should be a nested struct with all
>> User properties.
>>
>> How can I implement this efficiently ?
>>
>> Thank you in advance
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-nested-structure-from-RDD-tp25401.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to create nested structure from RDD

2015-11-17 Thread Dean Wampler
One way to do it, in the Scala API, is to use a tuple or case class
with nested tuples or case classes and/or primitives. It works fine if you
convert to a DataFrame, too; you can reference nested elements using dot
notation. I think it would work similarly in Python.

In pseudocode, assuming comma-separated input data:

case class Address(street: String, city: String)
case class User (name: String, address: Address)

sc.textFile("/path/to/stuff").
  map { line =>
line.split(0)
dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Nov 17, 2015 at 11:06 AM, fhussonnois <fhussonn...@gmail.com> wrote:

> Hi,
>
> I need to convert an rdd of RDD[User] to a DataFrame containing a single
> column named "user". The column "user" should be a nested struct with all
> User properties.
>
> How can I implement this efficiently ?
>
> Thank you in advance
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-nested-structure-from-RDD-tp25401.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: dynamic allocation w/ spark streaming on mesos?

2015-11-11 Thread Dean Wampler
Dynamic allocation doesn't work yet with Spark Streaming in any cluster
scenario. There was a previous thread on this topic which discusses the
issues that need to be resolved.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Nov 11, 2015 at 8:09 AM, PhuDuc Nguyen <duc.was.h...@gmail.com>
wrote:

> I'm trying to get Spark Streaming to scale up/down its number of executors
> within Mesos based on workload. It's not scaling down. I'm using Spark
> 1.5.1 reading from Kafka using the direct (receiver-less) approach.
>
> Based on this ticket https://issues.apache.org/jira/browse/SPARK-6287
> with the right configuration, I have a simple example working with the
> spark-shell connected to a Mesos cluster. By working I mean the number of
> executors scales up/down based on workload. However, the spark-shell is not
> a streaming example.
>
> What is that status of dynamic resource allocation with Spark Streaming on
> Mesos? Is it supported at all? Or supported but with some caveats to ensure
> no data loss?
>
> thanks,
> Duc
>


Re: Spark Streaming: how to use StreamingContext.queueStream with existing RDD

2015-10-26 Thread Dean Wampler
Check out StreamingContext.queueStream (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext
)
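
For the Scala API, queueStream expects a scala.collection.mutable.Queue of
RDDs, one of which is consumed per batch interval. A rough, untested sketch of
steps 2) and 3) in Scala (it assumes the offline data is already available as
sortedByTime: RDD[(Long, String)] of (timestampMillis, record) pairs, and it
collects the bucket ids to the driver, which is fine only if there are few
buckets):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val bucketMillis = 5 * 60 * 1000L   // 5-minute buckets
val keyed = sortedByTime.map { case (ts, rec) => (ts / bucketMillis, rec) }
val bucketIds = keyed.keys.distinct().collect().sorted

// One RDD per time bucket, in chronological order
val buckets: Seq[RDD[String]] = bucketIds.toSeq.map(id => keyed.filter(_._1 == id).values)

val ssc = new StreamingContext(sc, Seconds(30))
val queue = mutable.Queue(buckets: _*)
val virtualStream = ssc.queueStream(queue)   // replays one queued RDD per batch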

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Oct 26, 2015 at 11:16 AM, Anfernee Xu <anfernee...@gmail.com> wrote:

> Hi,
>
> Here's my situation: I have an offline dataset and have
> loaded it into Spark as an RDD, but I want to form a virtual data stream
> feeding into Spark Streaming. My code looks like this:
>
>
>// sort offline data by time, the dataset spans 2 hours
>  1)  JavaRDD sortedByTime = offlineDataRDD.sortBy( );
>
>// compute a list of JavaRDD,  each element JavaRDD is hosting the data
> in the same time
>// bucket, for example 5 minutes
>   2) List virtualStreamRdd = ?
>
> Queue<JavaRDD> queue = Queues.newLinkedBlockingQueue();
> queue.addAll(virtualStreamRdd);
>
> /*
>  * Create DStream from the queue
>  */
>
> 3) final JavaDStream rowDStream =
> streamingContext.queueStream(queue);
>
>
> Currently I'm stuck on 2); any suggestion is appreciated.
>
> Thanks
>
> --
> --Anfernee
>


Re: Spark 1.5.1 ClassNotFoundException in cluster mode.

2015-10-14 Thread Dean Wampler
There is a Datastax Spark connector library jar file that you probably have
on your CLASSPATH locally, but not on the cluster. If you know where it is,
you could either install it on each node in some location on their
CLASSPATHs or, when you submit the job, pass the jar file using the "--jars"
option. Note that the latter may not be an ideal solution if it has other
dependencies that also need to be passed.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Oct 14, 2015 at 5:05 PM, Renato Perini <renato.per...@gmail.com>
wrote:

> Hello.
> I have developed a Spark job using a jersey client (1.9 included with
> Spark) to make some service calls during data computations.
> Data is read and written on an Apache Cassandra 2.2.1 database.
> When I run the job in local mode, everything works nicely. But when I
> execute my job in cluster mode (spark standalone) I receive the following
> exception:
> I have no clue where this exception occurs. Any idea / advice on what
> I can check?
>
> [Stage 38:==> (8 + 2)
> / 200]15/10/14 15:54:07 WARN ThrowableSerializationWrapper: Task exception
> could
> java.lang.ClassNotFoundException:
> com.datastax.spark.connector.types.TypeConversionException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
> at
> java.util.concurrent.Th

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-27 Thread Dean Wampler
While case classes no longer have the 22-element limitation as of Scala
2.11, tuples are still limited to 22 elements. For various technical
reasons, this limitation probably won't be removed any time soon.

However, you can nest tuples, like case classes, in most contexts. So, the
last bit of your example,

(r: ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37")
)

could add nested () to group elements and keep the outer number of elements
<= 22.
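
For example, a hedged sketch of the grouping idea applied to the 37-column
case in the message below (the column names and grouping are invented, but
nothing here exceeds 22 elements, so it also compiles on Scala 2.10):

case class Cols1to12(col1: Int, col2: Int /* ... through col12 */)
case class Cols13to24(col13: Int /* ... through col24 */)
case class Cols25to37(col25: Int /* ... through col37 */)
case class WideRow(part1: Cols1to12, part2: Cols13to24, part3: Cols25to37)

// In the JdbcRDD row-mapping function, build the nested value instead of a flat tuple:
//   (r: ResultSet) => WideRow(
//     Cols1to12(r.getInt("col1"), r.getInt("col2") /* ... */),
//     Cols13to24(r.getInt("col13") /* ... */),
//     Cols25to37(r.getInt("col25") /* ... */))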

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Thu, Sep 24, 2015 at 6:01 AM, satish chandra j <jsatishchan...@gmail.com>
wrote:

> HI All,
>
> In addition to the case class limitation in Scala, I am finding a tuple
> limitation too; please find the explanation below.
>
> //Query to pull data from Source Table
>
> var SQL_RDD= new JdbcRDD( sc, ()=>
> DriverManager.getConnection(url,user,pass),"select col1, col2,
> col3..col 37 from schema.Table LIMIT ? OFFSET ?",100,0,*1*,(r:
> ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37")))
>
>
> //Define Case Class
>
> case class sqlrow(col1:Int,col2:Int..col37)
>
>
> var SchSQL= SQL_RDD.map(p => new sqlrow(p._1,p._2.p._37))
>
>
> followed by apply CreateSchema to RDD and than apply registerTempTable for
> defining a table to make use in SQL Context in Spark
>
> As per the above SQL query I need to fetch 37 columns from the source
> table, but it seems Scala has a tuple restriction, which I hit with the r:
> ResultSet mapping in the above SQL. Please let me know if there is any
> workaround for the same.
>
> Regards,
> Satish Chandra
>
> On Thu, Sep 24, 2015 at 3:18 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI All,
>> As it is for SQL purpose I understand, need to go ahead with Custom Case
>> Class approach
>> Could anybody have a sample code for creating Custom Case Class to refer
>> which would be really helpful
>>
>> Regards,
>> Satish Chandra
>>
>> On Thu, Sep 24, 2015 at 2:51 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> +1 on grouping the case classes and creating a hierarchy – as long as
>>> you use the data programatically. For DataFrames / SQL the other ideas
>>> probably scale better…
>>>
>>> From: Ted Yu
>>> Date: Wednesday, September 23, 2015 at 7:07 AM
>>> To: satish chandra j
>>> Cc: user
>>> Subject: Re: Scala Limitation - Case Class definition with more than 22
>>> arguments
>>>
>>> Can you switch to 2.11 ?
>>>
>>> The following has been fixed in 2.11:
>>> https://issues.scala-lang.org/browse/SI-7296
>>>
>>> Otherwise consider packaging related values into a case class of their
>>> own.
>>>
>>> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
>>>> HI All,
>>>> Do we have any alternative solutions in Scala to avoid limitation in
>>>> defining a Case Class having more than 22 arguments
>>>>
>>>> We are using Scala version 2.10.2, currently I need to define a case
>>>> class with 37 arguments but getting an error as "*error: Implementation
>>>> restriction: case classes cannot have more than 22 parameters.*"
>>>>
>>>> It would be a great help if any inputs on the same
>>>>
>>>> Regards,
>>>> Satish Chandra
>>>>
>>>>
>>>>
>>>
>>
>


Re: Parquet partitioning performance issue

2015-09-13 Thread Dean Wampler
One general technique is perform a second pass later over the files, for
example the next day or once a week, to concatenate smaller files into
larger ones. This can be done for all file types and allows you to make recent
data available to analysis tools, while avoiding a large build-up of small
files overall.
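
A minimal sketch of such a compaction pass (the paths and the target partition
count are placeholders, and it writes to a new directory rather than
overwriting in place):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("hdfs:///data/sales/2015-09-13")  // many small files
// coalesce to a handful of partitions so the rewrite produces a few large files
df.coalesce(8).write.parquet("hdfs:///data/sales_compacted/2015-09-13")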

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Sun, Sep 13, 2015 at 12:54 PM, sonal sharma <sonalsharma1...@gmail.com>
wrote:

> Hi Team,
>
> We have scheduled jobs that read new records from MySQL database every
> hour and write (append) them to parquet. For each append operation, spark
> creates 10 new partitions in parquet file.
>
> Some of these partitions are fairly small in size (20-40 KB) leading to
> high number of smaller partitions and affecting the overall read
> performance.
>
> Is there any way in which we can configure spark to merge smaller
> partitions into a bigger one to avoid too many partitions? Or can we define
> a configuration in Parquet to set a minimum partition size, say 64 MB?
>
> Coalesce/repartition will not work for us as we have highly variable
> activity on the database during peak and non-peak hours.
>
> Regards,
> Sonal
>


Re: Realtime Data Visualization Tool for Spark

2015-09-11 Thread Dean Wampler
Here's a demonstration video from @noootsab himself (creator of Spark
Notebook) showing live charting in Spark Notebook. It's one reason I prefer
it over the other options.

https://twitter.com/noootsab/status/638489244160401408

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Fri, Sep 11, 2015 at 12:55 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> So if you want to build your own from the ground up, then yes you could go
> the d3js route. Like Feynman also responded you could use something like
> Spark Notebook or Zeppelin to create some charts as well. It really depends
> on your intended audience and ultimate goal. If you just want some counters
> and graphs without any interactivity it shouldn't be too difficult.
>
> Another option, if you’re willing to use a hosted service, would be
> something like MS Power BI. I’ve used this to publish data and have
> realtime dashboards and reports fed by Spark.
>
> From: Shashi Vishwakarma
> Date: Friday, September 11, 2015 at 11:56 AM
> To: "user@spark.apache.org"
> Subject: Realtime Data Visualization Tool for Spark
>
> Hi
>
> I have streaming data which needs to be processed and sent for
> visualization. I am planning to use Spark Streaming for this, but I'm a little
> bit confused about choosing a visualization tool. I read somewhere that D3.js
> can be used, but I wanted to know which is the best tool for visualization when
> dealing with a streaming application (something that can be easily integrated).
>
> If someone has any link which describes D3.js (or any other
> visualization tool) and Spark Streaming application integration, then
> please share it. That would be a great help.
>
>
> Thanks and Regards
> Shashi
>
>


Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-09 Thread Dean Wampler
If you log into the cluster, do you see the file if you type:

hdfs dfs -ls hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

(with the correct server address for "ipx-x-x-x")? If not, is the server
address correct and routable inside the cluster? Recall that EC2 instances
have both public and private host names & IP addresses.

Also, is the port number correct for HDFS in the cluster?

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Sep 9, 2015 at 9:28 AM, shahab <shahab.mok...@gmail.com> wrote:

> Hi,
> I am using Spark on Amazon EMR. So far I have not succeeded to submit the
> application successfully, not sure what's problem. In the log file I see
> the followings.
> java.io.FileNotFoundException: File does not exist:
> hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
>
> However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar in the
> fat jar file didn't solve the problem. I am out of clues now.
> I want to submit a Spark application, using the AWS web console, as a step. I
> submit the application as: spark-submit --deploy-mode cluster --class
> mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar
> Is there anyone who has a similar problem with EMR?
>
> best,
> /Shahab
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Dean Wampler
It's true that Java 8 lambdas help. If you've read Learning Spark, where
they use Java 7, Python, and Scala for the examples, it really shows how
awful Java without lambdas is for Spark development.

Still, there are several "power tools" in Scala I would sorely miss if I were
using Java 8:

1. The REPL (interpreter): I do most of my work in the REPL, then move the
code to compiled code when I'm ready to turn it into a batch job. Even
better, use Spark Notebook <http://spark-notebook.io/>! (and on GitHub
<https://github.com/andypetrella/spark-notebook>).
2. Tuples: It's just too convenient to use tuples for schemas, return
values from functions, etc., etc., etc.
3. Pattern matching: This has no analog in Java, so it's hard to appreciate
it until you understand it, but see this example
<https://github.com/deanwampler/spark-workshop/blob/master/src/main/scala/sparkworkshop/InvertedIndex5b.scala>
for a taste of how concise it makes code!
4. Type inference: Spark code really shows its utility. It means a lot less code
to write, but you still get type hints about what you just wrote! (See the short
sketch below.)
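
As a tiny illustration of points 2 and 3 together, here is a sketch of pattern
matching on tuples inside a Spark transformation (made-up data):

val counts = sc.parallelize(Seq(("spark", 3), ("scala", 5), ("java", 2)))
val described = counts.map {
  case (word, n) if n > 2 => s"$word appears often ($n times)"
  case (word, n)          => s"$word appears rarely ($n times)"
}
described.collect().foreach(println)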

My $0.02.

dean


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Tue, Sep 8, 2015 at 10:28 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> we are using Java 7... it's much more verbose than the Java 8 or Scala examples.
> In addition, there are sometimes libraries that have no Java API, so you need
> to write wrappers yourself (e.g., GraphX).
> On the other hand, Scala is not a trivial language like Java is, so it depends
> on your team.
>
> On 8 September 2015 at 17:44, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Thank you for the quick responses.  It's useful to have some insight from
>> folks already extensively using Spark.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> Why would Scala vs Java performance be different Ted? Relatively
>>> speaking there is almost no runtime difference; it's the same APIs or
>>> calls via a thin wrapper. Scala/Java vs Python is a different story.
>>>
>>> Java libraries can be used in Scala. Vice-versa too, though calling
>>> Scala-generated classes can be clunky in Java. What's your concern
>>> about interoperability Jeffrey?
>>>
>>> I disagree that Java 7 vs Scala usability is sooo different, but it's
>>> certainly much more natural to use Spark in Scala. Java 8 closes a lot
>>> of the usability gap with Scala, but not all of it. Enough that it's
>>> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
>>> big disadvantage.
>>>
>>> The downsides of Scala IMHO are that it provides too much: lots of
>>> nice features (closures! superb collections!), lots of rope to hang
>>> yourself too (implicits sometimes!) and some WTF features (XML
>>> literals!) Learning the good useful bits of Scala isn't hard. You can
>>> always write Scala code as much like Java as you like, I find.
>>>
>>> Scala tooling is different from Java tooling; that's an
>>> underappreciated barrier. For example I think SBT is good for
>>> development, bad for general project lifecycle management compared to
>>> Maven, but in any event still less developed. SBT/scalac are huge
>>> resource hogs, since so much of Scala is really implemented in the
>>> compiler; prepare to update your laptop to develop in Scala on your
>>> IDE of choice, and start to think about running long-running compile
>>> servers like we did in the year 2000.
>>>
>>> Still net-net I would choose Scala, FWIW.
>>>
>>> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> > Performance wise, Scala is by far the best choice when you use Spark.
>>> >
>>> > The cost of learning Scala is not negligible but not insurmountable
>>> either.
>>> >
>>> > My personal opinion.
>>> >
>>> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey <bryan.jeff...@gmail.com
>>> >
>>> > wrote:
>>> >>
>>> >> All,
>>> >>
>>> >> We're looking at language choice in developing a simple streaming
>>> >> processing application in spark.  We've got a small set of example
>>> code
>>> >> built in Scala.  Articles like the following:
>>> >>
>>> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
>>> >> would seem to indicate that Scala is great for use in distributed
>>> >> programming (including Spark).  However, there is a large group of
>>> folks
>>> >> that seem to feel that interoperability with other Java libraries is
>>> much to
>>> >> be desired, and that the cost of learning (yet another) language is
>>> quite
>>> >> high.
>>> >>
>>> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
>>> >> What was the outcome?
>>> >>
>>> >> Regards,
>>> >>
>>> >> Bryan Jeffrey
>>> >
>>> >
>>>
>>
>>
>


Re: Run scala code with spark submit

2015-08-20 Thread Dean Wampler
I haven't tried it, but spark-shell should work if you give it a Scala
script file, since it's basically a wrapper around the Scala REPL.
dean

On Thursday, August 20, 2015, MasterSergius master.serg...@gmail.com
wrote:

 Is there any possibility to run a standalone Scala program via spark-submit?
 Or do I always have to put it in some package and build it with Maven (or sbt)?

 What if I have just a simple program, like that example word counter?
 Could anyone please show it on this simple test file, Greeting.scala:



 It compiles with scalac and runs with scala. Now I want to run it with Spark (I
 can get these files via wget, for example).





 So, how can I run a one-file Scala program via spark-submit?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Run-scala-code-with-spark-submit-tp24367.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:;
 For additional commands, e-mail: user-h...@spark.apache.org javascript:;



-- 
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Re: Parquet without hadoop: Possible?

2015-08-11 Thread Dean Wampler
It should work fine. I have an example script here:
https://github.com/deanwampler/spark-workshop/blob/master/src/main/scala/sparkworkshop/SparkSQLParquet10-script.scala
 (Spark 1.4.X)

What does "I am failing to do so" mean?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Aug 11, 2015 at 9:28 AM, saif.a.ell...@wellsfargo.com wrote:

 Hi all,

 I don’t have any Hadoop FS installed in my environment, but I would like
 to store DataFrames in Parquet files. I am failing to do so; if possible,
 can anyone give me any pointers?

 Thank you,
 Saif




Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Dean Wampler
Following Hadoop conventions, Spark won't overwrite an existing directory.
You need to provide a unique output path every time you run the program, or
delete or rename the target directory before you run the job.
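
If overwriting really is the intent, one hedged option is to delete the target
directory programmatically before saving, using the Hadoop FileSystem API (the
path is the one from the error message below, and rdd stands for whatever you
are saving; note that this is a recursive delete, so be careful):

import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = new Path("hdfs://172.31.42.10:54310/./weblogReadResult")
val fs = FileSystem.get(outputPath.toUri, sc.hadoopConfiguration)
if (fs.exists(outputPath)) fs.delete(outputPath, true)  // true = recursive
rdd.saveAsTextFile(outputPath.toString)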

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I have an EC2 cluster, and am using Spark 1.3, YARN and HDFS. When I submit
 locally there is no problem, but when I run on the cluster, saveAsTextFile doesn't
 work. *It says: User class threw exception: Output directory
 hdfs://172.31.42.10:54310/./weblogReadResult already exists*

 Is there anyone who can help me with this issue?

 Best,
 yasemin



 --
 hiç ender hiç



Re: question about spark streaming

2015-08-10 Thread Dean Wampler
Have a look at the various versions of
PairDStreamFunctions.updateStateByKey (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
It supports updating running state in memory. (You can persist the state to
a database/files periodically if you want). Use an in-memory data structure
like a hash map with SKU-price key-values. Update the map as needed on each
iteration. One of the versions of this function lets you specify a
partitioner if you still need to shard keys.

Also, I would be flexible about the 1 second batch interval. Is that really
a mandatory requirement for this problem?
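
For the per-product bookkeeping, a rough, untested sketch (it assumes a
DStream[(String, Double)] of (productId, price) pairs called prices, tracks a
running min/max rather than a true 180-second window, and requires
ssc.checkpoint(...) to be set):

import org.apache.spark.HashPartitioner

case class PriceStats(min: Double, max: Double)

def updateStats(newPrices: Seq[Double], state: Option[PriceStats]): Option[PriceStats] = {
  if (newPrices.isEmpty) state   // no new trades for this product in this batch
  else {
    val lo = math.min(newPrices.min, state.map(_.min).getOrElse(Double.MaxValue))
    val hi = math.max(newPrices.max, state.map(_.max).getOrElse(Double.MinValue))
    Some(PriceStats(lo, hi))
  }
}

// keep all updates for a given product id on the same partition
val stats = prices.updateStateByKey(
  updateStats _,
  new HashPartitioner(ssc.sparkContext.defaultParallelism))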

HTH,
dean


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 5:24 AM, sequoiadb mailing-list-r...@sequoiadb.com
wrote:

 hi guys,

 i have a question about spark streaming.
 There’s an application that keeps sending transaction records into a Spark
 stream at about 50k TPS.
 Each record represents sales information, including customer id / product
 id / time / price columns.

 The application is required to monitor the change of price for each
 product. For example, if the price of a product increases 10% within 3
 minutes, it will send an alert to end user.

 The interval is required to be set every 1 second, window is somewhere
 between 180 to 300 seconds.

 The issue is that I have to compare the price of each transaction
 (about 10k different products in total) against the lowest/highest price for
 the same product over the entire past 180 seconds.

 That means, in every single second, I have to loop through 50k
 transactions and compare the price of the same product across all 180 seconds.
 So it seems I have to partition the calculation by product id, so that
 each worker only processes a certain list of products.

 For example, if I can make sure the same product id always goes to the same
 worker, it doesn’t need to shuffle data between workers for each
 comparison. Otherwise, if each transaction has to be compared with all
 other RDDs across multiple workers, I guess it may not be fast
 enough for the requirement.

 Does anyone know how to pin each transaction record to a specific worker node
 based on its product id, in order to avoid a massive shuffle operation?

 If I simply make the product id the key and the price the value,
 reduceByKeyAndWindow may cause a massive shuffle and slow down the whole
 throughput. Am I correct?

 Thanks

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Cassandra Connector issue

2015-08-10 Thread Dean Wampler
Add the other Cassandra dependencies (dse.jar,
spark-cassandra-connector-java_2.10) to your --jars argument on the command
line.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI All,
 Please help me to fix Spark Cassandra Connector issue, find the details
 below

 *Command:*

 dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
 --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar
 ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar


 *Error:*


 WARN  2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI'
 could not bind on port 4040. Attempting port 4041.

 Exception in thread main java.lang.NoSuchMethodError:
 com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions;

 at HelloWorld$.main(HelloWorld.scala:29)

 at HelloWorld.main(HelloWorld.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 *Code:*

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.JdbcRDD
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.cql.CassandraConnector
 import com.datastax.bdp.spark.DseSparkConfHelper._
 import java.sql.{Connection, DriverManager, ResultSet,
   PreparedStatement, SQLException, Statement}

 object HelloWorld {
   def main(args: Array[String]) {

     def createSparkContext() = {
       val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath

       val conf = new SparkConf()
         .set("spark.cassandra.connection.host", "10.246.43.15")
         .setAppName("First Spark App")
         .setMaster("local")
         .setJars(Array(myJar))
         .set("cassandra.username", "username")
         .set("cassandra.password", "password")
         .forDse

       new SparkContext(conf)
     }

     val sc = createSparkContext()

     val user = "hkonak0"
     val pass = "Winter18"

     Class.forName("org.postgresql.Driver").newInstance

     val url = "jdbc:postgresql://gptester:5432/db_test"

     val myRDD27 = new JdbcRDD(sc, () => DriverManager.getConnection(url, user, pass),
       "select * from wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?", 5, 0, 1,
       (r: ResultSet) => (r.getInt("alarm_type_code"), r.getString("language_code"),
         r.getString("alrm_type_cd_desc")))

     myRDD27.saveToCassandra("keyspace", "arm_typ_txt",
       SomeColumns("alarm_type_code", "language_code", "alrm_type_cd_desc"))

     println(myRDD27.count())
     println(myRDD27.first)

     sc.stop()
     sys.exit()
   }
 }



 *POM XML:*


 <dependencies>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.2.2</version>
   </dependency>
   <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>1.2.1</version>
   </dependency>
   <dependency>
     <groupId>org.scala-lang</groupId>
     <artifactId>scala-library</artifactId>
     <version>2.10.5</version>
   </dependency>
   <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
     <version>3.8.1</version>
     <scope>test</scope>
   </dependency>
   <dependency>
     <groupId>com.datastax.dse</groupId>
     <artifactId>dse</artifactId>
     <version>4.7.2</version>
     <scope>system</scope>
     <systemPath>C:\workspace\etl\lib\dse.jar</systemPath>
   </dependency>
   <dependency>
     <groupId>com.datastax.spark</groupId>
     <artifactId>spark-cassandra-connector-java_2.10</artifactId>
     <version>1.1.1</version>
   </dependency>
 </dependencies>


 Please let me know if any further details are required to analyze the issue.


 Regards,

 Satish Chandra



Re: Spark Streaming Restart at scheduled intervals

2015-08-10 Thread Dean Wampler
org.apache.spark.streaming.twitter.TwitterInputDStream is a small class.
You could write your own that lets you change the filters at run time. Then
provide a mechanism in your app, like periodic polling of a database table
or file for the list of filters.
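
The polling itself can be tiny. Inside your custom receiver (it runs on an
executor, so read from somewhere every executor can reach, e.g. a database
table or a file on shared storage), something like this could keep the list
fresh (the path and interval are made up):

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{Executors, TimeUnit}
import scala.io.Source

val currentFilters = new AtomicReference[Seq[String]](Seq("spark"))

val poller = Executors.newSingleThreadScheduledExecutor()
poller.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    // re-read the keyword list once a minute
    val src = Source.fromFile("/shared/keywords.txt")
    val latest = try src.getLines().toList finally src.close()
    currentFilters.set(latest)
  }
}, 0, 60, TimeUnit.SECONDS)

The receiver would consult currentFilters.get() whenever it (re)opens the
Twitter connection.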

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 3:56 AM, Pankaj Narang pankajnaran...@gmail.com
wrote:

 Hi All,

 I am creating a Spark Twitter streaming connection in my app over a long
 period of time. When I have some new keywords I need to add them to the Spark
 streaming connection, which means stopping and restarting the current Twitter
 streaming connection.

 I have tried Akka actor scheduling but could not achieve this.

 Does anybody have an idea how to do that?

 Regards
 Pankaj



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Restart-at-scheduled-intervals-tp24192.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Dean Wampler
So, just before running the job, what happens if you run the HDFS command at a
shell prompt: hdfs dfs -ls hdfs://172.31.42.10:54310/./weblogReadResult
Does it say the path doesn't exist?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:58 AM, Yasemin Kaya godo...@gmail.com wrote:

 Thanks Dean, I am giving a unique output path, and every time I also delete
 the directory before I run the job.

 2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com:

 Following Hadoop conventions, Spark won't overwrite an existing
 directory. You need to provide a unique output path every time you run the
 program, or delete or rename the target directory before you run the job.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i
 submit at local there is no problem , but i run at cluster, saveAsTextFile
 doesn't work.*It says me User class threw exception: Output directory
 hdfs://172.31.42.10:54310/./weblogReadResult
 http://172.31.42.10:54310/./weblogReadResult already exists*

 Is there anyone can help me about this issue ?

 Best,
 yasemin



 --
 hiç ender hiç





 --
 hiç ender hiç



Re: Spark Cassandra Connector issue

2015-08-10 Thread Dean Wampler
I don't know if DSE changed spark-submit, but you have to use a
comma-separated list of jars to --jars. It probably looked for HelloWorld
in the second one, the dse.jar file. Do this:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
--jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar
/home/missingmerch/etl-0.0.1-SNAPSHOT.jar

I also removed the extra //. Or put file: in front of them so they are
proper URLs. Note the snapshot jar isn't in the --jars list. I assume
that's where HelloWorld is found. Confusing, yes it is...

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 8:23 AM, satish chandra j jsatishchan...@gmail.com
wrote:

 Hi,
 Thanks for the quick input; now I am getting a class-not-found error.

 *Command:*

 dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
 --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar
 ///home/missingmerch/dse.jar
 ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar
 ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar


 *Error:*

 java.lang.ClassNotFoundException: HelloWorld

 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

 at java.lang.Class.forName0(Native Method)

 at java.lang.Class.forName(Class.java:270)

 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342)

 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 Previously I could fix the issue by changing the order of the arguments
 passed on the DSE command line, but now I am not sure why the issue is
 back.

 Please let me know if I am still missing anything in my command as
 mentioned above (as suggested, I have added dse.jar and
 spark-cassandra-connector-java_2.10-1.1.1.jar).


 Thanks for support


 Satish Chandra

 On Mon, Aug 10, 2015 at 6:19 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Add the other Cassandra dependencies (dse.jar,
 spark-cassandra-connect-java_2.10) to your --jars argument on the command
 line.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Aug 10, 2015 at 7:44 AM, satish chandra j 
 jsatishchan...@gmail.com wrote:

 HI All,
 Please help me to fix Spark Cassandra Connector issue, find the details
 below

 *Command:*

 dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
 --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar
 ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar


 *Error:*


 WARN  2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI'
 could not bind on port 4040. Attempting port 4041.

 Exception in thread main java.lang.NoSuchMethodError:
 com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions;

 at HelloWorld$.main(HelloWorld.scala:29)

 at HelloWorld.main(HelloWorld.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 *Code:*

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.JdbcRDD
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.cql.CassandraConnector
 import com.datastax.bdp.spark.DseSparkConfHelper._
 import java.sql.{Connection, DriverManager, ResultSet,
   PreparedStatement, SQLException, Statement}

 object HelloWorld {
   def main(args: Array[String]) {

     def createSparkContext() = {
       val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath

       val conf = new SparkConf().set

Re: Spark-submit fails when jar is in HDFS

2015-08-09 Thread Dean Wampler
Also, Spark on Mesos supports cluster mode:
http://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, Aug 9, 2015 at 4:30 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try this way?

 /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050
 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class
 org.apache.spark.examples.SparkPi *--jars hdfs://hdfs1/tmp/spark-*
 *examples-1.4.1-hadoop2.6.0-**cdh5.4.4.jar* 100

 Thanks
 Best Regards

 On Fri, Aug 7, 2015 at 5:51 AM, Alan Braithwaite a...@cloudflare.com
 wrote:

 Hi All,

 We're trying to run spark with mesos and docker in client mode (since
 mesos doesn't support cluster mode) and load the application Jar from
 HDFS.  The following is the command we're running:

 /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050
 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class
 org.apache.spark.examples.SparkPi
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

 We're getting the following warning before an exception from that command:

 Warning: Skip remote jar
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar.
 java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi

 Before I debug further, is this even supported?  I started reading the
 code and it wasn't clear that it's possible to load a remote jar in client
 mode at all.  I did see a related issue in [2] but it didn't quite clarify
 everything I was looking for.

 Thanks,
 - Alan

 [1] https://spark.apache.org/docs/latest/submitting-applications.html

 [2]
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html





Re: spark config

2015-08-07 Thread Dean Wampler
That's the correct URL. Recent change? The last time I looked, earlier this
week, it still had the obsolete artifactory URL for URL1 ;)

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Fri, Aug 7, 2015 at 5:19 PM, Ted Yu yuzhih...@gmail.com wrote:

 In master branch, build/sbt-launch-lib.bash has the following:

   URL1=
 https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar

 I verified that the following exists:


 https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/#sbt-launch.jar

 FYI

 On Fri, Aug 7, 2015 at 2:08 PM, Bryce Lobdell lobde...@gmail.com wrote:


 I Recently downloaded spark package 1.4.0:

 A build of Spark with sbt/sbt clean assembly failed with message
 Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar

 Upon investigation I figured out that sbt-launch-0.13.7.jar is
 downloaded at build time and that it contained the the following:

 html
 headtitle404 Not Found/title/head
 body bgcolor=white
 centerh1404 Not Found/h1/center
 hrcenternginx/center
 /body
 /html

 which is an HTML error message to the effect that the file is missing
 (from the web server).


 The script sbt-launch-lib.bash contains the following lines which
 determine where the file sbt-launch.jar is downloaded from:

 acquire_sbt_jar () {
   SBT_VERSION=`awk -F = '/sbt\.version/ {print $2}'
 ./project/build.properties`
   URL1=
 http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
   URL2=
 http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
   JAR=build/sbt-launch-${SBT_VERSION}.jar


 The script sbt-launch-lib.bash downloads $URL1 first, and incorrectly
 concludes that it succeeded on the basis that the file sbt-launch-0.13.7.jar
 exists (though it contains HTML).

 I succeeded in building Spark by:

 (1)  Downloading the file sbt-launch-0.13.7.jar from $URL2 and placing
 it in the build directory.
 (2)  Modifying sbt-launch-lib.bash to prevent the download of that file.
 (3)  Restarting the download as I usually would, with SPARK_HIVE=true
 SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly


 I think a lot of people will be confused by this.  Probably someone
 should do some of the following:

 (1)  Delete $URL1 and all references, or replace it with the
 correct/current URL which points to the sbt-launch.jar(s).
 (2)  Modify sbt-launch-lib.bash, so that it will not conclude that the
 download of sbt-launch.jar succeeded, when the data returned is an HTML
 error message.


 Let me know if this is not clear, I will gladly explain in more detail or
 with more clarity, if needed.

 -Bryce Lobdell


 A transcript of my console is below:




 @ip-xx-xxx-xx-xxx:~/spark/spark-1.4.0$ SPARK_HIVE=true
 SPARK_HADOOP_VERSION=2.5.1 sbt/sbt clean assembly
 NOTE: The sbt/sbt script has been relocated to build/sbt.
   Please update references to point to the new location.

   Invoking 'build/sbt clean assembly' now ...

 Using /usr/lib/jvm/java-7-openjdk-amd64/ as default JAVA_HOME.
 Note, this will be overridden by -java-home if it is set.
 Attempting to fetch sbt
 Launching sbt from build/sbt-launch-0.13.7.jar
 *Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar*
 inquidia@ip-10-102-69-107:~/spark/spark-1.4.0$ cd build/
 inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls
 mvn  sbt  sbt-launch-0.13.7.jar  sbt-launch-lib.bash
 *inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ unzip -l
 sbt-launch-0.13.7.jar*
 *Archive:  sbt-launch-0.13.7.jar*
 *  End-of-central-directory signature not found.  Either this file is not*
 *  a zipfile, or it constitutes one disk of a multi-part archive.  In the*
 *  latter case the central directory and zipfile comment will be found on*
 *  the last disk(s) of this archive.*
 unzip:  cannot find zipfile directory in one of sbt-launch-0.13.7.jar or
 sbt-launch-0.13.7.jar.zip, and cannot find
 sbt-launch-0.13.7.jar.ZIP, period.
 inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls
 mvn  sbt  sbt-launch-0.13.7.jar  sbt-launch-lib.bash
 inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l
 total 28
 -rwxr-xr-x 1 inquidia inquidia 5384 Jun  3 01:07 mvn
 -rwxr-xr-x 1 inquidia inquidia 5395 Jun  3 01:07 sbt
 -rw-rw-r-- 1 inquidia inquidia  162 Aug  7 20:24 sbt-launch-0.13.7.jar
 -rwxr-xr-x 1 inquidia inquidia 5285 Jun  3 01:07 sbt-launch-lib.bash
 inquidia@ip-10-102-69-107:~/spark/spark-1.4.0/build$ ls -l
 total 28
 -rwxr-xr-x 1 inquidia inquidia 5384 Jun  3 01:07 mvn
 -rwxr-xr-x 1 inquidia inquidia 5395 Jun  3 01:07 sbt
 -rw-rw-r-- 1 inquidia inquidia  *162 *Aug  7 20:24 sbt-launch-0.13.7.jar
 -rwxr-xr-x 1 inquidia inquidia 5285 Jun  3 01:07 sbt-launch-lib.bash
 inquidia@ip-10-102-69-107

[POWERED BY] Please add Typesafe to the list of organizations

2015-07-31 Thread Dean Wampler
Typesafe (http://typesafe.com). We provide commercial support for Spark on
Mesos and Mesosphere DCOS. We contribute to Spark's Mesos integration and
Spark Streaming enhancements.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Re: How to set log level in spark-submit ?

2015-07-30 Thread Dean Wampler
Did you use an absolute path in $path_to_file? I just tried this with
spark-shell v1.4.1 and it worked for me. If the URL is wrong, you should
see an error message from log4j that it can't find the file. For windows it
would be something like file:/c:/path/to/file, I believe.
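
For reference, the full shape of what I'd try (paths are made up; the file
has to exist at that location on each node, or be shipped with --files):

spark-submit --class MyApp \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///absolute/path/log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///absolute/path/log4j.properties" \
  my-app.jar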

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 30, 2015 at 4:41 AM, Alexander Krasheninnikov 
a.krasheninni...@corp.badoo.com wrote:

  I saw such an example in the docs:
 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=
 file://$path_to_file
 but, unfortunately, it does not work for me.


 On 30.07.2015 05:12, canan chen wrote:

 Yes, that should work. What I mean is: is there any option in the spark-submit
 command that I can use to specify the log level?


 On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney jcove...@gmail.com
 wrote:

 Put a log4j.properties file in conf/. You can copy
 log4j.properties.template as a good base


 El miércoles, 29 de julio de 2015, canan chen ccn...@gmail.com
 escribió:

 Anyone know how to set log level in spark-submit ? Thanks






Re: Spark - Eclipse IDE - Maven

2015-07-29 Thread Dean Wampler
If you don't mind using SBT with your Scala instead of Maven, you can see
the example I created here: https://github.com/deanwampler/spark-workshop

It can be loaded into Eclipse or IntelliJ

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 29, 2015 at 10:52 AM, Venkat Reddy ksiv...@gmail.com wrote:

 Thanks Petar,

 I will purchase. Thanks for the input.


 Thanks
 Siva

 On Tue, Jul 28, 2015 at 4:39 PM, Carol McDonald cmcdon...@maprtech.com
 wrote:

 I agree,  I found this book very useful for getting started with spark
 and eclipse

 On Tue, Jul 28, 2015 at 11:10 AM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Sorry about self-promotion, but there's a really nice tutorial for
 setting up Eclipse for Spark in Spark in Action book:
 http://www.manning.com/bonaci/


 On 24.7.2015. 7:26, Siva Reddy wrote:

 Hi All,

  I am trying to set up Eclipse (Luna) with Maven so that I can create
 Maven projects for developing Spark programs.  I am having some issues
 and I am not sure what the issue is.


   Can anyone share a nice step-by-step document on configuring Eclipse with
 Maven for Spark development?


 Thanks
 Siva



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Eclipse-IDE-Maven-tp23977.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Multiple operations on same DStream in Spark Streaming

2015-07-28 Thread Dean Wampler
Is this average supposed to be across all partitions? If so, it will
require one of the reduce operations in every batch interval. If
that's too slow for the data rate, I would investigate using
PairDStreamFunctions.updateStateByKey to compute the sum + count of the 2nd
integers, per 1st integer, then do the filtering and final averaging
downstream if you can, i.e., where you actually need the final value. If
you need it on every batch iteration, then you'll have to do a reduce per
iteration.
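
If the per-batch version is what you want, here's a rough, untested sketch
using transform (it assumes your DStream[(Int, Int)] is called pairs):

val result = pairs.transform { rdd =>
  if (rdd.isEmpty()) {
    rdd.sparkContext.emptyRDD[Double]
  } else {
    val (sum1, n) = rdd.map { case (a, _) => (a.toLong, 1L) }
                       .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
    val avg1 = sum1.toDouble / n                      // average of first ints, this batch
    val above = rdd.filter { case (a, _) => a > avg1 }
    if (above.isEmpty()) {
      rdd.sparkContext.emptyRDD[Double]
    } else {
      val (sum2, m) = above.map { case (_, b) => (b.toLong, 1L) }
                           .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
      rdd.sparkContext.parallelize(Seq(sum2.toDouble / m))
    }
  }
}
result.print()

Because the averages are computed with actions inside transform, they are
plain local Doubles on the driver for that batch, so nothing non-serializable
ends up captured in the closures.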

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Jul 28, 2015 at 3:14 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 One approach would be to store the batch data in an intermediate storage
 (like HBase/MySQL or even in zookeeper), and inside your filter function
 you just go and read the previous value from this storage and do whatever
 operation that you are supposed to do.

 Thanks
 Best Regards

 On Sun, Jul 26, 2015 at 3:37 AM, foobar heath...@fb.com wrote:

 Hi I'm working with Spark Streaming using scala, and trying to figure out
 the
 following problem. In my DStream[(int, int)], each record is an int pair
 tuple. For each batch, I would like to filter out all records with first
 integer below average of first integer in this batch, and for all records
 with first integer above average of first integer in the batch, compute
 the
 average of second integers in such records. What's the best practice to
 implement this? I tried this but kept getting the object not serializable
 exception because it's hard to share variables (such as average of first
 int
 in the batch) between workers and driver. Any suggestions? Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-operations-on-same-DStream-in-Spark-Streaming-tp23995.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Clustetr setup for SPARK standalone application:

2015-07-28 Thread Dean Wampler
When you say you installed Spark, did you install the master and slave
services for standalone mode as described here
http://spark.apache.org/docs/latest/spark-standalone.html? If you
intended to run Spark on Hadoop, see here
http://spark.apache.org/docs/latest/running-on-yarn.html.

It looks like either the master service isn't running or isn't reachable
over your network. Is hadoopm0 publicly routable? Is port 7077 blocked? As
a test, can you telnet to it?
telnet hadoopm0 7077



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Jul 28, 2015 at 7:01 AM, Sagar sagarjad...@yonsei.ac.kr wrote:

 Hello Sir,



 I am an MS student working on Spark.

 I am totally new to this field.

 I have installed Spark.



 The local  spark-shell is working fine.



 But whenever I tried the Master configuration I got some errors.



 When I run this command:

 MASTER=spark://hadoopm0:7077 spark-shell



 I get errors like:



 15/07/27 21:17:26 INFO AppClient$ClientActor: Connecting to master
 spark://hadoopm0:7077...

 15/07/27 21:17:46 ERROR SparkDeploySchedulerBackend: Application has been
 killed. Reason: All masters are unresponsive! Giving up.

 15/07/27 21:17:46 WARN SparkDeploySchedulerBackend: Application ID is not
 initialized yet.

 15/07/27 21:17:46 ERROR TaskSchedulerImpl: Exiting due to error from
 cluster scheduler: All masters are unresponsive! Giving up.



 Also, I have attached my screenshot of the Master UI.



 Can you please give me some references or documentation on how to solve
 this issue?

 Thanks in advance.

 Thanking You,







 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Comparison between Standalone mode and YARN mode

2015-07-27 Thread Dean Wampler
YARN and Mesos are better for production clusters of non-trivial size
that have mixed job kinds and multiple users, as they manage resources more
intelligently and dynamically. They also support other services you
probably need, like HDFS, databases, workflow tools, etc.

Standalone is fine, though, if you have a limited number of jobs competing
for resources, for example a small cluster dedicated to ingesting or
processing a specific kind of data, or for a dev/QA cluster. Standalone
mode has much lower overhead, but you have to manage the daemon services
yourself, including configuration of Zookeeper if you need master failover.
Hence, you don't see it often in production scenarios.

The Spark page on cluster deployments has more details:
http://spark.apache.org/docs/latest/cluster-overview.html

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 6:56 PM, Dogtail Ray spark.ru...@gmail.com wrote:

 Hi,

 I am very curious about the differences between Standalone mode and YARN
 mode. According to
 http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/,
 it seems that YARN mode is always better than Standalone mode. Is that the
 case? Or I should choose different modes according to my specific
 requirements? Thanks!



Re: Programmatically launch several hundred Spark Streams in parallel

2015-07-24 Thread Dean Wampler
You don't need the par (parallel) versions of the Scala collections,
actually. Recall that you are building a pipeline in the driver, but it
doesn't start running cluster tasks until ssc.start() is called, at which
point Spark will figure out the task parallelism. In fact, you might as
well do the foreachRDD call within the initial map. No need for the streams
collection, unless you need it for something else. Test it out to make sure
I'm not wrong ;)

However, I'm a little confused by the per-stream logic. It looks like
you're using foreachRDD to dump each input stream into the same output
location stream._1. True? If it's a directory, you'll get an error that
it already exists for the *second* stream in streams. If you're just
funneling all 500 inputs into the same output location, how about using
DStream.union to combine all the input streams into one, then have one
foreachRDD to write output?
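
If that's the case, a sketch of the union version (untested; it assumes paths
is a collection of (table name, directory) pairs and that everything lands in
one hypothetical table called all_streams):

import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))

val dstreams = paths.map { case (_, dir) => ssc.textFileStream(dir) }
val united = ssc.union(dstreams)        // one DStream instead of 500

united.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("all_streams")
}

ssc.start()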

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Fri, Jul 24, 2015 at 11:23 AM, Brandon White bwwintheho...@gmail.com
wrote:

 Hello,

 So I have about 500 Spark streams and I want to know the fastest and most
 reliable way to process each of them. Right now, I am creating and processing
 them in a list:

 val ssc = new StreamingContext(sc, Minutes(10))


 val streams = paths.par.map { path =>
   (path._1, ssc.textFileStream(path._2))
 }

 streams.par.foreach { stream =>
   stream._2.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.insertInto(stream._1)
   }
 }

 ssc.start()



 Is this the best way to do this? Are there any better faster methods?




Re: Mesos + Spark

2015-07-24 Thread Dean Wampler
When running Spark in Mesos cluster mode, the driver program runs in one of
the cluster nodes, like the other Spark processes that are spawned. You
won't need a special node for this purpose. I'm not very familiar with
Chronos, but its UI or the regular Mesos UI should show you where the
driver is running, then you can use the Spark web UI on that machine to see
what the Spark job is doing.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Fri, Jul 24, 2015 at 4:47 PM, boci boci.b...@gmail.com wrote:

 Thanks, but something is not clear...
 I have the Mesos cluster.
 - I want to submit my application and schedule it with Chronos.
 - For cluster mode I need a dispatcher; is this another container (machine
 in the real world)? What will it do? Is it needed when I use Chronos?
 - How can I access my Spark job from Chronos?

 I think submitting in client mode does not fit my situation, is that right?

 Thanks
 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com

 On Wed, Jul 22, 2015 at 4:51 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
 covers many of these questions. If you submit a job with the option
 --supervise, it will be restarted if it fails.

 You can use Chronos for scheduling. You can create a single streaming job
 with a 10 minute batch interval, if that works for your every 10-min. need.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote:

 Hi guys!

 I'm new to Mesos. I have two Spark applications (one streaming and one
 batch). I want to run both apps on a Mesos cluster. For testing I want to
 run in Docker containers, so I started a simple redjack/mesos-master, but I
 think a lot of things are unclear to me (both Mesos and Spark-on-Mesos).

 If I have a Mesos cluster (for testing it will be some Docker containers),
 do I need a separate machine (container) to run my Spark job? Or can I
 submit to the cluster and schedule it (with Chronos or something)?
 How can I run the streaming job? What happens if the controller dies? Or
 if I call spark-submit with master=mesos, is my application started so I can
 forget about it? How can I run it every 10 minutes without submitting every
 10 minutes? How can I run my streaming app in HA mode?

 Thanks

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com






Re: Mesos + Spark

2015-07-24 Thread Dean Wampler
You can certainly start jobs without Chronos, but to automatically restart
finished jobs or to run jobs at specific times or periods, you'll want
something like Chronos.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Fri, Jul 24, 2015 at 5:08 PM, boci boci.b...@gmail.com wrote:

 Thanks,
 Mesos will show spark is driver is running, but what happened if my batch
 job finished? How can I reschedule without chronos ? Can I submit a job
 without start it?

 Thanks

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com

 On Fri, Jul 24, 2015 at 11:52 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 When running Spark in Mesos cluster mode, the driver program runs in one
 of the cluster nodes, like the other Spark processes that are spawned. You
 won't need a special node for this purpose. I'm not very familiar with
 Chronos, but its UI or the regular Mesos UI should show you where the
 driver is running, then you can use the Spark web UI on that machine to see
 what the Spark job is doing.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Fri, Jul 24, 2015 at 4:47 PM, boci boci.b...@gmail.com wrote:

 Thanks, but something is not clear...
 I have the mesos cluster.
 - I want to submit my application and scheduled with chronos.
 - For cluster mode I need a dispatcher, this is another container
 (machine in the real world)? What will this do? It's needed when I using
 chronos?
 - How can I access to my spark job from chronos?

 I think submit in client mode is not fit to my condition, that's right?

 Thanks
 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com

 On Wed, Jul 22, 2015 at 4:51 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
 covers many of these questions. If you submit a job with the option
 --supervise, it will be restarted if it fails.

 You can use Chronos for scheduling. You can create a single streaming
 job with a 10 minute batch interval, if that works for your every 10-min.
 need.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote:

 Hi guys!

 I'm a new in mesos. I have two spark application (one streaming and
 one batch). I want to run both app in mesos cluster. Now for testing I 
 want
 to run in docker container so I started a simple redjack/mesos-master, but
 I think a lot of think unclear for me (both mesos and spark-mesos).

 If I have a mesos cluster (for testing it will be some docker
 container) i need a separate machine (container) to run my spark job? Or
 can I submit the cluster and schedule (chronos or I dunno)?
 How can I run the streaming job? What happened if the controller
 died? Or if I call spark-submit with master=mesos my application started
 and I can forget? How can I run in every 10 min without submit in every 10
 min? How can I run my streaming app in HA mode?

 Thanks

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com








Re: Mesos + Spark

2015-07-22 Thread Dean Wampler
This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
covers many of these questions. If you submit a job with the option
--supervise, it will be restarted if it fails.

You can use Chronos for scheduling. You can create a single streaming job
with a 10 minute batch interval, if that works for your every 10-min. need.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote:

 Hi guys!

 I'm a new in mesos. I have two spark application (one streaming and one
 batch). I want to run both app in mesos cluster. Now for testing I want to
 run in docker container so I started a simple redjack/mesos-master, but I
 think a lot of think unclear for me (both mesos and spark-mesos).

 If I have a mesos cluster (for testing it will be some docker container) i
 need a separate machine (container) to run my spark job? Or can I submit
 the cluster and schedule (chronos or I dunno)?
 How can I run the streaming job? What happened if the controller died?
 Or if I call spark-submit with master=mesos my application started and I
 can forget? How can I run in every 10 min without submit in every 10 min?
 How can I run my streaming app in HA mode?

 Thanks

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com



Re: Spark-hive parquet schema evolution

2015-07-22 Thread Dean Wampler
While it's not recommended to overwrite files Hive thinks it understands,
you can add the column to Hive's metastore using an ALTER TABLE command
using HiveQL in the Hive shell or using HiveContext.sql():

ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column
for full details.
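
From Spark it would look something like this (the column name and type are
made up; hiveContext is your HiveContext instance):

hiveContext.sql("ALTER TABLE mytable ADD COLUMNS (new_col STRING)")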

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com wrote:

  Since Hive doesn’t support schema evolution, you’ll have to update the
 schema stored in metastore somehow. For example, you can create a new
 external table with the merged schema. Say you have a Hive table t1:

 CREATE TABLE t1 (c0 INT, c1 DOUBLE);

 By default, this table is stored in HDFS path
 hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some Parquet
 data with an extra column c2 to the same directory:

 import org.apache.spark.sql.types._

 val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
 val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
 df1.write.mode("append").parquet(path)

 Now you can create a new external table t2 like this:

 val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
 df2.write.option("path", path).saveAsTable("t2")

 Since we specified a path above, the newly created t2 is an external
 table pointing to the original HDFS location. But the schema of t2 is the
 merged version.

 The drawback of this approach is that, t2 is actually a Spark SQL
 specific data source table rather than a genuine Hive table. This means, it
 can be accessed by Spark SQL only. We’re just using Hive metastore to help
 persisting metadata of the data source table. However, since you’re asking
 how to access the new table via Spark SQL CLI, this should work for you. We
 are working on making Parquet and ORC data source tables accessible via
 Hive in Spark 1.5.0.

 Cheng

 On 7/22/15 10:32 AM, Jerrick Hoang wrote:

   Hi Lian,

  Sorry, I'm new to Spark, so I did not express myself very clearly. I'm
 concerned about the situation where, let's say, I have a Parquet table with
 some partitions and I add a new column A to the Parquet schema and write some
 data with the new schema to a new partition in the table. If I'm not mistaken,
 if I do sqlContext.read.parquet(table_path).printSchema() it will print
 the correct schema with the new column A. But if I do a 'describe table' from
 the Spark SQL CLI I won't see the new column. I understand that this
 is because Hive doesn't support schema evolution. So what is the best way
 to support CLI queries in this situation? Do I need to manually alter the
 table every time the underlying schema changes?

  Thanks

 On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Jerrick,

 What do you mean by schema evolution with Hive metastore tables? Hive
 doesn't take schema evolution into account. Could you please give a
 concrete use case? Are you trying to write Parquet data with extra columns
 into an existing metastore Parquet table?

 Cheng


 On 7/21/15 1:04 AM, Jerrick Hoang wrote:

 I'm new to Spark, any ideas would be much appreciated! Thanks

 On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang 
 jerrickho...@gmail.comjerrickho...@gmail.com wrote:

 Hi all,

  I'm aware of the support for schema evolution via DataFrame API. Just
 wondering what would be the best way to go about dealing with schema
 evolution with Hive metastore tables. So, say I create a table via SparkSQL
 CLI, how would I deal with Parquet schema evolution?

  Thanks,
 J







Re: Spark Streaming Checkpointing solutions

2015-07-21 Thread Dean Wampler
TD's Spark Summit talk offers suggestions (
https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/).
He recommends using HDFS, because you get the triplicate resiliency it
offers, albeit with extra overhead. I believe the driver doesn't need
visibility to the checkpointing directory, e.g., if you're running in
client mode, but all the cluster nodes would need to see it for recovering
a lost stage, where it might get started on a different node. Hence, I
would think NFS could work, if all nodes have the same mount, although
there would be a lot of network overhead. In some situations, a high
performance file system appliance, e.g., NAS, could suffice.

My $0.02,
dean
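
For reference, the usual driver-recovery pattern, as a bare sketch (the
checkpoint path and app name are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/user/spark/checkpoints/my-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-streaming-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // define the DStream graph here, before returning ssc
  ssc
}

// On restart, this recovers the context (and pending work) from the checkpoint:
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()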

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Jul 21, 2015 at 10:43 AM, Emmanuel fortin.emman...@gmail.com
wrote:

 Hi,

 I'm working on a Spark Streaming application and I would like to know what
 is the best storage to use
 for checkpointing.

 For testing purposes we are using NFS between the workers, the master, and
 the driver program (in client mode),
 but we have some issues with the CheckpointWriter (a single dedicated thread). *My
 understanding is that NFS is not a good candidate for this usage.*

 1. What is the best solution for checkpointing and what are the
 alternatives
 ?

 2. Do the checkpoint directories need to be shared by the driver
 application and the workers too?

 Thanks for your replies



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-solutions-tp23932.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Silly question about building Spark 1.4.1

2015-07-20 Thread Dean Wampler
hadoop-2.6 is supported (look for profile XML in the pom.xml file).

For Hive, add -Phive -Phive-thriftserver  (See
http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables)
for more details.

dean
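
For example, something like this should give you Hadoop 2.6 plus Hive and the
thrift server (untested as written; for Scala 2.11, run
dev/change-version-to-2.11.sh first and add -Dscala-2.11, keeping in mind that
the 2.11 build had limitations around the JDBC/thrift server at the time):

mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests clean package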

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com
wrote:

 Sorry,

 Should have sent this to user…

 However… it looks like the docs page may need some editing?

 Thx

 -Mike


 Begin forwarded message:

 *From: *Michael Segel msegel_had...@hotmail.com
 *Subject: **Silly question about building Spark 1.4.1*
 *Date: *July 20, 2015 at 12:26:40 PM MST
 *To: *d...@spark.apache.org

 Hi,

 I’m looking at the online docs for building spark 1.4.1 …

 http://spark.apache.org/docs/latest/building-spark.html

 I was interested in building spark for Scala 2.11 (latest scala) and also
 for Hive and JDBC support.

 The docs say:
 “
 To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11
 property:

 dev/change-version-to-2.11.sh
 mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

 “

 So…

 Is there a reason I shouldn’t build against hadoop-2.6 ?


 If I want to add the Thirft and Hive support, is it possible?

 Looking at the Scala build, it looks like hive support is being built?

 (Looking at the stdout messages…)

 Should the docs be updated? Am I missing something?

 (Dean W. can confirm, I am completely brain dead. ;-)


 Thx


 -Mike

 PS. Yes I can probably download a prebuilt image, but I’m a glutton for 
 punishment. ;-)






Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Dean Wampler
Is myRDD outside a DStream? If so are you persisting on each batch
iteration? It should be checkpointed frequently too.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote:

 The data coming from the DStream has the same keys that are in myRDD, so the
 reduceByKey after union keeps the overall tuple count in myRDD fixed. Or,
 even with a fixed tuple count, will it keep consuming more resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever-increasing
 data, and you are processing an ever-increasing amount of data in
 every batch. Obviously this is not going to last for very long. You
 fundamentally cannot keep processing an ever-increasing amount of data with
 finite resources, can you?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated to second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples coming
 from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointin to cache results. Over the time, the
 lineage of myRDD keeps increasing and stages in each batch of dstream keeps
 increasing, even though all the earlier stages are skipped. When the number
 of stages grow big enough, the overall delay due to scheduling delay starts
 increasing. The processing time for each batch is still fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand







Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Dean Wampler
I think you're complicating the cache behavior by aggressively re-using
vars when temporary vals would be more straightforward. For example,
newBase = newBase.unpersist()... effectively means that newBase's data is
not actually cached when the subsequent .union(...) is performed, so it
probably goes back to the lineage... Same with the current.unpersist logic
before it.

Names are cheap, so just use local vals:

val newCurrent = rdd.union(current).reduceByKey(_+_)
current.unpersist()

Also, what happens if you omit the 2 argument for the number of
partitions in reduceByKey?

Other minor points:

I would change the joined, toUpdate, toNotUpdate logic to this:

val joined = current.leftOuterJoin(newBase).map(mymap).cache()

val toUpdate = joined.filter(myfilter).cache()
val toNotUpdate = joined.filter(mynotfilter).cache()


Maybe it's just for this email example, but you don't need to call collect
on toUpdate before using foreach(println). If the RDD is huge, you
definitely don't want to do that.

Hope this helps.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote:

 Yes, myRDD is outside of DStream. Following is the actual code where newBase
 and current are the rdds being updated with each batch:

   val base = sc.textFile...
   var newBase = base.cache()

   val dstream: DStream[String] = ssc.textFileStream...
   var current: RDD[(String, Long)] = sc.emptyRDD.cache()

   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

 current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

 val joined = current.leftOuterJoin(newBase).cache()
 val toUpdate = joined.filter(myfilter).map(mymap).cache()
 val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

 toUpdate.collect().foreach(println) // this goes to some store

 newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
 2).cache()

 current = toNotUpdate.cache()

 toUpdate.unpersist()
 joined.unpersist()
 rdd.unpersist()
   })


 Regards,

 Anand


 On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

 Is myRDD outside a DStream? If so are you persisting on each batch
 iteration? It should be checkpointed frequently too.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 The data coming from dstream have the same keys that are in myRDD, so
 the reduceByKey after union keeps the overall tuple count in myRDD
 fixed. Or even with fixed tuple count, it will keep consuming more
 resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing ever increasing amount of data in
 every batch. Obviously this is going to not last for very long. You
 fundamentally cannot keep processing ever increasing amount of data with
 finite resources, isnt it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Thats from the Streaming tab for Spark 1.4 WebUI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated to second image with the
 charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I've an application in which an rdd is being updated with tuples
 coming from RDDs in a DStream with following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointin to cache results. Over the time,
 the lineage of myRDD keeps increasing and stages in each batch of dstream
 keeps increasing, even though all the earlier stages are skipped. When 
 the
 number of stages grow big enough, the overall delay due to scheduling 
 delay
 starts increasing. The processing time for each batch is still fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand









Please add the Chicago Spark Users' Group to the community page

2015-07-06 Thread Dean Wampler
Here's our home page: http://www.meetup.com/Chicago-Spark-Users/

Thanks,
Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Re: Spark 1.3.1 On Mesos Issues.

2015-06-01 Thread Dean Wampler
It would be nice to see the code for MapR FS Java API, but my google foo
failed me (assuming it's open source)...

So, shooting in the dark ;) there are a few things I would check, if you
haven't already:

1. Could there be 1.2 versions of some Spark jars that get picked up at run
time (but apparently not in local mode) on one or more nodes? (Side
question: Does your node experiment fail on all nodes?) Put another way,
are the classpaths good for all JVM tasks?
2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos?

Incidentally, how are you combining Mesos and MapR? Are you running Spark
in Mesos, but accessing data in MapR-FS?

Perhaps the MapR shim library doesn't support Spark 1.3.1.

HTH,

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 All -

 I am facing an odd issue and I am not really sure where to go for support
 at this point. I am running MapR, which complicates things as it relates to
 Mesos; however, this HAS worked in the past with no issues, so I am stumped
 here.

 So for starters, here is what I am trying to run. This is a simple show
 tables using the Hive Context:

 from pyspark import SparkContext, SparkConf
 from pyspark.sql import SQLContext, Row, HiveContext
 sparkhc = HiveContext(sc)
 test = sparkhc.sql("show tables")
 for r in test.collect():
   print r

 When I run it on 1.3.1 using ./bin/pyspark --master local, this works with
 no issues.

 When I run it using Mesos with all the settings configured (as they had
 worked in the past), I get lost tasks, and when I zoom in on them, the error
 that is being reported is below.  Basically it's a NullPointerException on
 the com.mapr.fs.ShimLoader.  What's weird to me is that I took each instance
 and compared them, and the classpath, everything, is exactly the same.
 Yet running in local mode works, and running in Mesos fails.  Also of note,
 when the task is scheduled to run on the same node as when I run locally,
 that fails too! (Baffling.)

 OK, for comparison, how I configured Mesos was to download the mapr4
 package from spark.apache.org, using the exact same configuration file as
 for 1.2.0 (except for changing the executor tgz from 1.2.0 to 1.3.1).
 When I run this example with the mapr4 package for 1.2.0 there is no issue in
 Mesos; everything runs as intended. Using the same package for 1.3.1,
 it fails.

 (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as
 well).

 So basically, when I used 1.2.0 and followed a set of steps, it worked on
 Mesos, while 1.3.1 fails. Even though 1.3.1 is a current version of Spark, MapR
 supports 1.2.1 only. (Still working on that.)

 I guess I am at a loss right now as to why this would be happening; any
 pointers on where I could look or what I could tweak would be greatly
 appreciated. Additionally, if there is something I could specifically draw
 to the attention of MapR on this problem, please let me know. I am perplexed
 by the change from 1.2.0 to 1.3.1.

 Thank you,

 John




 Full Error on 1.3.1 on Mesos:
 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity
 1060.3 MB java.lang.NullPointerException at
 com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at
 com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at
 com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
 org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
 at java.lang.Class.forName0(Native Method) at
 java.lang.Class.forName(Class.java:274) at
 org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
 at
 org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
 at
 org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
 at
 org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
 at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at
 org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at
 org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
 at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at
 org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at
 org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at
 org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959) at
 org.apache.spark.storage.BlockManager.(BlockManager.scala:104) at
 org.apache.spark.storage.BlockManager.(BlockManager.scala:179) at
 org.apache.spark.SparkEnv$.create(SparkEnv.scala:310) at
 org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:186) at
 org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:70

Re: Recommended Scala version

2015-05-26 Thread Dean Wampler
Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the
Spark project has published maven artifacts that are compiled with 2.11 and
2.10, although the downloads at http://spark.apache.org/downloads.html are
still all for 2.10.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh 
riteshoneinamill...@gmail.com wrote:

 Yes, the recommended version is 2.10, as not all features are supported by
 the 2.11 version. The Kafka libraries and JDBC components are yet to be ported to
 the 2.11 version. So if your project doesn't depend on these components,
 you can give v2.11 a try.

 Here's a link
 https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211
  for
 building with 2.11 version.

 That said, you won't run into any issues if you use v2.10 as of now.
 But then again, future releases will have to shift to the 2.11 version once
 support for v2.10 ends in the long run.


 On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal 
 punya.bis...@gmail.com wrote:

 Dear Spark developers and users,

 Am I correct in believing that the recommended version of Scala to use
 with Spark is currently 2.10? Is there any plan to switch to 2.11 in
 future? Are there any advantages to using 2.11 today?

 Regards,
 Punya





Re: Tasks randomly stall when running on mesos

2015-05-25 Thread Dean Wampler
Here is a link for builds of 1.4 RC2:

http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc2-bin/

For a mvn repo, I believe the RC2 artifacts are here:

https://repository.apache.org/content/repositories/orgapachespark-1104/

A few experiments you might try:

1. Does spark-shell work? It might start fine, but make sure you can create
an RDD and use it, e.g., something like:

val rdd = sc.parallelize(Seq(1,2,3,4,5,6))
rdd foreach println

2. Try coarse grained mode, which has different logic for executor
management.

You can set it in $SPARK_HOME/conf/spark-defaults.conf file:

spark.mesos.coarse   true

Or, from this page
http://spark.apache.org/docs/latest/running-on-mesos.html, set the
property in a SparkConf object used to construct the SparkContext:

conf.set("spark.mesos.coarse", "true")
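
You can also pass it per job on the command line, e.g. (the class and jar names
here are just placeholders):

spark-submit --conf spark.mesos.coarse=true --class com.example.MyJob my-job.jar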

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, May 25, 2015 at 12:06 PM, Reinis Vicups sp...@orbit-x.de wrote:

  Hello,

 I assume I am running spark in a fine-grained mode since I haven't changed
 the default here.

 One question regarding 1.4.0-RC1 - is there a mvn snapshot repository I
 could use for my project config? (I know that I have to download source and
 make-distribution for executor as well)

 thanks
 reinis


 On 25.05.2015 17:07, Iulian Dragoș wrote:


 On Mon, May 25, 2015 at 2:43 PM, Reinis Vicups sp...@orbit-x.de wrote:

  Hello,

 I am using Spark 1.3.1-hadoop2.4 with Mesos 0.22.1 with zookeeper and
 running on a cluster with 3 nodes on 64bit ubuntu.

 My application is compiled with Spark 1.3.1 (apparently with a Mesos 0.21.0
 dependency), Hadoop 2.5.1-mapr-1503, and Akka 2.3.10. Only with this
 combination have I succeeded in running Spark jobs on Mesos at all. Different
 versions cause class loader issues.

 I am submitting spark jobs with spark-submit with mesos://zk://.../mesos.


  Are you using coarse grained or fine grained mode?

  sandbox log of slave-node app01 (the one that stalls) shows following:

 10:01:25.815506 35409 fetcher.cpp:214] Fetching URI
 'hdfs://dev-hadoop01/apps/spark-1.3.1-bin-hadoop2.4.tgz'
 10:01:26.497764 35409 fetcher.cpp:99] Fetching URI
 'hdfs://dev-hadoop01/apps/spark-1.3.1-bin-hadoop2.4.tgz' using Hadoop Client
 10:01:26.497869 35409 fetcher.cpp:109] Downloading resource from
 'hdfs://dev-hadoop01/apps/spark-1.3.1-bin-hadoop2.4.tgz' to
 '/tmp/mesos/slaves/20150511-150924-3410235146-5050-1903-S3/frameworks/20150511-150924-3410235146-5050-1903-0249/executors/20150511-150924-3410235146-5050-1903-S3/runs/ec3a0f13-2f44-4952-bb23-4d48abaacc05/spark-1.3.1-bin-hadoop2.4.tgz'
 10:01:32.877717 35409 fetcher.cpp:78] Extracted resource
 '/tmp/mesos/slaves/20150511-150924-3410235146-5050-1903-S3/frameworks/20150511-150924-3410235146-5050-1903-0249/executors/20150511-150924-3410235146-5050-1903-S3/runs/ec3a0f13-2f44-4952-bb23-4d48abaacc05/spark-1.3.1-bin-hadoop2.4.tgz'
 into
 '/tmp/mesos/slaves/20150511-150924-3410235146-5050-1903-S3/frameworks/20150511-150924-3410235146-5050-1903-0249/executors/20150511-150924-3410235146-5050-1903-S3/runs/ec3a0f13-2f44-4952-bb23-4d48abaacc05'
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 10:01:34 INFO MesosExecutorBackend: Registered signal handlers for [TERM,
 HUP, INT]
 10:01:34.459292 35730 exec.cpp:132] Version: 0.22.0
 *10:01:34 ERROR MesosExecutorBackend: Received launchTask but executor
 was null*
 10:01:34.540870 35765 exec.cpp:206] Executor registered on slave
 20150511-150924-3410235146-5050-1903-S3
 10:01:34 INFO MesosExecutorBackend: Registered with Mesos as executor ID
 20150511-150924-3410235146-5050-1903-S3 with 1 cpus


  It looks like an inconsistent state on the Mesos scheduler. It tries to
 launch a task on a given slave before the executor has registered. This
 code was improved/refactored in 1.4, could you try 1.4.0-RC1?


​Yes and note the second message after the error you highlighted; that's
when the executor would be registered with Mesos and the local object
created. ​



  iulian


  10:01:34 INFO SecurityManager: Changing view acls to...
 10:01:35 INFO Slf4jLogger: Slf4jLogger started
 10:01:35 INFO Remoting: Starting remoting
 10:01:35 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkExecutor@app01:xxx]
 10:01:35 INFO Utils: Successfully started service 'sparkExecutor' on port
 xxx.
 10:01:35 INFO AkkaUtils: Connecting to MapOutputTracker:
 akka.tcp://sparkDriver@dev-web01/user/MapOutputTracker
 10:01:35 INFO AkkaUtils: Connecting to BlockManagerMaster:
 akka.tcp://sparkDriver@dev-web01/user/BlockManagerMaster
 10:01:36 INFO DiskBlockManager: Created local directory at
 /tmp/spark-52a6585a-f9f2-4ab6-bebc-76be99b0c51c/blockmgr-e6d79818-fe30-4b5c-bcd6-8fbc5a201252
 10:01:36 INFO MemoryStore: MemoryStore started with capacity 88.3 MB
 10:01:36 WARN

Re: Spark SQL: preferred syntax for column reference?

2015-05-13 Thread Dean Wampler
Is the $"foo" or mydf("foo") or both checked at compile time to verify that
the column reference is valid? Thx.

Dean

On Wednesday, May 13, 2015, Michael Armbrust mich...@databricks.com wrote:

 I would not say that either method is preferred (neither is
 old/deprecated).  One advantage to the second is that you are referencing a
 column from a specific dataframe, instead of just providing a string that
 will be resolved much like an identifier in a SQL query.

 This means given:
 df1 = [id: int, name: string]
 df2 = [id: int, zip: int]

 I can do something like:

 df1.join(df2, df1("id") === df2("id"))

 Whereas I would need aliases if I was only using strings:

 df1.as("a").join(df2.as("b"), $"a.id" === $"b.id")

 On Wed, May 13, 2015 at 9:55 AM, Diana Carroll dcarr...@cloudera.com wrote:

 I'm just getting started with Spark SQL and DataFrames in 1.3.0.

 I notice that the Spark API shows a different syntax for referencing
 columns in a dataframe than the Spark SQL Programming Guide.

 For instance, the API docs for the select method show this:
 df.select($"colA", $"colB")


 Whereas the programming guide shows this:
 df.filter(df("name") > 21).show()

 I tested and both the $"column" and df("column") syntaxes work, but I'm
 wondering which is *preferred*.  Is one the original and one a new
 feature we should be using?

 Thanks,
 Diana
 (Spark Curriculum Developer for Cloudera)




-- 
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Re: value toDF is not a member of RDD object

2015-05-12 Thread Dean Wampler
It's the import statement Olivier showed that makes the method available.

Note that you can also use `sqlContext.createDataFrame(myRDD)`, without the need
for the import statement. I personally prefer this approach.
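
For example, a minimal sketch (the Person case class and the RDD contents here
are just placeholders):

case class Person(name: String, age: Int)
val myRDD = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25)))

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df1 = sqlContext.createDataFrame(myRDD)   // no implicits import required

import sqlContext.implicits._                 // or bring toDF into scope
val df2 = myRDD.toDF()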

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote:

 you need to instantiate a SQLContext :
 val sc : SparkContext = ...
 val sqlContext = new SQLContext(sc)
 import sqlContext.implicits._

 Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a écrit :

 I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" %
 "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules
 other than `import org.apache.spark.sql.{ Row, SQLContext }`?

 On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com
 wrote:

 toDF is part of spark SQL so you need Spark SQL dependency + import
 sqlContext.implicits._ to get the toDF method.

 Regards,

 Olivier.

 Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a
 écrit :

 Hi User Group,

 I’m trying to reproduce the example on Spark SQL Programming Guide
 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
 and got a compile error when packaging with sbt:

 [error] myfile.scala:30: value toDF is not a member of 
 org.apache.spark.rdd.RDD[Person]
 [error] val people = 
 sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
  => Person(p(0), p(1).trim.toInt)).toDF()
 [error]
   ^
 [error] one error found
 [error] (compile:compileIncremental) Compilation failed
 [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

 I double checked my code includes import sqlContext.implicits._ after
 reading this post
 https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E
 on the Spark mailing list, and even tried to use toDF("col1", "col2")
 suggested by Xiangrui Meng in that post and got the same error.

 The Spark version is specified in build.sbt file as follows:

 scalaVersion := "2.11.6"
 libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
 libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

 Anyone have ideas the cause of this error?

 REGARDS,
 Todd Leo
 ​




Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
I don't know the full context of what you're doing, but serialization
errors usually mean you're attempting to serialize something that can't be
serialized, like the SparkContext. Kryo won't help there.

The arguments to spark-submit you posted previously look good:

2)  --num-executors 96 --driver-memory 12g --driver-java-options
-XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if
your data is in HDFS and your block size is 128MB, then you'll get ~195
partitions anyway. If it takes 7 hours to do a join over 25GB of data, you
have some other serious bottleneck. You should examine the web console and
the logs to determine where all the time is going. Questions you might
pursue:

   - How long does each task take to complete?
   - How many of those 195 partitions/tasks are processed at the same time?
   That is, how many slots are available?  Maybe you need more nodes if the
   number of slots is too low. Based on your command arguments, you should be
   able to process 1/2 of them at a time, unless the cluster is busy.
   - Is the cluster swamped with other work?
   - How much data does each task process? Is the data roughly the same
   from one task to the next? If not, you might have serious key skew (see the
   sketch below).

You may also need to research the details of how joins are implemented and
some of the common tricks for organizing data to minimize having to shuffle
all N by M records.
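
For the key-skew question in the list above, a quick sketch for eyeballing the
distribution of records per key (keyedRdd is a stand-in for either side of your
join):

val perKeyCounts = keyedRdd.mapValues(_ => 1L).reduceByKey(_ + _)
perKeyCounts.map(_.swap)
  .sortByKey(ascending = false)
  .take(20)                      // the 20 heaviest keys
  .foreach(println)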



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Dean,
 If I don't use the Kryo serializer I get a serialization error, hence I am
 using it.
 If I don't use partitionBy/repartition then the simple join never
 completed even after 7 hours, and in fact as a next step I need to run it
 against 250G, as that is my full dataset size. Someone here suggested that I
 use repartition.

 Assuming repartition is mandatory, how do I decide what's the right number?
 When I am using 400 I do not get the NullPointerException that I talked
 about, which is strange. I never saw that exception against a small random
 dataset but see it with 25G; and again with 400 partitions, I do not see it.


 On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 IMHO, you are trying waaay too hard to optimize work on what is really a
 small data set. 25G, even 250G, is not that much data, especially if you've
 spent a month trying to get something to work that should be simple. All
 these errors are from optimization attempts.

 Kryo is great, but if it's not working reliably for some reason, then
 don't use it. Rather than force 200 partitions, let Spark try to figure out
 a good-enough number. (If you really need to force a partition count, use
 the repartition method instead, unless you're overriding the partitioner.)

 So. I recommend that you eliminate all the optimizations: Kryo,
 partitionBy, etc. Just use the simplest code you can. Make it work first.
 Then, if it really isn't fast enough, look for actual evidence of
 bottlenecks and optimize those.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi join. I have a
 10G limit and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to using the .join() API instead of a map-side broadcast
 join.
 I am repartitioning the data with 100, 200 and I see a
 NullPointerException now.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 This trace does not include a line from my code and hence I do not know what
 is causing the error.
 I do have a registered Kryo serializer.

 val conf = new SparkConf()
   .setAppName(detail)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 Its a simple mapping of input records to (itemId, record)

 I found this

 http

Re: Questions about Accumulators

2015-05-03 Thread Dean Wampler
Yes, correct.

However, note that when an accumulator operation is *idempotent*, meaning
that repeated application for the same data behaves exactly like one
application, then that accumulator can be safely called in transformation
steps (non-actions), too.

For example, max and min tracking. Just last week I wrote one that used a
hash map to track the latest timestamps seen for specific keys.
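
For example, a minimal sketch of an idempotent max-tracking accumulator with the
Spark 1.x accumulator API (the data RDD is a placeholder):

import org.apache.spark.AccumulatorParam

object MaxParam extends AccumulatorParam[Long] {
  def zero(initial: Long): Long = initial
  def addInPlace(a: Long, b: Long): Long = math.max(a, b)  // re-applying the same value changes nothing
}

val maxSeen = sc.accumulator(Long.MinValue)(MaxParam)
val doubled = data.map { x => maxSeen += x; x * 2 }  // safe in a transformation because max is idempotent
doubled.count()
println(maxSeen.value)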

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 8:07 AM, xiazhuchang hk8...@163.com wrote:

 “For accumulator updates performed inside actions only, Spark guarantees
 that
 each task’s update to the accumulator will only be applied once, i.e.
 restarted tasks will not update the value. In transformations, users should
 be aware of that each task’s update may be applied more than once if tasks
 or job stages are re-executed. ”
 Does this mean the guarantee (that the accumulator is only updated once) holds only in
 actions? That is to say, one should use the accumulator only in actions,
 or else there may be errors (updates applied more than once) if it is used in
 transformations?
 e.g. map(x => accumulator += x)
 After execution, the correct result of the accumulator should be 1;
 unfortunately, if some error happens and the task restarts, the map() operation is
 re-executed (map(x => accumulator += x) is re-executed), and then the final result
 of the accumulator will be 2, twice the correct result?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746p22747.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
How big is the data you're returning to the driver with collectAsMap? You
are probably running out of memory trying to copy too much data back to it.

If you're trying to force a map-side join, Spark can do that for you in
some cases within the regular DataFrame/RDD context. See
http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
and this talk by Michael Armbrust for example,
http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
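
A rough sketch of the DataFrame route in 1.3 (the column names and the "events"
side are made up, not your actual schema):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val listingsDF = listings.map(l => (l.getItemId().toLong, l.toString)).toDF("itemId", "listing")
val eventsDF = events.map(e => (e.itemId, e.count)).toDF("itemId", "count")

// Spark SQL can pick a broadcast (map-side) join on its own when one side is below
// spark.sql.autoBroadcastJoinThreshold, so nothing has to be collected to the driver.
val joined = eventsDF.join(listingsDF, eventsDF("itemId") === listingsDF("itemId"))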


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Full Exception
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
 VISummaryDataProvider.scala:37) failed in 884.087 s*
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
 at VISummaryDataProvider.scala:37, took 1093.418249 s*
 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Job aborted due to stage
 failure: Exception while getting task result:
 org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)])


 *Code at line 37*

 val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
 .collectAsMap

 The listing data set size is 26G (10 files) and my driver memory is 12G (I
 can't go beyond it). The reason I do collectAsMap is to broadcast it and do a
 map-side join instead of a regular join.


 Please suggest ?


 On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 My Spark job is failing and I see:

 ==

 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)


 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]


 I see

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
IMHO, you are trying waaay too hard to optimize work on what is really a
small data set. 25G, even 250G, is not that much data, especially if you've
spent a month trying to get something to work that should be simple. All
these errors are from optimization attempts.

Kryo is great, but if it's not working reliably for some reason, then don't
use it. Rather than force 200 partitions, let Spark try to figure out a
good-enough number. (If you really need to force a partition count, use the
repartition method instead, unless you're overriding the partitioner.)

So. I recommend that you eliminate all the optimizations: Kryo,
partitionBy, etc. Just use the simplest code you can. Make it work first.
Then, if it really isn't fast enough, look for actual evidence of
bottlenecks and optimize those.
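
For example (viEvents is the pair RDD from your snippet; otherPairRdd is a
placeholder for the other side of the join):

// Simplest: let Spark choose the number of partitions for the shuffle.
val joined = viEvents.join(otherPairRdd)

// If you really must force a count, prefer repartition over partitionBy:
val joined200 = viEvents.repartition(200).join(otherPairRdd)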



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi join. I have a
 10G limit and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to using the .join() API instead of a map-side broadcast
 join.
 I am repartitioning the data with 100, 200 and I see a NullPointerException
 now.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 This trace does not include a line from my code and hence I do not know what is
 causing the error.
 I do have a registered Kryo serializer.

 val conf = new SparkConf()
   .setAppName(detail)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 Its a simple mapping of input records to (itemId, record)

 I found this

 http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

 Looks like Kryo (v2.21) changed something to stop using default
 constructors.

 ((Kryo.DefaultInstantiatorStrategy)
 kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new
 StdInstantiatorStrategy());


 Please suggest


 Trace:
 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
 stage 2.0 (TID 774)
 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
 Serialization trace:
 values (org.apache.avro.generic.GenericData$Record)
 datum (org.apache.avro.mapred.AvroKey)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 Regards,


 Any suggestions?
 I have not been able to get this thing to work for over a month now; it's
 kind of getting frustrating.

 On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 How big is the data you're returning to the driver with collectAsMap? You
 are probably running out of memory trying to copy too much data back to it.

 If you're trying to force a map-side join, Spark can do that for you in
 some cases within the regular DataFrame/RDD context. See
 http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
 and this talk by Michael Armbrust for example,
 http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.


 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ

Re: Spark distributed SQL: JSON Data set on all worker node

2015-05-03 Thread Dean Wampler
Note that each JSON object has to be on a single line in the files.
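
A minimal sketch (the path is a placeholder; assumes Spark 1.3+, where jsonFile
returns a DataFrame):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("hdfs:///data/events.json")  // one JSON object per line
df.registerTempTable("events")
sqlContext.sql("SELECT count(*) FROM events").show()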

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 4:14 AM, ayan guha guha.a...@gmail.com wrote:

 Yes, it is possible. You need to use the jsonFile method on the SQLContext and
 then create a DataFrame from the RDD. Then register it as a table. It should
 be 3 lines of code, thanks to Spark.

 You may want to watch a few YouTube videos, especially the ones on unifying pipelines.
 On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote:

 Hi,

 I am a noob to Spark and related technology.

 I have JSON stored at the same location on all worker clients (Spark cluster).
 I am looking to load the JSON data set on these clients and do SQL queries, like
 distributed SQL.

 Is it possible to achieve this?

 Right now, the master submits the task to one node only.

 Thanks and regards,
 Mrityunjay




Re: java.io.IOException: No space left on device

2015-04-29 Thread Dean Wampler
Or multiple volumes. The LOCAL_DIRS (YARN) and SPARK_LOCAL_DIRS (Mesos,
Standalone) environment variables and the spark.local.dir property control
where temporary data is written. The default is /tmp.

See
http://spark.apache.org/docs/latest/configuration.html#runtime-environment
for more details.
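
For example, in $SPARK_HOME/conf/spark-defaults.conf (the paths are placeholders
for volumes with more free space; multiple directories are comma-separated):

spark.local.dir    /data1/spark-tmp,/data2/spark-tmp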

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Apr 29, 2015 at 6:19 AM, Anshul Singhle ans...@betaglide.com
wrote:

 Do you have multiple disks? Maybe your work directory is not in the right
 disk?

 On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and there
 is not enough memory, the partition is persisted on disk, and despite having
 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: java.io.IOException: No space left on device

2015-04-29 Thread Dean Wampler
Makes sense. / is where /tmp would be. However, 230G should be plenty of
space. If you have INFO logging turned on (set in
$SPARK_HOME/conf/log4j.properties), you'll see messages about saving data
to disk that will list sizes. The web console also has some summary
information about this.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Apr 29, 2015 at 6:25 AM, selim namsi selim.na...@gmail.com wrote:

 This is the output of df -h; as you can see, I'm using only one disk,
 mounted on /:

 df -h
 Filesystem      Size  Used Avail Use% Mounted on
 /dev/sda8       276G   34G  229G  13% /
 none            4.0K     0  4.0K   0% /sys/fs/cgroup
 udev            7.8G  4.0K  7.8G   1% /dev
 tmpfs           1.6G  1.4M  1.6G   1% /run
 none            5.0M     0  5.0M   0% /run/lock
 none            7.8G   37M  7.8G   1% /run/shm
 none            100M   40K  100M   1% /run/user
 /dev/sda1       496M   55M  442M  11% /boot/efi

 Also when running the program, I noticed that the Used% disk space related
 to the partition mounted on / was growing very fast

 On Wed, Apr 29, 2015 at 12:19 PM Anshul Singhle ans...@betaglide.com
 wrote:

 Do you have multiple disks? Maybe your work directory is not in the right
 disk?

 On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and there
 is not enough memory, the partition is persisted on disk, and despite having
 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: A problem of using spark streaming to capture network packets

2015-04-29 Thread Dean Wampler
I would use the ps command on each machine while the job is running to
confirm that every process involved is running as root.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:58 PM, Lin Hao Xu xulin...@cn.ibm.com wrote:

 btw, from spark web ui, the acl is marked with *root*

 Best regards,

 Lin Hao XU
 IBM Research China
 Email: xulin...@cn.ibm.com
 My Flickr: http://www.flickr.com/photos/xulinhao/sets


 From: Dean Wampler deanwamp...@gmail.com
 To: Lin Hao Xu/China/IBM@IBMCN
 Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org
 Date: 2015/04/29 09:40
 Subject: Re: A problem of using spark streaming to capture network packets
 --



 Are the tasks on the slaves also running as root? If not, that might
 explain the problem.

 dean

 Dean Wampler, Ph.D.
 Author: *Programming Scala, 2nd Edition*
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 *Typesafe* http://typesafe.com/
 *@deanwampler* http://twitter.com/deanwampler
 *http://polyglotprogramming.com* http://polyglotprogramming.com/

 On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu *xulin...@cn.ibm.com*
 xulin...@cn.ibm.com wrote:

1. The full command line is written in a shell script:

LIB=/home/spark/.m2/repository

/opt/spark/bin/spark-submit \
--class spark.pcap.run.TestPcapSpark \
--jars

 $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/

 org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar
\
/home/spark/napa/napa.jar

2. And we run this script with *sudo*, if you do not use sudo, then
you cannot access network interface.

   3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs() in
   a standard Java program; it really worked like a champion.

Best regards,

Lin Hao XU
IBM Research China
Email: *xulin...@cn.ibm.com* xulin...@cn.ibm.com
My Flickr: *http://www.flickr.com/photos/xulinhao/sets*
http://www.flickr.com/photos/xulinhao/sets


From: Dean Wampler *deanwamp...@gmail.com* deanwamp...@gmail.com
To: Hai Shan Wu/China/IBM@IBMCN
Cc: user *user@spark.apache.org* user@spark.apache.org, Lin Hao
Xu/China/IBM@IBMCN
Date: 2015/04/28 20:07
Subject: Re: A problem of using spark streaming to capture network
packets
--




It's probably not your code.

What's the full command line you use to submit the job?

Are you sure the job on the cluster has access to the network
interface? Can you test the receiver by itself without Spark? For example,
does this line work as expected:

    List<PcapNetworkInterface> nifs = Pcaps.findAllDevs();

dean

Dean Wampler, Ph.D.
Author: *Programming Scala, 2nd Edition*
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 *Typesafe* http://typesafe.com/
 *@deanwampler* http://twitter.com/deanwampler
 *http://polyglotprogramming.com* http://polyglotprogramming.com/

On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu *wuh...@cn.ibm.com*
wuh...@cn.ibm.com wrote:
   Hi Everyone

   We use pcap4j to capture network packets and then use spark
   streaming to analyze captured packets. However, we met a strange 
 problem.

   If we run our application on spark locally (for example,
   spark-submit --master local[2]), then the program runs successfully.

   If we run our application on spark standalone cluster, then the
   program will tell us that NO NIFs found.

   I also attach two test files for clarification.

   So anyone can help on this? Thanks in advance!


 * (See attached file: PcapReceiver.java)(See attached file:
   TestPcapSpark.java)*

   Best regards,

   - Haishan

   Haishan Wu (吴海珊)

   IBM Research - China
   Tel: 86-10-58748508
   Fax: 86-10-58748330
   Email: *wuh...@cn.ibm.com* wuh...@cn.ibm.com
   Lotus Notes: Hai Shan Wu/China/IBM



   -
   To unsubscribe, e-mail: *user-unsubscr...@spark.apache.org

Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Dean Wampler
Are the tasks on the slaves also running as root? If not, that might
explain the problem.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu xulin...@cn.ibm.com wrote:

 1. The full command line is written in a shell script:

 LIB=/home/spark/.m2/repository

 /opt/spark/bin/spark-submit \
 --class spark.pcap.run.TestPcapSpark \
 --jars
 $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/
 org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar
 \
 /home/spark/napa/napa.jar

 2. And we run this script with *sudo*, if you do not use sudo, then you
 cannot access network interface.

 3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs() in
 a standard Java program; it really worked like a champion.

 Best regards,

 Lin Hao XU
 IBM Research China
 Email: xulin...@cn.ibm.com
 My Flickr: http://www.flickr.com/photos/xulinhao/sets


 From: Dean Wampler deanwamp...@gmail.com
 To: Hai Shan Wu/China/IBM@IBMCN
 Cc: user user@spark.apache.org, Lin Hao Xu/China/IBM@IBMCN
 Date: 2015/04/28 20:07
 Subject: Re: A problem of using spark streaming to capture network packets
 --



 It's probably not your code.

 What's the full command line you use to submit the job?

 Are you sure the job on the cluster has access to the network interface?
 Can you test the receiver by itself without Spark? For example, does this
 line work as expected:

 List<PcapNetworkInterface> nifs = Pcaps.findAllDevs();

 dean

 Dean Wampler, Ph.D.
 Author: *Programming Scala, 2nd Edition*
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 *Typesafe* http://typesafe.com/
 *@deanwampler* http://twitter.com/deanwampler
 *http://polyglotprogramming.com* http://polyglotprogramming.com/

 On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu *wuh...@cn.ibm.com*
 wuh...@cn.ibm.com wrote:

Hi Everyone

We use pcap4j to capture network packets and then use spark streaming
to analyze captured packets. However, we met a strange problem.

If we run our application on spark locally (for example, spark-submit
--master local[2]), then the program runs successfully.

If we run our application on spark standalone cluster, then the
program will tell us that NO NIFs found.

I also attach two test files for clarification.

So anyone can help on this? Thanks in advance!


 * (See attached file: PcapReceiver.java)(See attached file:
TestPcapSpark.java)*

Best regards,

- Haishan

Haishan Wu (吴海珊)

IBM Research - China
Tel: 86-10-58748508
Fax: 86-10-58748330
Email: *wuh...@cn.ibm.com* wuh...@cn.ibm.com
Lotus Notes: Hai Shan Wu/China/IBM


-
To unsubscribe, e-mail: *user-unsubscr...@spark.apache.org*
user-unsubscr...@spark.apache.org
For additional commands, e-mail: *user-h...@spark.apache.org*
user-h...@spark.apache.org





Re: A problem of using spark streaming to capture network packets

2015-04-28 Thread Dean Wampler
It's probably not your code.

What's the full command line you use to submit the job?

Are you sure the job on the cluster has access to the network interface?
Can you test the receiver by itself without Spark? For example, does this
line work as expected:

List<PcapNetworkInterface> nifs = Pcaps.findAllDevs();

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wuh...@cn.ibm.com wrote:

 Hi Everyone

 We use pcap4j to capture network packets and then use spark streaming to
 analyze captured packets. However, we met a strange problem.

 If we run our application on spark locally (for example, spark-submit
 --master local[2]), then the program runs successfully.

 If we run our application on spark standalone cluster, then the program
 will tell us that NO NIFs found.

 I also attach two test files for clarification.

 So anyone can help on this? Thanks in advance!


 *(See attached file: PcapReceiver.java)**(See attached file:
 TestPcapSpark.java)*

 Best regards,

 - Haishan

 Haishan Wu (吴海珊)

 IBM Research - China
 Tel: 86-10-58748508
 Fax: 86-10-58748330
 Email: wuh...@cn.ibm.com
 Lotus Notes: Hai Shan Wu/China/IBM


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Cluster Setup

2015-04-24 Thread Dean Wampler
It's mostly manual. You could try automating with something like Chef, of
course, but there's nothing already available in terms of automation.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Fri, Apr 24, 2015 at 10:33 AM, James King jakwebin...@gmail.com wrote:

 Thanks Dean,

 Sure I have that setup locally and testing it with ZK.

 But to start my multiple masters, do I need to go to each host and start one
 there, or is there a better way to do this?

 Regards
 jk

 On Fri, Apr 24, 2015 at 5:23 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 The convention for standalone cluster is to use Zookeeper to manage
 master failover.

 http://spark.apache.org/docs/latest/spark-standalone.html

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Fri, Apr 24, 2015 at 5:01 AM, James King jakwebin...@gmail.com
 wrote:

 I'm trying to find out how to setup a resilient Spark cluster.

 Things I'm thinking about include:

 - How to start multiple masters on different hosts?
 - there isn't a conf/masters file from what I can see


 Thank you.






Re: Spark Cluster Setup

2015-04-24 Thread Dean Wampler
The convention for standalone cluster is to use Zookeeper to manage master
failover.

http://spark.apache.org/docs/latest/spark-standalone.html
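
A sketch of the relevant configuration from that page (the ZooKeeper addresses are
placeholders); put it in conf/spark-env.sh on every master host, then run
sbin/start-master.sh on each of them:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181"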

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Fri, Apr 24, 2015 at 5:01 AM, James King jakwebin...@gmail.com wrote:

 I'm trying to find out how to setup a resilient Spark cluster.

 Things I'm thinking about include:

 - How to start multiple masters on different hosts?
 - there isn't a conf/masters file from what I can see


 Thank you.



Re: Instantiating/starting Spark jobs programmatically

2015-04-23 Thread Dean Wampler
I strongly recommend spawning a new process for the Spark jobs. Much
cleaner separation. Your driver program won't be clobbered if the Spark job
dies, etc. It can even watch for failures and restart.

In the Scala standard library, the sys.process package has classes for
constructing and interoperating with external processes. Perhaps Java has
something similar these days?
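
A minimal sketch with scala.sys.process (the class, jar, and master URL are
placeholders):

import scala.sys.process._

val cmd = Seq("spark-submit",
  "--class", "com.example.MyJob",
  "--master", "spark://master:7077",
  "/path/to/my-job.jar")

val exitCode = cmd.!   // blocks until the job finishes; use cmd.run() for a handle you can watch
if (exitCode != 0) {
  // retry, alert, etc.
}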

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran ste...@hortonworks.com
wrote:


  On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com
 wrote:

 - There are System.exit calls built into Spark as of now that could kill
 your running JVM. We have shadowed some of the most offensive bits within
 our own application to work around this. You'd likely want to do that or to
 do your own Spark fork. For example, if the SparkContext can't connect to
 your cluster master node when it is created, it will System.exit.


 people can block errant System.exit calls by running under a
 SecurityManager. Less than ideal (and there's a small performance hit) -but
 possible



Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Dean Wampler
I wasn't involved in this decision (I just make the fries), but
CompactBuffer is designed for relatively small data sets that at least fit
in memory. It's more or less an Array. In principle, returning an iterator
could hide the actual data structure that might be needed to hold a much
bigger data set, if necessary.

HOWEVER, it actually returns a CompactBuffer.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to dev list ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Scala Version?

2015-04-21 Thread Dean Wampler
Without the rest of your code it's hard to make sense of errors. Why do you
need to use reflection?

​Make sure you use the same Scala versions throughout and 2.10.4 is
recommended. That's still the official version for Spark, even though
provisional​ support for 2.11 exists.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 21, 2015 at 12:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 While running my Spark application on 1.3.0 with Scala 2.10.0, I
 encountered:

 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage
 2.0 (TID 28)
 java.lang.UnsupportedOperationException: tail of empty list
 at scala.collection.immutable.Nil$.tail(List.scala:339)
 at scala.collection.immutable.Nil$.tail(List.scala:334)
 at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)


 My app has quite a lot of reflection usage, as I need to build an Avro
 schema. I was told on Stack Overflow that this error occurs because reflection
 is not thread-safe in Scala 2.10, and I was advised to use Scala 2.11.

 Hence I moved to Scala 2.11 and moved all Spark Maven dependencies to 2.11
 (spark-avro is still 2.10, as 2.11 is not available). I ran into this:

 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage
 2.0 (TID 28)
 java.lang.UnsupportedOperationException: tail of empty list
 at scala.collection.immutable.Nil$.tail(List.scala:339)
 at scala.collection.immutable.Nil$.tail(List.scala:334)
 at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)


 Code:
 args.map { option =>
   val pairs = option.split("=")
   arguments += pairs(0) -> pairs(1)
 }

 The error is at the -> line. Does Spark use Scala 2.10 libs at runtime? Any
 suggestions for a fix?
 --
 Deepak




Re: Spark on Windows

2015-04-16 Thread Dean Wampler
If you're running Hadoop, too, now that Hortonworks supports Spark, you
might be able to use their distribution.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 16, 2015 at 2:19 PM, Arun Lists lists.a...@gmail.com wrote:

 Thanks, Matei! We'll try that and let you know if it works. You are
 correct in inferring that some of the problems we had were with
 dependencies.

 We also had problems with the spark-submit scripts. I will get the details
 from the engineer who worked on the Windows builds and provide them to you.

 arun


 On Thu, Apr 16, 2015 at 10:44 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You could build Spark with Scala 2.11 on Mac / Linux and transfer it over
 to Windows. AFAIK it should build on Windows too, the only problem is that
 Maven might take a long time to download dependencies. What errors are you
 seeing?

 Matei

  On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote:
 
  We run Spark on Mac and Linux but also need to run it on Windows 8.1
 and  Windows Server. We ran into problems with the Scala 2.10 binary bundle
 for Spark 1.3.0 but managed to get it working. However, on Mac/Linux, we
 are on Scala 2.11.6 (we built Spark from the sources). On Windows, however
 despite our best efforts we cannot get Spark 1.3.0 as built from sources
 working for Scala 2.11.6. Spark has too many moving parts and dependencies!
 
  When can we expect to see a binary bundle for Spark 1.3.0 that is built
 for Scala 2.11.6?  I read somewhere that the only reason that Spark 1.3.0
 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For
 those of us who don't use Kafka, can we have a Scala 2.11 bundle?
 
  If there isn't an official bundle arriving any time soon, can someone
 who has built it for Windows 8.1 successfully please share with the group?
 
  Thanks,
  arun
 





Re: Need some guidance

2015-04-13 Thread Dean Wampler
That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) =>
agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)
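
For completeness, a spark-shell sketch with made-up sample data, end to end:

// Sample (key, value) pairs with duplicates; the goal is the number of distinct values per key.
val input = sc.parallelize(Seq(
  (1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
  (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")))

val counts = input.distinct().combineByKey(
  (s: String) => 1,                       // first distinct value seen for a key
  (agg: Int, s: String) => agg + 1,       // another distinct value for the same key
  (agg1: Int, agg2: Int) => agg1 + agg2)  // merge per-partition counts

counts.collect()  // Array((1,3), (2,2), (3,1)), possibly in another order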

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote:

 How about this?

 input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
 (agg1: Int, agg2: Int) => agg1 + agg2).collect()

 On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 The problem with using collect is that it will fail for large data sets,
 as you'll attempt to copy the entire RDD to the memory of your driver
 program. The following works (Scala syntax, but similar to Python):

 scala> val i1 = input.distinct.groupByKey
 scala> i1.foreach(println)
 (1,CompactBuffer(beta, alpha, foo))
 (3,CompactBuffer(foo))
 (2,CompactBuffer(alpha, bar))

 scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
 scala> i2.foreach(println)
 (1,3)
 (3,1)
 (2,2)

 The i2 line passes a function that takes a tuple argument, then
 constructs a new output tuple with the first element and the size of the
 second (each CompactBuffer). An alternative pattern match syntax would be:

 scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

 This should work as long as none of the CompactBuffers are too large,
 which could happen for extremely large data sets.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com
 wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark
 effectively?  I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco






Re: Need some guidance

2015-04-13 Thread Dean Wampler
The problem with using collect is that it will fail for large data sets, as
you'll attempt to copy the entire RDD to the memory of your driver program.
The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))

scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The i2 line passes a function that takes a tuple argument, then
constructs a new output tuple with the first element and the size of the
second (each CompactBuffer). An alternative pattern match syntax would be:

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which
could happen for extremely large data sets.
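
If the value sets per key can get big, a variation that never materializes
the CompactBuffers at all (just a sketch, using the same input RDD from the
question quoted below):

// After distinct(), every surviving (key, value) pair stands for one unique value,
// so drop the value and sum 1s per key instead of grouping the values themselves.
input.distinct().map { case (key, _) => (key, 1) }.reduceByKey(_ + _).collect()
// Array((1,3), (2,2), (3,1)) for the sample data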

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark effectively?
 I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco



Re: task not serialize

2015-04-07 Thread Dean Wampler
Foreach() runs in parallel across the cluster, like map, flatMap, etc.
You'll only run into problems if you call collect(), which brings the
entire RDD into memory in the driver program.
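
A rough spark-shell sketch of the difference (process here is a made-up
stand-in for whatever per-record work you are doing):

// Hypothetical per-record action, e.g. an HBase update.
def process(x: Int): Unit = { /* side effect per record */ }

val bigRdd = sc.parallelize(1 to 1000000)  // stand-in for your JavaRDD<VendorRecord>

// foreach runs on the executors, partition by partition; nothing is shipped to the driver.
bigRdd.foreach(process)

// collect() is the call to avoid for big data: it copies the whole RDD into driver memory first.
// bigRdd.collect().foreach(process)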

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 7, 2015 at 3:50 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Let's say I follow the below approach and I get a pair RDD of huge size, which
 cannot fit into one machine ... what happens when I run foreach on this RDD?

 On 7 April 2015 at 04:25, Jeetendra Gangele gangele...@gmail.com wrote:



 On 7 April 2015 at 04:03, Dean Wampler deanwamp...@gmail.com wrote:


 On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Thanks a lot. That means Spark does not support nested RDDs?
 If I pass the JavaSparkContext that also won't work. I mean passing the
 SparkContext is not possible since it's not serializable.

 That's right. RDDs don't nest and SparkContexts aren't serializable.


 I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd
 and I need to return the potential matches for this record from HBase. So
 for each field of VendorRecord I have to do the following:

 1. query HBase to get the list of potential records in an RDD
 2. run logistic regression on the RDD returned from step 1 and each element
 of the passed matchRdd.

 If I understand you correctly, each VendorRecord could correspond to
 0-to-N records in HBase, which you need to fetch, true?

  yes, that's correct, each VendorRecord corresponds to 0 to N matches


 If so, you could use the RDD flatMap method, which takes a function that
 accepts each record, then returns a sequence of 0-to-N new records of
 some other type, like your HBase records. However, running an HBase query
 for each VendorRecord could be expensive. If you can turn this into a range
 query or something like that, it would help. I haven't used HBase much, so
 I don't have good advice on optimizing this, if necessary.

 Alternatively, can you do some sort of join on the VendorRecord RDD and
 an RDD of query results from HBase?

  Join will give too big result RDD of query result is returning around
 1 for each record and i have 2 millions to process so it will be huge
 to have this. 2 m*1 big number


 For #2, it sounds like you need flatMap to return records that combine
 the input VendorRecords and fields pulled from HBase.

 Whatever you can do to make this work like table scans and joins will
 probably be most efficient.

 dean




 On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote:

 The log instance won't be serializable, because it will have a file
 handle to write to. Try defining another static method outside
 matchAndMerge that encapsulates the call to log.error. 
 CompanyMatcherHelper
 might not be serializable either, but you didn't provide it. If it holds a
 database connection, same problem.

 You can't suppress the warning because it's actually an error. The
 VoidFunction can't be serialized to send it over the cluster's network.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 In this code in foreach I am getting task not serialized exception


 @SuppressWarnings("serial")
 public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd,
     final JavaSparkContext jsc) throws IOException {
   log.info("Company matcher started");
   //final JavaSparkContext jsc = getSparkContext();
   matchRdd.foreachAsync(new VoidFunction<VendorRecord>() {
     @Override
     public void call(VendorRecord t) throws Exception {
       if (t != null) {
         try {
           CompanyMatcherHelper.UpdateMatchedRecord(jsc, t);
         } catch (Exception e) {
           log.error("ERROR while running Matcher for company " + t.getCompanyId(), e);
         }
       }
     }
   });
 }














 -



Re: FlatMapPair run for longer time

2015-04-07 Thread Dean Wampler
It's hard for us to diagnose your performance problems, because we don't
have your environment and fixing one will simply reveal the next one to be
fixed. So, I suggest you use the following strategy to figure out what
takes the most time and hence what you might try to optimize. Try replacing
expressions that might be expensive with stubs. Your calls to HBase for
example. What happens if you replace the call with fake, hard-coded data?
Does performance improve dramatically? If so, then optimize how you query
HBase. If it makes no significant difference, try something else.
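
Concretely, the stub experiment could look something like this in spark-shell
(all the names here are made up; the point is only the shape of the test):

// Stand-in for the 50K-record input RDD.
val fakeMatchRdd = sc.parallelize((1 to 50000).map(i => "record-" + i))

// Same flatMap/groupByKey shape as the real job, but canned data instead of an HBase scan.
// If this version finishes quickly, the time is going into the per-record HBase calls,
// not into Spark itself.
val fakeMatches = Seq("m1", "m2", "m3")
fakeMatchRdd.flatMap(t => fakeMatches.map(m => (t, m))).groupByKey(200).count()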

Also try looking at the Spark source code to understand what happens under
the hood. At this point, your best bet is to develop your intuition about
the performance overhead of various constructs in real-world scenarios and
how Spark distributes computation. Then you'll find it easier to know what
to optimize. You'll want to understand what happens in flatMap, filter,
join, groupBy, reduce, etc.

Don't forget this guide, too:
https://spark.apache.org/docs/latest/tuning.html. The Learning Spark book
from O'Reilly is also really helpful.

I also recommend that you switch to Java 8 and Lambdas, or go all the way
to Scala, so all that noisy code shrinks down to simpler expressions.
You'll be surprised how helpful that is for comprehending your code and
reasoning about it.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 7, 2015 at 12:54 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Hi All, I am running the below code and it's running for a very long time.
 The input to flatMapToPair is 50K records, and I am calling HBase 50K times
 with just a range scan query, which should not take much time. Can anybody
 guide me on what is wrong here?

 JavaPairRDD<VendorRecord, Iterable<VendorRecord>> pairvendorData
   = matchRdd.flatMapToPair(new PairFlatMapFunction<VendorRecord, VendorRecord, VendorRecord>() {

   @Override
   public Iterable<Tuple2<VendorRecord, VendorRecord>> call(VendorRecord t) throws Exception {
     List<Tuple2<VendorRecord, VendorRecord>> pairs =
         new LinkedList<Tuple2<VendorRecord, VendorRecord>>();
     MatcherKeys matchkeys = CompanyMatcherHelper.getBlockinkeys(t);
     List<VendorRecord> Matchedrecords = ckdao.getMatchingRecordsWithscan(matchkeys);
     for (int i = 0; i < Matchedrecords.size(); i++) {
       pairs.add(new Tuple2<VendorRecord, VendorRecord>(t, Matchedrecords.get(i)));
     }
     return pairs;
   }
 }).groupByKey(200).persist(StorageLevel.DISK_ONLY_2());



Re: task not serialize

2015-04-06 Thread Dean Wampler
On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Thanks a lot. That means Spark does not support nested RDDs?
 If I pass the JavaSparkContext that also won't work. I mean passing the
 SparkContext is not possible since it's not serializable.

 That's right. RDDs don't nest and SparkContexts aren't serializable.


 I have a requirement where I will get a JavaRDD<VendorRecord> matchRdd and
 I need to return the potential matches for this record from HBase. So for
 each field of VendorRecord I have to do the following:

 1. query HBase to get the list of potential records in an RDD
 2. run logistic regression on the RDD returned from step 1 and each element of
 the passed matchRdd.

 If I understand you correctly, each VendorRecord could correspond to
0-to-N records in HBase, which you need to fetch, true?

If so, you could use the RDD flatMap method, which takes a function that
accepts each record, then returns a sequence of 0-to-N new records of some
other type, like your HBase records. However, running an HBase query for
each VendorRecord could be expensive. If you can turn this into a range
query or something like that, it would help. I haven't used HBase much, so
I don't have good advice on optimizing this, if necessary.

Alternatively, can you do some sort of join on the VendorRecord RDD and an
RDD of query results from HBase?

For #2, it sounds like you need flatMap to return records that combine the
input VendorRecords and fields pulled from HBase.

Whatever you can do to make this work like table scans and joins will
probably be most efficient.
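
In outline, something like this (a spark-shell sketch; VendorRecord and
fetchCandidates here are placeholders for your real record type and HBase
lookup, which would have to be callable from the executors):

// Placeholder for the real record type.
case class VendorRecord(companyId: String)

// Hypothetical lookup: 0-to-N candidate records from HBase for one input record.
def fetchCandidates(v: VendorRecord): Seq[VendorRecord] = Seq.empty  // the real range scan goes here

val matchRdd = sc.parallelize(Seq(VendorRecord("a"), VendorRecord("b")))

// One flatMap pass: each input yields zero or more (input, candidate) pairs,
// which the logistic-regression step can then score.
val candidatePairs = matchRdd.flatMap(v => fetchCandidates(v).map(c => (v, c)))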

dean




 On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote:

 The log instance won't be serializable, because it will have a file
 handle to write to. Try defining another static method outside
 matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper
 might not be serializable either, but you didn't provide it. If it holds a
 database connection, same problem.

 You can't suppress the warning because it's actually an error. The
 VoidFunction can't be serialized to send it over the cluster's network.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 In this code in foreach I am getting task not serialized exception


 @SuppressWarnings("serial")
 public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd,
     final JavaSparkContext jsc) throws IOException {
   log.info("Company matcher started");
   //final JavaSparkContext jsc = getSparkContext();
   matchRdd.foreachAsync(new VoidFunction<VendorRecord>() {
     @Override
     public void call(VendorRecord t) throws Exception {
       if (t != null) {
         try {
           CompanyMatcherHelper.UpdateMatchedRecord(jsc, t);
         } catch (Exception e) {
           log.error("ERROR while running Matcher for company " + t.getCompanyId(), e);
         }
       }
     }
   });
 }









Re: task not serialize

2015-04-06 Thread Dean Wampler
The log instance won't be serializable, because it will have a file
handle to write to. Try defining another static method outside
matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper
might not be serializable either, but you didn't provide it. If it holds a
database connection, same problem.

You can't suppress the warning because it's actually an error. The
VoidFunction can't be serialized to send it over the cluster's network.
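
In Scala the same idea is an object (effectively static); in Java it would be
a static helper method on a class the closure never captures. A sketch,
assuming an SLF4J logger:

import org.slf4j.LoggerFactory

// The closure refers to this object by name instead of capturing a logger field,
// so nothing unserializable is dragged into the task; each executor JVM initializes
// its own copy of the object.
object MatcherLog {
  private val log = LoggerFactory.getLogger("CompanyMatcher")
  def error(msg: String, e: Throwable): Unit = log.error(msg, e)
}

// Inside the VoidFunction body you would then call something like:
//   MatcherLog.error("ERROR while running Matcher for company " + t.getCompanyId(), e)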

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 In this code in foreach I am getting task not serialized exception


 @SuppressWarnings("serial")
 public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd,
     final JavaSparkContext jsc) throws IOException {
   log.info("Company matcher started");
   //final JavaSparkContext jsc = getSparkContext();
   matchRdd.foreachAsync(new VoidFunction<VendorRecord>() {
     @Override
     public void call(VendorRecord t) throws Exception {
       if (t != null) {
         try {
           CompanyMatcherHelper.UpdateMatchedRecord(jsc, t);
         } catch (Exception e) {
           log.error("ERROR while running Matcher for company " + t.getCompanyId(), e);
         }
       }
     }
   });
 }



Re: conversion from java collection type to scala JavaRDD<Object>

2015-04-05 Thread Dean Wampler
The runtime attempts to serialize everything required by records, and also
any lambdas/closures you use. Small, simple types are less likely to run
into this problem.
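
A sketch of what "small, simple types" can mean in practice (the field names
are invented; the real conversion would pull whatever you actually need out of
VendorRecord):

// A small, flat type that is trivially serializable (Scala case classes extend Serializable).
case class VendorLite(id: String, name: String)

// Hypothetical projection from the heavyweight record, done on the driver so that
// VendorRecord itself never has to be serialized (asScala needs scala.collection.JavaConverters._):
//   val lite = vendorRecords.asScala.map(v => VendorLite(v.getCompanyId, v.getName))
//   val rdd  = sc.parallelize(lite)

// Runnable stand-in with literal data:
val rdd = sc.parallelize(Seq(VendorLite("1", "acme"), VendorLite("2", "globex")))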

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, Apr 5, 2015 at 4:21 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 You are right, I have a class called VendorRecord which is not serializable;
 also this class object has many sub-classes (maybe 30 or more). Do I need
 to recursively serialize them all?



 On 4 April 2015 at 18:14, Dean Wampler deanwamp...@gmail.com wrote:

 Without the rest of your code, it's hard to know what might be
 unserializable.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sat, Apr 4, 2015 at 7:56 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:


 Hi I have tried with parallelize but i got the below exception

 java.io.NotSerializableException: pacific.dr.VendorRecord

 Here is my code

 List<VendorRecord> vendorRecords = blockingKeys.getMatchingRecordsWithscan(matchKeysOutput);
 JavaRDD<VendorRecord> lines = sc.parallelize(vendorRecords);


 On 2 April 2015 at 21:11, Dean Wampler deanwamp...@gmail.com wrote:

 Use JavaSparkContext.parallelize.


 http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List)

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Hi All
 Is there a way to make a JavaRDD<Object> from an existing java
 collection type List<Object>?
 I know this can be done using Scala, but I am looking for how to do this
 using Java.


 Regards
 Jeetendra













Re: conversion from java collection type to scala JavaRDD<Object>

2015-04-04 Thread Dean Wampler
Without the rest of your code, it's hard to know what might be
unserializable.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sat, Apr 4, 2015 at 7:56 AM, Jeetendra Gangele gangele...@gmail.com
wrote:


 Hi I have tried with parallelize but i got the below exception

 java.io.NotSerializableException: pacific.dr.VendorRecord

 Here is my code

 List<VendorRecord> vendorRecords = blockingKeys.getMatchingRecordsWithscan(matchKeysOutput);
 JavaRDD<VendorRecord> lines = sc.parallelize(vendorRecords);


 On 2 April 2015 at 21:11, Dean Wampler deanwamp...@gmail.com wrote:

 Use JavaSparkContext.parallelize.


 http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List)

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Hi All
 Is there a way to make a JavaRDD<Object> from an existing java
 collection type List<Object>?
 I know this can be done using Scala, but I am looking for how to do this
 using Java.


 Regards
 Jeetendra








Re: UNRESOLVED DEPENDENCIES while building Spark 1.3.0

2015-04-04 Thread Dean Wampler
Use the MVN build instead. From the README in the git repo (
https://github.com/apache/spark)

mvn -DskipTests clean package



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sat, Apr 4, 2015 at 4:39 PM, mas mas.ha...@gmail.com wrote:

 Hi All,
 I am trying to build Spark 1.3.0 on standalone Ubuntu 14.04. I am using the
 sbt command i.e. sbt/sbt assembly to build it. This command works pretty
 good with spark version 1.1 however, it gives following error with spark
 1.3.0. Any help or suggestions to resolve this would highly be appreciated.

 [info] Done updating.
 [info] Updating {file:/home/roott/aamirTest/spark/}network-shuffle...
 [info] Resolving org.fusesource.jansi#jansi;1.4 ...
 [warn]  ::
 [warn]  ::  UNRESOLVED DEPENDENCIES ::
 [warn]  ::
 [warn]  :: org.apache.spark#spark-network-common_2.10;1.3.0: configuration
 not p
 ublic in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was
 requir
 ed from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
 [warn]  ::
 [warn]
 [warn]  Note: Unresolved dependencies path:
 [warn]  org.apache.spark:spark-network-common_2.10:1.3.0
 ((com.typesafe.
 sbt.pom.MavenHelper) MavenHelper.scala#L76)
 [warn]+- org.apache.spark:spark-network-shuffle_2.10:1.3.0
 sbt.ResolveException: unresolved dependency:
 org.apache.spark#spark-network-comm
 on_2.10;1.3.0: configuration not public in
 org.apache.spark#spark-network-common
 _2.10;1.3.0: 'test'. It was required from
 org.apache.spark#spark-network-shuffle
 _2.10;1.3.0 test
 at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:278)
 at
 sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:175)
 at
 sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:157)
 at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
 at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
 at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:128)
 at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
 at sbt.IvySbt$$anon$4.call(Ivy.scala:64)
 at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
 at
 xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRet
 ries$1(Locks.scala:78)
 at
 xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:
 97)
 at xsbt.boot.Using$.withResource(Using.scala:10)
 at xsbt.boot.Using$.apply(Using.scala:9)
 at
 xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
 at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
 at xsbt.boot.Locks$.apply0(Locks.scala:31)
 at xsbt.boot.Locks$.apply(Locks.scala:28)
 at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
 at sbt.IvySbt.withIvy(Ivy.scala:123)
 at sbt.IvySbt.withIvy(Ivy.scala:120)
 at sbt.IvySbt$Module.withModule(Ivy.scala:151)
 at sbt.IvyActions$.updateEither(IvyActions.scala:157)
 at
 sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala
 :1318)
 at
 sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala
 :1315)
 at
 sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1
 345)
 at
 sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1
 343)
 at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
 at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1348)
 at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1342)
 at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
 at sbt.Classpaths$.cachedUpdate(Defaults.scala:1360)
 at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1300)
 at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275)
 at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
 at
 sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
 at sbt.std.Transform$$anon$4.work(System.scala:63)
 at
 sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:22
 6)
 at
 sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:22
 6)
 at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
 at sbt.Execute.work(Execute.scala:235)
 at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
 at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
 at
 sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestric
 tions.scala:159)
 at sbt.CompletionService$$anon$2.call(CompletionService.scala:28

Re: ArrayBuffer within a DataFrame

2015-04-03 Thread Dean Wampler
A hack workaround is to use flatMap:

rdd.flatMap { case (date, array) => for (x <- array) yield (date, x) }

For those of you who don't know Scala, the for comprehension iterates
through the ArrayBuffer, named array, and yields new tuples with the date
and each element. The case expression to the left of the => pattern matches
on the input tuples.
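
A complete toy run in spark-shell, with the data from the question assumed:

val rdd = sc.parallelize(Seq(("2015-04", Seq("A", "B", "C", "D"))))

rdd.flatMap { case (date, array) => for (x <- array) yield (date, x) }.collect()
// Array((2015-04,A), (2015-04,B), (2015-04,C), (2015-04,D))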

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 10:45 PM, Denny Lee denny.g@gmail.com wrote:

 Thanks Michael - that was it!  I was drawing a blank on this one for some
 reason - much appreciated!


 On Thu, Apr 2, 2015 at 8:27 PM Michael Armbrust mich...@databricks.com
 wrote:

 A lateral view explode using HiveQL.  I'm hoping to add explode
 shorthand directly to the df API in 1.4.

 On Thu, Apr 2, 2015 at 7:10 PM, Denny Lee denny.g@gmail.com wrote:

 Quick question - the output of a dataframe is in the format of:

 [2015-04, ArrayBuffer(A, B, C, D)]

 and I'd like to return it as:

 2015-04, A
 2015-04, B
 2015-04, C
 2015-04, D

 What's the best way to do this?

 Thanks in advance!






Re: Reading a large file (binary) into RDD

2015-04-03 Thread Dean Wampler
This might be overkill for your needs, but the scodec parser combinator
library might be useful for creating a parser.

https://github.com/scodec/scodec

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 6:53 PM, java8964 java8...@hotmail.com wrote:

 I think implementing your own InputFormat and using
 SparkContext.hadoopFile() is the best option for your case.

 Yong

 --
 From: kvi...@vt.edu
 Date: Thu, 2 Apr 2015 17:31:30 -0400
 Subject: Re: Reading a large file (binary) into RDD
 To: freeman.jer...@gmail.com
 CC: user@spark.apache.org


 The file has a specific structure. I outline it below.

 The input file is basically a representation of a graph.

 INT
 INT(A)
 LONG (B)
 A INTs(Degrees)
 A SHORTINTs  (Vertex_Attribute)
 B INTs
 B INTs
 B SHORTINTs
 B SHORTINTs

 A - number of vertices
 B - number of edges (note that the INTs/SHORTINTs associated with this are
 edge attributes)

 After reading in the file, I need to create two RDDs (one with vertices
 and the other with edges)

 On Thu, Apr 2, 2015 at 4:46 PM, Jeremy Freeman freeman.jer...@gmail.com
 wrote:

 Hm, that will indeed be trickier because this method assumes records are
 the same byte size. Is the file an arbitrary sequence of mixed types, or is
 there structure, e.g. short, long, short, long, etc.?

 If you could post a gist with an example of the kind of file and how it
 should look once read in that would be useful!

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 2:09 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Thanks for the reply. Unfortunately, in my case, the binary file is a mix
 of short and long integers. Is there any other way that could of use here?

 My current method happens to have a large overhead (much more than actual
 computation time). Also, I am short of memory at the driver when it has to
 read the entire file.

 On Thu, Apr 2, 2015 at 1:44 PM, Jeremy Freeman freeman.jer...@gmail.com
 wrote:

 If it’s a flat binary file and each record is the same length (in bytes),
 you can use Spark’s binaryRecords method (defined on the SparkContext),
 which loads records from one or more large flat binary files into an RDD.
 Here’s an example in python to show how it works:

 # write data from an array
 from numpy import random
 dat = random.randn(100,5)
 f = open('test.bin', 'w')
 f.write(dat)
 f.close()


 # load the data back in

 from numpy import frombuffer

 nrecords = 5
 bytesize = 8
 recordsize = nrecords * bytesize
 data = sc.binaryRecords('test.bin', recordsize)
 parsed = data.map(lambda v: frombuffer(buffer(v, 0, recordsize), 'float'))


 # these should be equal
 parsed.first()
 dat[0,:]


 Does that help?

 -
 jeremyfreeman.net
 @thefreemanlab

 On Apr 2, 2015, at 1:33 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 What are some efficient ways to read a large file into RDDs?

 For example, have several executors read a specific/unique portion of the
 file and construct RDDs. Is this possible to do in Spark?

 Currently, I am doing a line-by-line read of the file at the driver and
 constructing the RDD.








Re: Error in SparkSQL/Scala IDE

2015-04-02 Thread Dean Wampler
It failed to find the class org.apache.spark.sql.catalyst.ScalaReflection
in the Spark SQL library. Make sure it's on the classpath and the version
is correct, too.
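
If the project is sbt-based, that boils down to having something like this,
with the Spark SQL version matching spark-core (a sketch, assuming Spark 1.2.1
as in the question):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1",
  "org.apache.spark" %% "spark-sql"  % "1.2.1"  // brings in org.apache.spark.sql.catalyst.*
)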

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 8:39 AM, Sathish Kumaran Vairavelu 
vsathishkuma...@gmail.com wrote:

 Hi Everyone,

 I am getting following error while registering table using Scala IDE.
 Please let me know how to resolve this error. I am using Spark 1.2.1

   import sqlContext.createSchemaRDD

   val empFile = sc.textFile("/tmp/emp.csv", 4)
     .map( _.split(",") )
     .map( row => Employee(row(0), row(1), row(2), row(3), row(4)))

   empFile.registerTempTable("Employees")

 Thanks

 Sathish

 Exception in thread main scala.reflect.internal.MissingRequirementError:
 class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
 primordial classloader with boot classpath
 [/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-library.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-reflect.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-actor.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-swing.jar:/Applications/eclipse/plugins/org.scala-ide.scala210.jars_4.0.0.201412161056/target/jars/scala-compiler.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_65.jdk/Contents/Home/jre/classes]
 not found.

 at scala.reflect.internal.MissingRequirementError$.signal(
 MissingRequirementError.scala:16)

 at scala.reflect.internal.MissingRequirementError$.notFound(
 MissingRequirementError.scala:17)

 at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(
 Mirrors.scala:48)

 at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(
 Mirrors.scala:61)

 at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(
 Mirrors.scala:72)

 at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)

 at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)

 at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(
 ScalaReflection.scala:115)

 at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(
 TypeTags.scala:231)

 at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)

 at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:335)

 at scala.reflect.api.Universe.typeOf(Universe.scala:59)

 at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(
 ScalaReflection.scala:115)

 at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
 ScalaReflection.scala:33)

 at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(
 ScalaReflection.scala:100)

 at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(
 ScalaReflection.scala:33)

 at org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(
 ScalaReflection.scala:94)

 at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(
 ScalaReflection.scala:33)

 at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111)

 at com.svairavelu.examples.QueryCSV$.main(QueryCSV.scala:24)

  at com.svairavelu.examples.QueryCSV.main(QueryCSV.scala)



Re: How to learn Spark ?

2015-04-02 Thread Dean Wampler
I have a self-study workshop here:

https://github.com/deanwampler/spark-workshop

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 8:33 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com
 wrote:

 You can start with http://spark.apache.org/docs/1.3.0/index.html

 Also get the Learning Spark book http://amzn.to/1NDFI5x. It's great.

 Enjoy!

 Vadim

 On Thu, Apr 2, 2015 at 4:19 AM, Star Guo st...@ceph.me wrote:

 Hi, all



 I am new to here. Could you give me some suggestion to learn Spark ?
 Thanks.



 Best Regards,

 Star Guo





Re: workers no route to host

2015-04-02 Thread Dean Wampler
It appears you are using a Cloudera Spark build, 1.3.0-cdh5.4.0-SNAPSHOT,
which expects to find the hadoop command:

/data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop:
command not found

If you don't want to use Hadoop, download one of the pre-built Spark
releases from spark.apache.org. Even the Hadoop builds there will work
okay, as they don't actually attempt to run Hadoop commands.


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 31, 2015 at 3:12 AM, ZhuGe t...@outlook.com wrote:

 Hi,
 I set up a standalone cluster of 5 machines (tmaster, tslave1,2,3,4) with
 spark-1.3.0-cdh5.4.0-snapshot.
 When I execute sbin/start-all.sh, the master is OK, but I can't see the
 web UI. Moreover, the worker logs show something like this:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 /data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop:
 command not found
 Spark Command: java -cp
 :/data/PlatformDep/cdh5/dist/sbin/../conf:/data/PlatformDep/cdh5/dist/lib/spark-assembly-1.3.0-cdh5.4.0-SNAPSHOT-hadoop2.6.0-cdh5.4.0-SNAPSHOT.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-rdbms-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-api-jdo-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-core-3.2.2.jar:
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
 org.apache.spark.deploy.worker.Worker spark://192.168.128.16:7071
 --webui-port 8081
 

 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/31 06:47:22 INFO Worker: Registered signal handlers for [TERM, HUP,
 INT]
 15/03/31 06:47:23 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/03/31 06:47:23 INFO SecurityManager: Changing view acls to: dcadmin
 15/03/31 06:47:23 INFO SecurityManager: Changing modify acls to: dcadmin
 15/03/31 06:47:23 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(dcadmin);
 users with modify permissions: Set(dcadmin)
 15/03/31 06:47:23 INFO Slf4jLogger: Slf4jLogger started
 15/03/31 06:47:23 INFO Remoting: Starting remoting
 15/03/31 06:47:23 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkWorker@tslave2:60815]
 15/03/31 06:47:24 INFO Utils: Successfully started service 'sparkWorker'
 on port 60815.
 15/03/31 06:47:24 INFO Worker: Starting Spark worker tslave2:60815 with 2
 cores, 3.0 GB RAM
 15/03/31 06:47:24 INFO Worker: Running Spark version 1.3.0
 15/03/31 06:47:24 INFO Worker: Spark home: /data/PlatformDep/cdh5/dist
 15/03/31 06:47:24 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/31 06:47:24 INFO AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:8081
 15/03/31 06:47:24 INFO Utils: Successfully started service 'WorkerUI' on
 port 8081.
 15/03/31 06:47:24 INFO WorkerWebUI: Started WorkerWebUI at
 http://tslave2:8081
 15/03/31 06:47:24 INFO Worker: Connecting to master akka.tcp://
 sparkMaster@192.168.128.16:7071/user/Master...
 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp://
 sparkMaster@192.168.128.16:7071]: Error [Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]
 Caused by:
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No
 route to host
 ]
 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp://
 sparkMaster@192.168.128.16:7071]: Error [Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]
 Caused by:
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No
 route to host
 ]
 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp://
 sparkMaster@192.168.128.16:7071]: Error [Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]
 Caused by:
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No
 route to host
 ]
 15/03/31 06:47:24 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp://
 sparkMaster@192.168.128.16:7071]: Error [Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkMaster@192.168.128.16:7071]



 the worker machines

Re: conversion from java collection type to scala JavaRDD<Object>

2015-04-02 Thread Dean Wampler
Use JavaSparkContext.parallelize.

http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#parallelize(java.util.List)

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 11:33 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Hi All
 Is there a way to make a JavaRDD<Object> from an existing java collection
 type List<Object>?
 I know this can be done using Scala, but I am looking for how to do this
 using Java.


 Regards
 Jeetendra



Re: Spark Streaming Error in block pushing thread

2015-04-02 Thread Dean Wampler
Are you allocating 1 core per input stream plus additional cores for the
rest of the processing? Each input stream Reader requires a dedicated core.
So, if you have two input streams, you'll need local[3] at least.
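
For example, in local mode the arithmetic looks like this (hosts and ports
made up); on a cluster the same rule applies to the total cores you give the
app, e.g. via spark.cores.max:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("two-receivers").setMaster("local[3]")
val ssc  = new StreamingContext(conf, Seconds(10))

// Two receiver-based input streams => two cores tied up just receiving.
val s1 = ssc.socketTextStream("host1", 9999)
val s2 = ssc.socketTextStream("host2", 9999)

// The remaining core is what actually processes the batches.
s1.union(s2).count().print()

ssc.start()
ssc.awaitTermination()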

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 11:45 AM, byoung bill.yo...@threatstack.com wrote:

 I am running a spark streaming stand-alone cluster, connected to rabbitmq
 endpoint(s). The application will run for 20-30 minutes before failing with
 the following error:

 WARN 2015-04-01 21:00:53,944
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 22 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,944
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 23 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,951
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 20 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,951
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 19 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,952
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 18 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,952
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 17 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,952
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to remove
 RDD 16 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:54,151
 org.apache.spark.streaming.scheduler.ReceiverTracker.logWarning.71: Error
 reported by receiver for stream 0: Error in block pushing thread -
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:166)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:112)
 at

 org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:182)
 at
 org.apache.spark.streaming.receiver.BlockGenerator.org
 $apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:155)
 at

 org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:87)


 Has anyone run into this before?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Error-in-block-pushing-thread-tp22356.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.3.0 DataFrame count() method throwing java.io.EOFException

2015-04-02 Thread Dean Wampler
To clarify one thing, is count() the first action (
http://spark.apache.org/docs/latest/programming-guide.html#actions) you're
attempting? As defined in the programming guide, an action forces
evaluation of the pipeline of RDDs. It's only then that reading the data
actually occurs. So, count() might not be the issue, but some upstream step
that attempted to read the file.

As a sanity check, if you just read the text file and don't convert the
strings, then call count(), does that work? If so, it might be something
about your JavaBean BERecord after all. Can you post its definition?

Also, calling take(1) to grab the first element should work, even if the
DataFrame is empty. (It will return an empty array in that case, rather than
throwing an exception.)

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 10:16 AM, Ashley Rose ashley.r...@telarix.com
wrote:

  That’s precisely what I was trying to check. It should have 42577
 records in it, because that’s how many there were in the text file I read
 in.



 // Load a text file and convert each line to a JavaBean.
 JavaRDD<String> lines = sc.textFile("file.txt");

 JavaRDD<BERecord> tbBER = lines.map(s -> convertToBER(s));

 // Apply a schema to an RDD of JavaBeans and register it as a table.
 schemaBERecords = sqlContext.createDataFrame(tbBER, BERecord.class);
 schemaBERecords.registerTempTable("tbBER");



 The BERecord class is a standard Java Bean that implements Serializable,
 so that shouldn’t be the issue. As you said, count() shouldn’t fail like
 this even if the table was empty. I was able to print out the schema of the
 DataFrame just fine with df.printSchema(), and I just wanted to see if data
 was populated correctly.



 *From:* Dean Wampler [mailto:deanwamp...@gmail.com]
 *Sent:* Wednesday, April 01, 2015 6:05 PM
 *To:* Ashley Rose
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark 1.3.0 DataFrame count() method throwing
 java.io.EOFException



 Is it possible tbBER is empty? If so, it shouldn't fail like this, of
 course.


   Dean Wampler, Ph.D.

 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)

 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler

 http://polyglotprogramming.com



 On Wed, Apr 1, 2015 at 5:57 PM, ARose ashley.r...@telarix.com wrote:

 Note: I am running Spark on Windows 7 in standalone mode.

 In my app, I run the following:

 DataFrame df = sqlContext.sql("SELECT * FROM tbBER");
 System.out.println("Count: " + df.count());

 tbBER is registered as a temp table in my SQLContext. When I try to print
 the number of rows in the DataFrame, the job fails and I get the following
 error message:

 java.io.EOFException
 at

 java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
 at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
 at

 org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
 at
 org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
 at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
 at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
 at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
 at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
 at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
 at

 org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
 at

 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351

Re: How to learn Spark ?

2015-04-02 Thread Dean Wampler
You're welcome. Two limitations to know about:

1. I haven't updated it to 1.3
2. It uses Scala for all examples (my bias ;), so less useful if you don't
want to use Scala.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 12:54 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Thanks Dean. This is great.

 On Thu, Apr 2, 2015 at 9:01 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 I have a self-study workshop here:

 https://github.com/deanwampler/spark-workshop

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 2, 2015 at 8:33 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 You can start with http://spark.apache.org/docs/1.3.0/index.html

 Also get the Learning Spark book http://amzn.to/1NDFI5x. It's great.

 Enjoy!

 Vadim

 On Thu, Apr 2, 2015 at 4:19 AM, Star Guo st...@ceph.me wrote:

 Hi, all



 I am new to here. Could you give me some suggestion to learn Spark ?
 Thanks.



 Best Regards,

 Star Guo







Re: Spark Streaming Error in block pushing thread

2015-04-02 Thread Dean Wampler
I misread that you're running in standalone mode, so ignore the local[3]
example ;)  How many separate readers are listening to rabbitmq topics?

This might not be the problem, but I'm just eliminating possibilities.
Another possibility is that the in-bound data rate exceeds your ability to
process it. What's your streaming batch window size?

See also here for ideas:
http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 2, 2015 at 10:57 AM, Bill Young bill.yo...@threatstack.com
wrote:

 Sorry for the obvious typo, I have 4 workers with 16 cores total*

 On Thu, Apr 2, 2015 at 11:56 AM, Bill Young bill.yo...@threatstack.com
 wrote:

 Thank you for the response, Dean. There are 2 worker nodes, with 8 cores
 total, attached to the stream. I have the following settings applied:

 spark.executor.memory 21475m
 spark.cores.max 16
 spark.driver.memory 5235m


 On Thu, Apr 2, 2015 at 11:50 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Are you allocating 1 core per input stream plus additional cores for the
 rest of the processing? Each input stream Reader requires a dedicated core.
 So, if you have two input streams, you'll need local[3] at least.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 2, 2015 at 11:45 AM, byoung bill.yo...@threatstack.com
 wrote:

 I am running a spark streaming stand-alone cluster, connected to
 rabbitmq
 endpoint(s). The application will run for 20-30 minutes before failing
 with
 the following error:

 WARN 2015-04-01 21:00:53,944
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 22 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,944
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 23 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,951
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 20 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,951
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 19 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,952
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 18 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,952
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 17 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:53,952
 org.apache.spark.storage.BlockManagerMaster.logWarning.71: Failed to
 remove
 RDD 16 - Ask timed out on
 [Actor[akka.tcp://
 sparkExecutor@10.1.242.221:43018/user/BlockManagerActor1#-1913092216]]
 after [3 ms]}
 WARN 2015-04-01 21:00:54,151
 org.apache.spark.streaming.scheduler.ReceiverTracker.logWarning.71:
 Error
 reported by receiver for stream 0: Error in block pushing thread -
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:166)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:112)
 at

 org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:182)
 at
 org.apache.spark.streaming.receiver.BlockGenerator.org
 $apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:155)
 at

 org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:87)


 Has

Re: Spark 1.3.0 DataFrame count() method throwing java.io.EOFException

2015-04-01 Thread Dean Wampler
Is it possible tbBER is empty? If so, it shouldn't fail like this, of
course.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Apr 1, 2015 at 5:57 PM, ARose ashley.r...@telarix.com wrote:

 Note: I am running Spark on Windows 7 in standalone mode.

 In my app, I run the following:

 DataFrame df = sqlContext.sql("SELECT * FROM tbBER");
 System.out.println("Count: " + df.count());

 tbBER is registered as a temp table in my SQLContext. When I try to print
 the number of rows in the DataFrame, the job fails and I get the following
 error message:

 java.io.EOFException
 at

 java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
 at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
 at

 org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
 at
 org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
 at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
 at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
 at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
 at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
 at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
 at

 org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
 at

 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
 at

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 This only happens when I try to call df.count(). The rest runs fine. Is the
 count() function not supported in standalone mode? The stack trace makes it
 appear to be Hadoop functionality...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-0-DataFrame-count-method-throwing-java-io-EOFException-tp22344.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems

2015-03-25 Thread Dean Wampler
For the Spark SQL parts, 1.3 breaks backwards compatibility, because before
1.3, Spark SQL was considered experimental, so API changes were allowed.

So, H2O and ADA compatible with 1.2.X might not work with 1.3.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 9:39 AM, roni roni.epi...@gmail.com wrote:

 Even if H2o and ADA are dependent on 1.2.1, it should be backward
 compatible, right?
 So using 1.3 should not break them.
 And the code is not using the classes from those libs.
 I tried sbt clean compile .. same error
 Thanks
 _R

 On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 What version of Spark do the other dependencies rely on (Adam and H2O?) -
 that could be it

 Or try sbt clean compile

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote:

 I have an EC2 cluster created using spark version 1.2.1.
 And I have an SBT project.
 Now I want to upgrade to spark 1.3 and use the new features.
 Below are the issues.
 Sorry for the long post.
 Appreciate your help.
 Thanks
 -Roni

 Question - Do I have to create a new cluster using spark 1.3?

 Here is what I did -

 In my SBT file I changed to -
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"

 But then I started getting compilation errors, along with:
 Here are some of the libraries that were evicted:
 [warn]  * org.apache.spark:spark-core_2.10:1.2.0 -> 1.3.0
 [warn]  * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) -> 2.6.0
 [warn] Run 'evicted' to see detailed eviction warnings

  constructor cannot be instantiated to expected type;
 [error]  found   : (T1, T2)
 [error]  required: org.apache.spark.sql.catalyst.expressions.Row
 [error] val ty = joinRDD.map{case(word,
 (file1Counts, file2Counts)) => KmerIntesect(word, file1Counts, "xyz")}
 [error]  ^

 Here is my SBT and code --
 SBT -

 version := "1.0"

 scalaVersion := "2.10.4"

 resolvers += "Sonatype OSS Snapshots" at
 "https://oss.sonatype.org/content/repositories/snapshots"
 resolvers += "Maven Repo1" at "https://repo1.maven.org/maven2"
 resolvers += "Maven Repo" at
 "https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/"

 /* Dependencies - %% appends Scala version to artifactId */
 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
 libraryDependencies += "org.bdgenomics.adam" % "adam-core" % "0.16.0"
 libraryDependencies += "ai.h2o" % "sparkling-water-core_2.10" % "0.2.10"


 CODE --
 import org.apache.spark.{SparkConf, SparkContext}
 case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

 object preDefKmerIntersection {
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("preDefKmer-intersect")
     val sc = new SparkContext(sparkConf)
     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
     import sqlContext.createSchemaRDD
     val bedFile = sc.textFile("s3n://a/b/c", 40)
     val hgfasta = sc.textFile("hdfs://a/b/c", 40)
     val hgPair = hgfasta.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
     val filtered = hgPair.filter(kv => kv._2 == 1)
     val bedPair = bedFile.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
     val joinRDD = bedPair.join(filtered)
     val ty = joinRDD.map { case (word, (file1Counts, file2Counts)) =>
       KmerIntesect(word, file1Counts, "xyz") }
     ty.registerTempTable("KmerIntesect")
     ty.saveAsParquetFile("hdfs://x/y/z/kmerIntersect.parquet")
   }
 }






Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems

2015-03-25 Thread Dean Wampler
Weird. Are you running using SBT console? It should have the spark-core jar
on the classpath. Similarly, spark-shell or spark-submit should work, but
be sure you're using the same version of Spark when running as when
compiling. Also, you might need to add spark-sql to your SBT dependencies,
but that shouldn't be this issue.
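
For example, something like this in build.sbt (the version is a guess; use
whatever the cluster actually runs, and "provided" is optional but keeps the
cluster's own jars authoritative at runtime):

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-sql"  % "1.3.0" % "provided"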

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 12:09 PM, roni roni.epi...@gmail.com wrote:

 Thanks Dean and Nick.
 So, I removed the ADAM and H2o from my SBT as I was not using them.
 I got the code to compile - only for it to fail while running with -
 SparkContext: Created broadcast 1 from textFile at kmerIntersetion.scala:21
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/rdd/RDD$
 at preDefKmerIntersection$.main(kmerIntersetion.scala:26)

 This line is where I do a JOIN operation.
 val hgPair = hgfasta.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
 val filtered = hgPair.filter(kv => kv._2 == 1)
 val bedPair = bedFile.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
 val joinRDD = bedPair.join(filtered)
 Any idea what's going on?
 I have data on the EC2 so I am avoiding creating the new cluster , but
 just upgrading and changing the code to use 1.3 and Spark SQL
 Thanks
 Roni



 On Wed, Mar 25, 2015 at 9:50 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 For the Spark SQL parts, 1.3 breaks backwards compatibility, because
 before 1.3, Spark SQL was considered experimental where API changes were
 allowed.

 So, H2O and ADA compatible with 1.2.X might not work with 1.3.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Mar 25, 2015 at 9:39 AM, roni roni.epi...@gmail.com wrote:

 Even if H2o and ADA are dependent on 1.2.1 , it should be backword
 compatible, right?
 So using 1.3 should not break them.
 And the code is not using the classes from those libs.
 I tried sbt clean compile .. same errror
 Thanks
 _R

 On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 What version of Spark do the other dependencies rely on (Adam and H2O?)
 - that could be it

 Or try sbt clean compile

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote:

 I have a EC2 cluster created using spark version 1.2.1.
 And I have a SBT project .
 Now I want to upgrade to spark 1.3 and use the new features.
 Below are issues .
 Sorry for the long post.
 Appreciate your help.
 Thanks
 -Roni

 Question - Do I have to create a new cluster using spark 1.3?

 Here is what I did -

 In my SBT file I  changed to -
 libraryDependencies += org.apache.spark %% spark-core % 1.3.0

 But then I started getting compilation error. along with
 Here are some of the libraries that were evicted:
 [warn]  * org.apache.spark:spark-core_2.10:1.2.0 - 1.3.0
 [warn]  * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) -
 2.6.0
 [warn] Run 'evicted' to see detailed eviction warnings

  constructor cannot be instantiated to expected type;
 [error]  found   : (T1, T2)
 [error]  required: org.apache.spark.sql.catalyst.expressions.Row
 [error] val ty =
 joinRDD.map{case(word, (file1Counts, file2Counts)) = KmerIntesect(word,
 file1Counts,xyz)}
 [error]  ^

 Here is my SBT and code --
 SBT -

 version := 1.0

 scalaVersion := 2.10.4

 resolvers += Sonatype OSS Snapshots at 
 https://oss.sonatype.org/content/repositories/snapshots;;
 resolvers += Maven Repo1 at https://repo1.maven.org/maven2;;
 resolvers += Maven Repo at 
 https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/;;

 /* Dependencies - %% appends Scala version to artifactId */
 libraryDependencies += org.apache.hadoop % hadoop-client % 2.6.0
 libraryDependencies += org.apache.spark %% spark-core % 1.3.0
 libraryDependencies += org.bdgenomics.adam % adam-core % 0.16.0
 libraryDependencies += ai.h2o % sparkling-water-core_2.10 %
 0.2.10


 CODE --
 import org.apache.spark.{SparkConf, SparkContext}
 case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

 object preDefKmerIntersection {
   def main(args: Array[String]) {

  val sparkConf = new SparkConf().setAppName(preDefKmer-intersect)
  val sc = new SparkContext(sparkConf)
 import sqlContext.createSchemaRDD
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 val bedFile = sc.textFile(s3n://a/b/c,40)
  val hgfasta = sc.textFile(hdfs://a/b/c,40)
  val hgPair

Re: newbie question - spark with mesos

2015-03-25 Thread Dean Wampler
I think the problem is the use of the loopback address:

export SPARK_LOCAL_IP=127.0.0.1

In the stack trace from the slave, you see this:

...  Reason: Connection refused: localhost/127.0.0.1:51849
akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://sparkDriver@localhost:51849/),
Path(/user/MapOutputTracker)]

It's trying to connect to an Akka actor on itself, using the loopback
address.

Try changing SPARK_LOCAL_IP to the publicly routable IP address.
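
For example, in conf/spark-env.sh (the address below is a placeholder; use
whatever interface the Mesos slaves can actually reach):

  export SPARK_LOCAL_IP=10.1.2.3    # placeholder: the host's routable address
  # or derive it at startup, e.g.
  # export SPARK_LOCAL_IP=$(hostname -I | awk '{print $1}')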

dean


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 7:37 PM, Anirudha Jadhav anirudh...@gmail.com
wrote:

 My bad there, I was using the correct link for docs. The spark shell runs
 correctly, the framework is registered fine on mesos.

 is there some setting i am missing:
 this is my spark-env.sh

 export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
 export SPARK_EXECUTOR_URI=http://100.125.5.93/sparkx.tgz
 export SPARK_LOCAL_IP=127.0.0.1



 here is what i see on the slave node.
 
 less
 20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/stderr
 

 WARNING: Logging before InitGoogleLogging() is written to STDERR
 I0324 02:30:29.389225 27755 fetcher.cpp:76] Fetching URI '
 http://100.125.5.93/sparkx.tgz'
 I0324 02:30:29.389361 27755 fetcher.cpp:126] Downloading '
 http://100.125.5.93/sparkx.tgz' to
 '/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/sparkx.tgz'
 I0324 02:30:35.353446 27755 fetcher.cpp:64] Extracted resource
 '/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56/sparkx.tgz'
 into
 '/tmp/mesos/slaves/20150226-160708-78932-5050-8971-S0/frameworks/20150323-205508-78932-5050-29804-0012/executors/20150226-160708-78932-5050-8971-S0/runs/cceea834-c4d9-49d6-a579-8352f1889b56'
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/24 02:30:37 INFO MesosExecutorBackend: Registered signal handlers
 for [TERM, HUP, INT]
 I0324 02:30:37.071077 27863 exec.cpp:132] Version: 0.21.1
 I0324 02:30:37.080971 27885 exec.cpp:206] Executor registered on slave
 20150226-160708-78932-5050-8971-S0
 15/03/24 02:30:37 INFO MesosExecutorBackend: Registered with Mesos as
 executor ID 20150226-160708-78932-5050-8971-S0 with 1 cpus
 15/03/24 02:30:37 INFO SecurityManager: Changing view acls to: ubuntu
 15/03/24 02:30:37 INFO SecurityManager: Changing modify acls to: ubuntu
 15/03/24 02:30:37 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(ubuntu); users
 with modify permissions: Set(ubuntu)
 15/03/24 02:30:37 INFO Slf4jLogger: Slf4jLogger started
 15/03/24 02:30:37 INFO Remoting: Starting remoting
 15/03/24 02:30:38 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkexecu...@mesos-si2.dny1.bcpc.bloomberg.com:50542]
 15/03/24 02:30:38 INFO Utils: Successfully started service 'sparkExecutor'
 on port 50542.
 15/03/24 02:30:38 INFO AkkaUtils: Connecting to MapOutputTracker:
 akka.tcp://sparkDriver@localhost:51849/user/MapOutputTracker
 15/03/24 02:30:38 WARN Remoting: Tried to associate with unreachable
 remote address [akka.tcp://sparkDriver@localhost:51849]. Address is now
 gated for 5000 ms, all messages to this address will be delivered to dead
 letters. Reason: Connection refused: localhost/127.0.0.1:51849
 akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Anchor(akka.tcp://sparkDriver@localhost:51849/),
 Path(/user/MapOutputTracker)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58

Re: Date and decimal datatype not working

2015-03-25 Thread Dean Wampler
Recall that the input isn't actually read until you do something that forces
evaluation, like calling saveAsTextFile. You didn't show the whole stack trace
here, but it probably occurred while parsing an input line where one of
your long fields is actually an empty string.

Because this is such a common problem, I usually define a parse method
that converts input text to the desired schema. It catches parse exceptions
like this and reports the bad line at least. If you can return a default
long in this case, say 0, that makes it easier to return something.

dean



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 11:48 AM, BASAK, ANANDA ab9...@att.com wrote:

  Thanks. This library is only available with Spark 1.3. I am using
 version 1.2.1. Before I upgrade to 1.3, I want to try what can be done in
 1.2.1.



 So I am using following:

 val MyDataset = sqlContext.sql("my select query")



 MyDataset.map(t =>
 t(0)+"|"+t(1)+"|"+t(2)+"|"+t(3)+"|"+t(4)+"|"+t(5)).saveAsTextFile("/my_destination_path")



 But it is giving following error:

 15/03/24 17:05:51 ERROR Executor: Exception in task 1.0 in stage 13.0 (TID
 106)

 java.lang.NumberFormatException: For input string: ""

 at
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

 at java.lang.Long.parseLong(Long.java:453)

 at java.lang.Long.parseLong(Long.java:483)

 at
 scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)



 Is there something wrong with the TSTAMP field, which is a Long datatype?



 Thanks & Regards

 ---

 Ananda Basak

 Ph: 425-213-7092



 *From:* Yin Huai [mailto:yh...@databricks.com]
 *Sent:* Monday, March 23, 2015 8:55 PM

 *To:* BASAK, ANANDA
 *Cc:* user@spark.apache.org
 *Subject:* Re: Date and decimal datatype not working



 To store to csv file, you can use Spark-CSV
 https://github.com/databricks/spark-csv library.



 On Mon, Mar 23, 2015 at 5:35 PM, BASAK, ANANDA ab9...@att.com wrote:

  Thanks. This worked well as per your suggestions. I had to run following:

 val TABLE_A =
 sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p
 => ROW_A(p(0).trim.toLong, p(1), p(2).trim.toInt, p(3), BigDecimal(p(4)),
 BigDecimal(p(5)), BigDecimal(p(6))))



 Now I am stuck at another step. I have run a SQL query, where I am
 Selecting from all the fields with some where clause , TSTAMP filtered with
 date range and order by TSTAMP clause. That is running fine.



 Then I am trying to store the output in a CSV file. I am using
 saveAsTextFile(“filename”) function. But it is giving error. Can you please
 help me to write a proper syntax to store output in a CSV file?





 Thanks & Regards

 ---

 Ananda Basak

 Ph: 425-213-7092



 *From:* BASAK, ANANDA
 *Sent:* Tuesday, March 17, 2015 3:08 PM
 *To:* Yin Huai
 *Cc:* user@spark.apache.org
 *Subject:* RE: Date and decimal datatype not working



 Ok, thanks for the suggestions. Let me try and will confirm all.



 Regards

 Ananda



 *From:* Yin Huai [mailto:yh...@databricks.com]
 *Sent:* Tuesday, March 17, 2015 3:04 PM
 *To:* BASAK, ANANDA
 *Cc:* user@spark.apache.org
 *Subject:* Re: Date and decimal datatype not working



 p(0) is a String. So, you need to explicitly convert it to a Long. e.g.
 p(0).trim.toLong. You also need to do it for p(2). For those BigDecimals
 value, you need to create BigDecimal objects from your String values.



 On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA ab9...@att.com wrote:

   Hi All,

 I am very new in Spark world. Just started some test coding from last
 week. I am using spark-1.2.1-bin-hadoop2.4 and scala coding.

 I am having issues while using Date and decimal data types. Following is
 my code that I am simply running on scala prompt. I am trying to define a
 table and point that to my flat file containing raw data (pipe delimited
 format). Once that is done, I will run some SQL queries and put the output
 data in to another flat file with pipe delimited format.



 ***

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 import sqlContext.createSchemaRDD





 // Define row and table

 case class ROW_A(

   TSTAMP:   Long,

   USIDAN: String,

   SECNT:Int,

   SECT:   String,

   BLOCK_NUM:BigDecimal,

   BLOCK_DEN:BigDecimal,

   BLOCK_PCT:BigDecimal)



 val TABLE_A =
 sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p
 => ROW_A(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))



 TABLE_A.registerTempTable("TABLE_A")



 ***



 The second to last command is giving an error, like the following:

 <console>:17: error: type mismatch;

  found   : String

Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems

2015-03-25 Thread Dean Wampler
Yes, that's the problem. The RDD class exists in both binary jar files, but
the signatures probably don't match. The bottom line, as always for tools
like this, is that you can't mix versions.
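
If you want to stay on the 1.2.1 cluster for now, the simplest thing is to
pin SBT to the same version, e.g. (a sketch, not tested against your build):

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"

Otherwise upgrade the cluster itself to 1.3.0 and keep the 1.3.0 dependency.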

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 3:13 PM, roni roni.epi...@gmail.com wrote:

 My cluster is still on spark 1.2 and in SBT I am using 1.3.
 So probably it is compiling with 1.3 but running with 1.2 ?

 On Wed, Mar 25, 2015 at 12:34 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Weird. Are you running using SBT console? It should have the spark-core
 jar on the classpath. Similarly, spark-shell or spark-submit should work,
 but be sure you're using the same version of Spark when running as when
 compiling. Also, you might need to add spark-sql to your SBT dependencies,
 but that shouldn't be this issue.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Mar 25, 2015 at 12:09 PM, roni roni.epi...@gmail.com wrote:

 Thanks Dean and Nick.
 So, I removed the ADAM and H2o from my SBT as I was not using them.
 I got the code to compile  - only for fail while running with -
 SparkContext: Created broadcast 1 from textFile at
 kmerIntersetion.scala:21
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/rdd/RDD$
 at preDefKmerIntersection$.main(kmerIntersetion.scala:26)

 This line is where I do a JOIN operation.
 val hgPair = hgfasta.map(_.split (,)).map(a= (a(0),
 a(1).trim().toInt))
  val filtered = hgPair.filter(kv = kv._2 == 1)
  val bedPair = bedFile.map(_.split (,)).map(a= (a(0),
 a(1).trim().toInt))
 * val joinRDD = bedPair.join(filtered)   *
 Any idea whats going on?
 I have data on the EC2 so I am avoiding creating the new cluster , but
 just upgrading and changing the code to use 1.3 and Spark SQL
 Thanks
 Roni



 On Wed, Mar 25, 2015 at 9:50 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 For the Spark SQL parts, 1.3 breaks backwards compatibility, because
 before 1.3, Spark SQL was considered experimental where API changes were
 allowed.

 So, H2O and ADA compatible with 1.2.X might not work with 1.3.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Mar 25, 2015 at 9:39 AM, roni roni.epi...@gmail.com wrote:

 Even if H2o and ADA are dependent on 1.2.1 , it should be backword
 compatible, right?
 So using 1.3 should not break them.
 And the code is not using the classes from those libs.
 I tried sbt clean compile .. same errror
 Thanks
 _R

 On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 What version of Spark do the other dependencies rely on (Adam and
 H2O?) - that could be it

 Or try sbt clean compile

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote:

 I have a EC2 cluster created using spark version 1.2.1.
 And I have a SBT project .
 Now I want to upgrade to spark 1.3 and use the new features.
 Below are issues .
 Sorry for the long post.
 Appreciate your help.
 Thanks
 -Roni

 Question - Do I have to create a new cluster using spark 1.3?

 Here is what I did -

 In my SBT file I  changed to -
 libraryDependencies += org.apache.spark %% spark-core % 1.3.0

 But then I started getting compilation error. along with
 Here are some of the libraries that were evicted:
 [warn]  * org.apache.spark:spark-core_2.10:1.2.0 - 1.3.0
 [warn]  * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) -
 2.6.0
 [warn] Run 'evicted' to see detailed eviction warnings

  constructor cannot be instantiated to expected type;
 [error]  found   : (T1, T2)
 [error]  required: org.apache.spark.sql.catalyst.expressions.Row
 [error] val ty =
 joinRDD.map{case(word, (file1Counts, file2Counts)) = KmerIntesect(word,
 file1Counts,xyz)}
 [error]  ^

 Here is my SBT and code --
 SBT -

 version := 1.0

 scalaVersion := 2.10.4

 resolvers += Sonatype OSS Snapshots at 
 https://oss.sonatype.org/content/repositories/snapshots;;
 resolvers += Maven Repo1 at https://repo1.maven.org/maven2;;
 resolvers += Maven Repo at 
 https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/
 ;

 /* Dependencies - %% appends Scala version to artifactId */
 libraryDependencies += org.apache.hadoop % hadoop-client %
 2.6.0
 libraryDependencies += org.apache.spark %% spark-core

Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems

2015-03-25 Thread Dean Wampler
You could stop the running processes and run the same processes using
the new version, starting with the master and then the slaves. You would
have to snoop around a bit to get the command-line arguments right, but
it's doable. Use `ps -efw` to find the command-lines used. Be sure to rerun
them as the same user. Or look at what the EC2 scripts do.
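
Roughly something like this (paths are guesses; check what ps actually shows
on your nodes before running anything):

  ps -efw | grep spark                   # note the exact command lines and user
  /root/spark/sbin/stop-all.sh           # stop the old master and workers
  /root/spark-1.3.0/sbin/start-all.sh    # start the 1.3 versions

The sbin scripts read conf/spark-env.sh, so copy your old configuration over
to the new installation first.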

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Mar 25, 2015 at 4:54 PM, roni roni.epi...@gmail.com wrote:

 Is there any way that I can install the new one and remove previous
 version.
 I installed spark 1.3 on my EC2 master and set the spark home to the new
 one.
 But when I start the spark-shell I get -
  java.lang.UnsatisfiedLinkError:
 org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
 at
 org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native
 Method)

 Is there no way to upgrade without creating a new cluster?
 Thanks
 Roni



 On Wed, Mar 25, 2015 at 1:18 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Yes, that's the problem. The RDD class exists in both binary jar files,
 but the signatures probably don't match. The bottom line, as always for
 tools like this, is that you can't mix versions.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Mar 25, 2015 at 3:13 PM, roni roni.epi...@gmail.com wrote:

 My cluster is still on spark 1.2 and in SBT I am using 1.3.
 So probably it is compiling with 1.3 but running with 1.2 ?

 On Wed, Mar 25, 2015 at 12:34 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Weird. Are you running using SBT console? It should have the spark-core
 jar on the classpath. Similarly, spark-shell or spark-submit should work,
 but be sure you're using the same version of Spark when running as when
 compiling. Also, you might need to add spark-sql to your SBT dependencies,
 but that shouldn't be this issue.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Mar 25, 2015 at 12:09 PM, roni roni.epi...@gmail.com wrote:

 Thanks Dean and Nick.
 So, I removed the ADAM and H2o from my SBT as I was not using them.
 I got the code to compile  - only for fail while running with -
 SparkContext: Created broadcast 1 from textFile at
 kmerIntersetion.scala:21
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/rdd/RDD$
 at preDefKmerIntersection$.main(kmerIntersetion.scala:26)

 This line is where I do a JOIN operation.
 val hgPair = hgfasta.map(_.split (,)).map(a= (a(0),
 a(1).trim().toInt))
  val filtered = hgPair.filter(kv = kv._2 == 1)
  val bedPair = bedFile.map(_.split (,)).map(a=
 (a(0), a(1).trim().toInt))
 * val joinRDD = bedPair.join(filtered)   *
 Any idea whats going on?
 I have data on the EC2 so I am avoiding creating the new cluster , but
 just upgrading and changing the code to use 1.3 and Spark SQL
 Thanks
 Roni



 On Wed, Mar 25, 2015 at 9:50 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 For the Spark SQL parts, 1.3 breaks backwards compatibility, because
 before 1.3, Spark SQL was considered experimental where API changes were
 allowed.

 So, H2O and ADA compatible with 1.2.X might not work with 1.3.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Mar 25, 2015 at 9:39 AM, roni roni.epi...@gmail.com wrote:

 Even if H2o and ADA are dependent on 1.2.1 , it should be backword
 compatible, right?
 So using 1.3 should not break them.
 And the code is not using the classes from those libs.
 I tried sbt clean compile .. same errror
 Thanks
 _R

 On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 What version of Spark do the other dependencies rely on (Adam and
 H2O?) - that could be it

 Or try sbt clean compile

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com
 wrote:

 I have a EC2 cluster created using spark version 1.2.1.
 And I have a SBT project .
 Now I want to upgrade to spark 1.3 and use the new features.
 Below are issues .
 Sorry for the long post.
 Appreciate your help.
 Thanks
 -Roni

 Question - Do I have to create a new cluster using spark 1.3?

 Here is what I did -

 In my SBT file I  changed to -
 libraryDependencies += org.apache.spark

Re: How to deploy binary dependencies to workers?

2015-03-24 Thread Dean Wampler
Both spark-submit and spark-shell have a --jars option for passing
additional jars to the cluster. They will be added to the appropriate
classpaths.
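
For example (the class and jar names are placeholders):

  spark-submit --class com.example.MyJob \
    --jars /path/to/dep1.jar,/path/to/dep2.jar \
    my-app.jar

The same --jars flag works with spark-shell.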

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 4:13 AM, Xi Shen davidshe...@gmail.com wrote:

 Hi,

 I am doing ML using Spark mllib. However, I do not have full control to
 the cluster. I am using Microsoft Azure HDInsight

 I want to deploy the BLAS or whatever required dependencies to accelerate
 the computation. But I don't know how to deploy those DLLs when I submit my
 JAR to the cluster.

 I know how to pack those DLLs into a jar. The real challenge is how to let
 the system find them...


 Thanks,
 David




Re: Optimal solution for getting the header from CSV with Spark

2015-03-24 Thread Dean Wampler
Good point. There's no guarantee that you'll get the actual first
partition. One reason why I wouldn't allow a CSV header line in a real data
file, if I could avoid it.

Back to Spark, a safer approach is RDD.foreachPartition, which takes a
function expecting an iterator. You'll only need to grab the first element
(being careful that the partition isn't empty!) and then determine which of
those first lines has the header info.
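
A rough sketch along those lines, using mapPartitions so the candidate lines
come back to the driver (the header test at the end is hypothetical; use
whatever distinguishes your header):

  // take at most one line from each partition, then pick the header among them
  val candidates = data.mapPartitions { iter =>
    if (iter.hasNext) Iterator(iter.next()) else Iterator.empty
  }.collect()
  val header = candidates.find(_.startsWith("id,"))   // hypothetical header test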

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 11:12 AM, Sean Owen so...@cloudera.com wrote:

 I think this works in practice, but I don't know that the first block
 of the file is guaranteed to be in the first partition? certainly
 later down the pipeline that won't be true but presumably this is
 happening right after reading the file.

 I've always just written some filter that would only match the header,
 which assumes that this is possible to distinguish, but usually is.

 On Tue, Mar 24, 2015 at 2:41 PM, Dean Wampler deanwamp...@gmail.com
 wrote:
  Instead of data.zipWithIndex().filter(_._2==0), which will cause Spark to
  read the whole file, use data.take(1), which is simpler.
 
  From the Rdd.take documentation, it works by first scanning one
 partition,
  and using the results from that partition to estimate the number of
  additional partitions needed to satisfy the limit. In this case, it will
  trivially stop at the first.
 
 
  Dean Wampler, Ph.D.
  Author: Programming Scala, 2nd Edition (O'Reilly)
  Typesafe
  @deanwampler
  http://polyglotprogramming.com
 
  On Tue, Mar 24, 2015 at 7:12 AM, Spico Florin spicoflo...@gmail.com
 wrote:
 
  Hello!
 
  I would like to know what is the optimal solution for getting the header
  from a CSV file with Spark? My approach was:
 
  def getHeader(data: RDD[String]): String = {
  data.zipWithIndex().filter(_._2 == 0).map(x => x._1).take(1).mkString() }
 
  Thanks.
 
 



Re: Optimal solution for getting the header from CSV with Spark

2015-03-24 Thread Dean Wampler
Instead of data.zipWithIndex().filter(_._2==0), which will cause Spark to
read the whole file, use data.take(1), which is simpler.

From the RDD.take documentation, it works by first scanning one partition,
and using the results from that partition to estimate the number of
additional partitions needed to satisfy the limit. In this case, it will
trivially stop at the first.
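
In code that's roughly:

  val header = data.take(1).headOption.getOrElse("")   // first line returned by take
  val rows   = data.filter(_ != header)                // optionally drop it from the data

take(1) returns an Array, so headOption guards against an empty RDD.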


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Mar 24, 2015 at 7:12 AM, Spico Florin spicoflo...@gmail.com wrote:

 Hello!

 I would like to know what is the optimal solution for getting the header
 from a CSV file with Spark? My approach was:

 def getHeader(data: RDD[String]): String = {
 data.zipWithIndex().filter(_._2 == 0).map(x => x._1).take(1).mkString() }

 Thanks.



Re: Saving Dstream into a single file

2015-03-23 Thread Dean Wampler
You can use the coalesce method to reduce the number of partitions. You can
reduce to one if the data is not too big. Then write the output.
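
For a DStream that would look roughly like this (the stream name and output
path are placeholders):

  wordCounts.foreachRDD { (rdd, time) =>
    rdd.coalesce(1).saveAsTextFile(s"/output/wordcounts-${time.milliseconds}")
  }

That gives one file per batch; only do this if each batch comfortably fits in
a single task.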

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 16, 2015 at 2:42 PM, Zhan Zhang zzh...@hortonworks.com wrote:

 Each RDD has multiple partitions, each of them will produce one hdfs file
 when saving output. I don’t think you are allowed to have multiple file
 handler writing to the same hdfs file.  You still can load multiple files
 into hive tables, right?

 Thanks..

 Zhan Zhang

 On Mar 15, 2015, at 7:31 AM, tarek_abouzeid tarek.abouzei...@yahoo.com
 wrote:

  i am doing word count example on flume stream and trying to save output
 as
  text files in HDFS , but in the save directory i got multiple sub
  directories each having files with small size , i wonder if there is a
 way
  to append in a large file instead of saving in multiple files , as i
 intend
  to save the output in hive hdfs directory so i can query the result using
  hive
 
  hope anyone have a workaround for this issue , Thanks in advance
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread Dean Wampler
In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through
an implicit conversion) when you add a SQLContext, like so.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext._


In 1.3, the method has moved to the new DataFrame type.
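
In 1.3 the rough equivalent is (myRdd stands in for an RDD of a case class):

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._              // adds toDF() for RDDs of case classes
  myRdd.toDF().registerTempTable("MyTable")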

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:

 Hi,

 I am running spark; when I use sc.version I get 1.2, but when I call
 registerTempTable("MyTable") I get an error saying registerTempTable is not a
 member of RDD

 Why?

 --
 Eran | CTO



Re: newbie question - spark with mesos

2015-03-23 Thread Dean Wampler
That's a very old page, try this instead:

http://spark.apache.org/docs/latest/running-on-mesos.html

When you run your Spark job on Mesos, tasks will be started on the slave
nodes as needed, since fine-grained mode is the default.

For a job like your example, very few tasks will be needed. Actually only
one would be enough, but the default number of partitions will be used. I
believe 8 is the default for Mesos. For local mode (local[*]), it's the
number of cores. You can also set the property spark.default.parallelism.
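
For example (the value is arbitrary; pick something proportional to your
total cores):

  val conf = new SparkConf().setAppName("my-app")
    .set("spark.default.parallelism", "16")
  val sc = new SparkContext(conf)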

HTH,

Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 11:46 AM, Anirudha Jadhav aniru...@nyu.edu wrote:

 i have a mesos cluster, which i deploy spark to by using instructions on
 http://spark.apache.org/docs/0.7.2/running-on-mesos.html

 after that the spark shell starts up fine.
 then i try the following on the shell:

 val data = 1 to 1

 val distData = sc.parallelize(data)

 distData.filter(_ < 10).collect()

 open spark web ui at host:4040 and see an active job.

 NOW, how do i start workers or spark workers on mesos ? who completes my
 job?
 thanks,

 --
 Ani



Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Dean Wampler
Well, it's complaining about trait OptionInstances which is defined in
Option.scala in the std package. Use scalap or javap on the scalaz library
to find out which member of the trait is the problem, but since it says
$$anon$1, I suspect it's the first value member, implicit val
optionInstance, which has a long list of mixin traits, one of which is
probably at fault. OptionInstances is huge, so there might be other
offenders.
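
For example, something like (the jar name and version are placeholders for
whatever you actually have on your classpath):

  javap -classpath scalaz-core_2.10-7.1.0.jar 'scalaz.std.OptionInstances$$anon$1'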

Scalaz wasn't designed for distributed systems like this, so you'll
probably find many examples of nonserializability. An alternative is to
avoid using Scalaz in any closures passed to Spark methods, but that's
probably not what you want.
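
One possible workaround, sketched and untested: re-create the instance inside
the task rather than capturing it from the driver, e.g. (instance name per
Scalaz 7.x; check your version):

  val wrapped = rdd.mapPartitions { iter =>
    // built on the executor at runtime, so it never goes through serialization
    val F = scalaz.std.option.optionInstance
    iter.map(x => F.point(x))
  }

Values created inside the partition function live only on the executors, so
they don't need to be Serializable.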

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote:

 Hey all,

 I'd like to use the Scalaz library in some of my Spark jobs, but am running
 into issues where some stuff I use from Scalaz is not serializable. For
 instance, in Scalaz there is a trait

 /** In Scalaz */
 trait Applicative[F[_]] {
   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
   def point[A](a: => A): F[A]
 }

 But when I try to use it in say, in an `RDD#aggregate` call I get:


 Caused by: java.io.NotSerializableException:
 scalaz.std.OptionInstances$$anon$1
 Serialization stack:
 - object not serializable (class:
 scalaz.std.OptionInstances$$anon$1,
 value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
 - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
 type:
 interface scalaz.Applicative)
 - object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
 - field (class:
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 name: apConcat$1, type: interface scala.Function2)
 - object (class
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 function2)

 Outside of submitting a PR to Scalaz to make things Serializable, what can
 I
 do to make things Serializable? I considered something like

 implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
 SomeSerializableType[F] =
   new SomeSerializableType { ... } ??

 Not sure how to go about doing it - I looked at java.io.Externalizable but
 given `scalaz.Applicative` has no value members I'm not sure how to
 implement the interface.

 Any guidance would be much appreciated - thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



