Re: Correct way to use spark streaming with apache zeppelin

2016-03-13 Thread Skanda
Hi

How you store state/intermediate data in realtime processing depends on the
throughput and latency your application requires. There are a lot of technologies
that can serve as this realtime datastore; some examples include HBase,
MemSQL, etc., or in some cases an RDBMS like MySQL itself. This is a
judgement call that you will have to make.

Regards,
Skanda

On Sun, Mar 13, 2016 at 11:23 PM, trung kien  wrote:

> Thanks all for actively sharing your experience.
>
> @Chris: using something like Redis is something I am trying to figure out.
> I have a lot of transactions, so I can't trigger an update event for
> every single transaction.
> I'm looking at Spark Streaming because it provides batch processing (e.g. I
> can update the cache every 5 seconds). In addition, Spark scales pretty
> well and I don't have to worry about losing data.
>
> The cache now holds the following information:
>  * Date
>  * BranchID
>  * ProductID
>  TotalQty
>  TotalDollar
>
> (* marks the key columns; note that I have historical data as well, by day.)
>
> Now I want to use Zeppelin for querying against the cache (while the cache
> is updating).
> I don't need Zeppelin to update automatically (I can hit the run button
> myself :) )
> Just curious whether Parquet is the right solution for us?
>
>
>
> On Sun, Mar 13, 2016 at 3:25 PM, Chris Miller 
> wrote:
>
>> Cool! Thanks for sharing.
>>
>>
>> --
>> Chris Miller
>>
>> On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist  wrote:
>>
>>> Below is a link to an example which Silvio Fiorito put together
>>> demonstrating how to link Zeppelin with Spark Stream for real-time charts.
>>> I think the original thread was pack in early November 2015, subject: Real
>>> time chart in Zeppelin, if you care to try to find it.
>>>
>>> https://gist.github.com/granturing/a09aed4a302a7367be92
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>> On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller 
>>> wrote:
>>>
 I'm pretty new to all of this stuff, so bear with me.

 Zeppelin isn't really intended for realtime dashboards as far as I
 know. Its reporting features (tables, graphs, etc.) are more for displaying
 the results from the output of something. As far as I know, there isn't
 really anything to "watch" a dataset and have updates pushed to the
 Zeppelin UI.

 As for Spark, unless you're doing a lot of processing that you didn't
 mention here, I don't think it's a good fit just for this.

 If it were me (just off the top of my head), I'd just build a simple
 web service that uses websockets to push updates to the client which could
 then be used to update graphs, tables, etc. The data itself -- that is, the
 accumulated totals -- you could store in something like Redis. When an
 order comes in, just add that quantity and price to the existing value and
 trigger your code to push out an updated value to any clients via the
 websocket. You could use something like a Redis pub/sub channel to trigger
 the web app to notify clients of an update.

 There are about 5 million other ways you could design this, but I would
 just keep it as simple as possible. I just threw one idea out...

 Good luck.


 --
 Chris Miller

 On Sat, Mar 12, 2016 at 6:58 PM, trung kien  wrote:

> Thanks Chris and Mich for replying.
>
> Sorry for not explaining my problem clearly. Yes, I am talking about a
> flexible dashboard when I mention Zeppelin.
>
> Here is the problem I am having:
>
> I am running a commercial website where we sell many products and we
> have many branches in many places. We have a lot of realtime transactions
> and want to analyze them in realtime.
>
> We don't want to aggregate every single transaction (each transaction has
> BranchID, ProductID, Qty, Price) every time we do analytics. So, we
> maintain intermediate data which contains: BranchID, ProductID, totalQty,
> totalDollar
>
> Ideally, we have 2 tables:
>    Transaction (BranchID, ProductID, Qty, Price, Timestamp)
>
> And the intermediate table Stats is just the sum of every transaction
> grouped by BranchID and ProductID (I am using Spark Streaming to calculate
> this table in realtime).
>
> My thinking is that doing statistics (realtime dashboard) on the Stats
> table is much easier, and this table is also not too much to maintain.
>
> I'm just wondering, what's the best way to store the Stats table (a
> database or Parquet files?)
> What exactly are you trying to do? Zeppelin is for interactive
> analysis of a dataset. What do you mean "realtime analytics" -- do you
> mean build a report or dashboard that automatically updates as new data
> comes in?
>
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 3:13 PM, trung kien 
> wrote:
>
>> Hi all,
>>
>> I've just viewe

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Hi Ted,

I see that you’re working on the hbase-spark module for hbase. I recently 
packaged the SparkOnHBase project and gave it a test run. It works like a charm 
on CDH 5.4 and 5.5. All I had to do was add 
/opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with "--jars 
/path/to/spark-hbase-0.0.2-clabs.jar" as an argument and used the easy-to-use 
HBaseContext for HBase operations. Now, I want to use the latest in DataFrames. 
Since the new functionality is only in the hbase-spark module, I want to know 
how to get it and package it for CDH 5.5, which still uses HBase 1.0.0. Can you 
tell me what version of hbase master is still backwards compatible?
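
For reference, the steps described above amount to roughly the following (the
jar paths are the ones from this setup; adjust for your environment):

# append the htrace jar to Spark's classpath file on the CDH node
echo "/opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar" >> /etc/spark/conf/classpath.txt

# launch spark-shell with the packaged SparkOnHBase jar
spark-shell --jars /path/to/spark-hbase-0.0.2-clabs.jar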

By the way, we are using Spark 1.6 if it matters.

Thanks,
Ben

> On Feb 10, 2016, at 2:34 AM, Ted Yu  wrote:
> 
> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
> 
> Cheers
> 
> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph  > wrote:
> + Spark-Dev
> 
> For a Spark job on YARN accessing hbase table, added all hbase client jars 
> into spark.yarn.dist.files, NodeManager when launching container i.e 
> executor, does localization and brings all hbase-client jars into executor 
> CWD, but still the executor tasks fail with ClassNotFoundException of hbase 
> client jars, when i checked launch container.sh , Classpath does not have 
> $PWD/* and hence all the hbase client jars are ignored.
> 
> Is spark.yarn.dist.files not for adding jars into the executor classpath.
> 
> Thanks,
> Prabhu Joseph 
> 
> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph  > wrote:
> Hi All,
> 
>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
> mode, the job fails at count().
> 
> MASTER=yarn-client ./spark-shell
> 
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
> TableName}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>  
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE,"spark")
> 
> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
> hBaseRDD.count()
> 
> 
> Tasks throw below exception, the actual exception is swallowed, a bug 
> JDK-7172206. After installing hbase client on all NodeManager machines, the 
> Spark job ran fine. So I confirmed that the issue is with executor classpath.
> 
> But i am searching for some other way of including hbase jars in spark 
> executor classpath instead of installing hbase client on all NM machines. 
> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that it 
> localized all hbase jars, still the job fails. Tried 
> spark.executor.extraClasspath, still the job fails.
> 
> Is there any way we can access hbase from Executor without installing 
> hbase-client on all machines.
> 
> 
> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
> prabhuFS1): java.lang.IllegalStateException: unread block data
> at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 
> 
> 
> Thanks,
> Prabhu Joseph
> 
> 



Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Ted Yu
Benjamin:
Since hbase-spark is in its own module, you can pull the whole hbase-spark
subtree into hbase 1.0 root dir and add the following to root pom.xml:
<module>hbase-spark</module>

Then you would be able to build the module yourself.
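
For illustration, the relevant pieces might look roughly like this (the other
module entries and the exact mvn options are assumptions; adjust to your checkout):

<!-- hbase 1.0 root pom.xml, inside the existing <modules> section -->
<modules>
  <!-- ... existing hbase modules ... -->
  <module>hbase-spark</module>
</modules>

# build just the new module (and what it depends on) from the hbase root dir
mvn clean install -pl hbase-spark -am -DskipTests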

hbase-spark module uses APIs which are compatible with hbase 1.0

Cheers

On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim  wrote:

> Hi Ted,
>
> I see that you’re working on the hbase-spark module for hbase. I recently
> packaged the SparkOnHBase project and gave it a test run. It works like a
> charm on CDH 5.4 and 5.5. All I had to do was
> add /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the
> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars
> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the
> easy-to-use HBaseContext for HBase operations. Now, I want to use the
> latest in Dataframes. Since the new functionality is only in the
> hbase-spark module, I want to know how to get it and package it for CDH
> 5.5, which still uses HBase 1.0.0. Can you tell me what version of hbase
> master is still backwards compatible?
>
> By the way, we are using Spark 1.6 if it matters.
>
> Thanks,
> Ben
>
> On Feb 10, 2016, at 2:34 AM, Ted Yu  wrote:
>
> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>
> Cheers
>
> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <
> prabhujose.ga...@gmail.com> wrote:
>
>> + Spark-Dev
>>
>> For a Spark job on YARN accessing hbase table, added all hbase client
>> jars into spark.yarn.dist.files, NodeManager when launching container i.e
>> executor, does localization and brings all hbase-client jars into executor
>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase
>> client jars, when i checked launch container.sh , Classpath does not have
>> $PWD/* and hence all the hbase client jars are ignored.
>>
>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>>
>> Thanks,
>> Prabhu Joseph
>>
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph > > wrote:
>>
>>> Hi All,
>>>
>>>  When i do count on a Hbase table from Spark Shell which runs as
>>> yarn-client mode, the job fails at count().
>>>
>>> MASTER=yarn-client ./spark-shell
>>>
>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor,
>>> TableName}
>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>
>>> val conf = HBaseConfiguration.create()
>>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>>>
>>> val hBaseRDD = sc.newAPIHadoopRDD(conf,
>>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>>> hBaseRDD.count()
>>>
>>>
>>> Tasks throw below exception, the actual exception is swallowed, a bug
>>> JDK-7172206. After installing hbase client on all NodeManager machines, the
>>> Spark job ran fine. So I confirmed that the issue is with executor
>>> classpath.
>>>
>>> But i am searching for some other way of including hbase jars in spark
>>> executor classpath instead of installing hbase client on all NM machines.
>>> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that
>>> it localized all hbase jars, still the job fails. Tried
>>> spark.executor.extraClasspath, still the job fails.
>>>
>>> Is there any way we can access hbase from Executor without installing
>>> hbase-client on all machines.
>>>
>>>
>>> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
>>> 0, prabhuFS1): *java.lang.IllegalStateException: unread block data*
>>> at
>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>
>>
>
>


Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

That’s great! I didn’t know. I will proceed with it as you said.

Thanks,
Ben

> On Mar 13, 2016, at 12:42 PM, Ted Yu  wrote:
> 
> Benjamin:
> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
> subtree into hbase 1.0 root dir and add the following to root pom.xml:
> hbase-spark
> 
> Then you would be able to build the module yourself.
> 
> hbase-spark module uses APIs which are compatible with hbase 1.0
> 
> Cheers
> 
> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim  > wrote:
> Hi Ted,
> 
> I see that you’re working on the hbase-spark module for hbase. I recently 
> packaged the SparkOnHBase project and gave it a test run. It works like a 
> charm on CDH 5.4 and 5.5. All I had to do was add 
> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars 
> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the easy-to-use 
> HBaseContext for HBase operations. Now, I want to use the latest in 
> Dataframes. Since the new functionality is only in the hbase-spark module, I 
> want to know how to get it and package it for CDH 5.5, which still uses HBase 
> 1.0.0. Can you tell me what version of hbase master is still backwards 
> compatible?
> 
> By the way, we are using Spark 1.6 if it matters.
> 
> Thanks,
> Ben
> 
>> On Feb 10, 2016, at 2:34 AM, Ted Yu > > wrote:
>> 
>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>> 
>> Cheers
>> 
>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph > > wrote:
>> + Spark-Dev
>> 
>> For a Spark job on YARN accessing hbase table, added all hbase client jars 
>> into spark.yarn.dist.files, NodeManager when launching container i.e 
>> executor, does localization and brings all hbase-client jars into executor 
>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase 
>> client jars, when i checked launch container.sh , Classpath does not have 
>> $PWD/* and hence all the hbase client jars are ignored.
>> 
>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>> 
>> Thanks,
>> Prabhu Joseph 
>> 
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph > > wrote:
>> Hi All,
>> 
>>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
>> mode, the job fails at count().
>> 
>> MASTER=yarn-client ./spark-shell
>> 
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
>> TableName}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>  
>> val conf = HBaseConfiguration.create()
>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>> 
>> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>> hBaseRDD.count()
>> 
>> 
>> Tasks throw below exception, the actual exception is swallowed, a bug 
>> JDK-7172206. After installing hbase client on all NodeManager machines, the 
>> Spark job ran fine. So I confirmed that the issue is with executor classpath.
>> 
>> But i am searching for some other way of including hbase jars in spark 
>> executor classpath instead of installing hbase client on all NM machines. 
>> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that it 
>> localized all hbase jars, still the job fails. Tried 
>> spark.executor.extraClasspath, still the job fails.
>> 
>> Is there any way we can access hbase from Executor without installing 
>> hbase-client on all machines.
>> 
>> 
>> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
>> prabhuFS1): java.lang.IllegalStateException: unread block data
>> at 
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at 
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
>> at 
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
>> at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

I did as you said, but it looks like HBaseContext relies on some 
differences in HBase itself.

[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
 error: object HFileWriterImpl is not a member of package 
org.apache.hadoop.hbase.io.hfile
[ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig, 
HFileContextBuilder, HFileWriterImpl}
[ERROR]^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
 error: not found: value HFileWriterImpl
[ERROR] val hfileCompression = HFileWriterImpl
[ERROR]^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
 error: not found: value HFileWriterImpl
[ERROR] val defaultCompression = HFileWriterImpl
[ERROR]  ^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
 error: value COMPARATOR is not a member of object 
org.apache.hadoop.hbase.CellComparator
[ERROR] 
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)

So… back to my original question… do you know when these incompatibilities were 
introduced? If so, I can pull that version from that point in time and try again.

Thanks,
Ben

> On Mar 13, 2016, at 12:42 PM, Ted Yu  wrote:
> 
> Benjamin:
> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
> subtree into hbase 1.0 root dir and add the following to root pom.xml:
> hbase-spark
> 
> Then you would be able to build the module yourself.
> 
> hbase-spark module uses APIs which are compatible with hbase 1.0
> 
> Cheers
> 
> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim  > wrote:
> Hi Ted,
> 
> I see that you’re working on the hbase-spark module for hbase. I recently 
> packaged the SparkOnHBase project and gave it a test run. It works like a 
> charm on CDH 5.4 and 5.5. All I had to do was add 
> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars 
> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the easy-to-use 
> HBaseContext for HBase operations. Now, I want to use the latest in 
> Dataframes. Since the new functionality is only in the hbase-spark module, I 
> want to know how to get it and package it for CDH 5.5, which still uses HBase 
> 1.0.0. Can you tell me what version of hbase master is still backwards 
> compatible?
> 
> By the way, we are using Spark 1.6 if it matters.
> 
> Thanks,
> Ben
> 
>> On Feb 10, 2016, at 2:34 AM, Ted Yu > > wrote:
>> 
>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>> 
>> Cheers
>> 
>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph > > wrote:
>> + Spark-Dev
>> 
>> For a Spark job on YARN accessing hbase table, added all hbase client jars 
>> into spark.yarn.dist.files, NodeManager when launching container i.e 
>> executor, does localization and brings all hbase-client jars into executor 
>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase 
>> client jars, when i checked launch container.sh , Classpath does not have 
>> $PWD/* and hence all the hbase client jars are ignored.
>> 
>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>> 
>> Thanks,
>> Prabhu Joseph 
>> 
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph > > wrote:
>> Hi All,
>> 
>>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
>> mode, the job fails at count().
>> 
>> MASTER=yarn-client ./spark-shell
>> 
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
>> TableName}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>  
>> val conf = HBaseConfiguration.create()
>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>> 
>> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>> hBaseRDD.count()
>> 
>> 
>> Tasks throw below exception, the actual exception is swallowed, a bug 
>> JDK-7172206. After installing hbase client on all NodeManager machines, the 
>> Spark job ran fine. So I confirmed that the issue is with executor classpath.
>> 
>> But i am searching for some other way of including hbase jars in spark 
>> executor classpath instead of installing hbase client on all NM machines. 
>> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that it 
>> localized all hbase jars, still the job fails. Tried 
>> spark.executor.extraClasspath, still the job fails.
>> 
>> Is there any way we can access hbase from Executor without install

Trying to serialize/deserialize Spark ML Pipeline (RandomForest) Spark 1.6

2016-03-13 Thread Mario Lazaro
Hi!

I have a pipelineModel (using RandomForestClassifier) that I am trying to
save locally. I can save it using:

//save locally
val fileOut = new FileOutputStream("file:///home/user/forest.model")
val out  = new ObjectOutputStream(fileOut)
out.writeObject(model)
out.close()
fileOut.close()

 Then I deserialize it using:

val fileIn = new FileInputStream("/home/forest.model")
val in  = new ObjectInputStream(fileIn)
val cvModel =
in.readObject().asInstanceOf[org.apache.spark.ml.PipelineModel]
in.close()
fileIn.close()

but when I try to use it:

val predictions2 = cvModel.transform(testingData)

It throws an exception:

java.lang.IllegalArgumentException: Field "browser_index" does not exist.

at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:212)
at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:212)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at
scala.collection.AbstractMap.getOrElse(Map.scala:58) at
org.apache.spark.sql.types.StructType.apply(StructType.scala:211) at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$5.apply(VectorAssembler.scala:111)
at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$5.apply(VectorAssembler.scala:111)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at
org.apache.spark.ml.feature.VectorAssembler.transformSchema(VectorAssembler.scala:111)
at
org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:301)
at
org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:301)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at
org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:301) at
org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:68) at
org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:296) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60) at
$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62) at
$iwC$$iwC$$iwC$$iwC.<init>(<console>:64) at
$iwC$$iwC$$iwC.<init>(<console>:66) at $iwC$$iwC.<init>(<console>:68) at
$iwC.<init>(<console>:70) at <init>(<console>:72) at .<init>(<console>:76)
at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at
$print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606) at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at
org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664)
at
org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629)
at
org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622)
at
org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at
org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262) at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


I am not using .save and .load because they do not work in Spark 1.6 for
RandomForest.

Any idea how to do this? Any alternatives?

Thanks!
-- 
*Mario Lazaro*  |  Software Engineer, Big Data
*GumGum*   |  *Ads that stick*
310-985-3792 |  ma...@gumgum.com


Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Ted Yu
class HFileWriterImpl (in standalone file) is only present in master branch.
It is not in branch-1.

compressionByName() resides in class with @InterfaceAudience.Private which
got moved in master branch.

So looks like there is some work to be done for backporting to branch-1 :-)

On Sun, Mar 13, 2016 at 1:35 PM, Benjamin Kim  wrote:

> Ted,
>
> I did as you said, but it looks like that HBaseContext relies on some
> differences in HBase itself.
>
> [ERROR]
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
> error: object HFileWriterImpl is not a member of package
> org.apache.hadoop.hbase.io.hfile
> [ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig,
> HFileContextBuilder, HFileWriterImpl}
> [ERROR]^
> [ERROR]
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
> error: not found: value HFileWriterImpl
> [ERROR] val hfileCompression = HFileWriterImpl
> [ERROR]^
> [ERROR]
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
> error: not found: value HFileWriterImpl
> [ERROR] val defaultCompression = HFileWriterImpl
> [ERROR]  ^
> [ERROR]
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
> error: value COMPARATOR is not a member of object
> org.apache.hadoop.hbase.CellComparator
> [ERROR]
> .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
>
> So… back to my original question… do you know when these incompatibilities
> were introduced? If so, I can pulled that version at time and try again.
>
> Thanks,
> Ben
>
> On Mar 13, 2016, at 12:42 PM, Ted Yu  wrote:
>
> Benjamin:
> Since hbase-spark is in its own module, you can pull the whole hbase-spark
> subtree into hbase 1.0 root dir and add the following to root pom.xml:
> hbase-spark
>
> Then you would be able to build the module yourself.
>
> hbase-spark module uses APIs which are compatible with hbase 1.0
>
> Cheers
>
> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim  wrote:
>
>> Hi Ted,
>>
>> I see that you’re working on the hbase-spark module for hbase. I recently
>> packaged the SparkOnHBase project and gave it a test run. It works like a
>> charm on CDH 5.4 and 5.5. All I had to do was
>> add /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the
>> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars
>> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the
>> easy-to-use HBaseContext for HBase operations. Now, I want to use the
>> latest in Dataframes. Since the new functionality is only in the
>> hbase-spark module, I want to know how to get it and package it for CDH
>> 5.5, which still uses HBase 1.0.0. Can you tell me what version of hbase
>> master is still backwards compatible?
>>
>> By the way, we are using Spark 1.6 if it matters.
>>
>> Thanks,
>> Ben
>>
>> On Feb 10, 2016, at 2:34 AM, Ted Yu  wrote:
>>
>> Have you tried adding hbase client jars to spark.executor.extraClassPath
>> ?
>>
>> Cheers
>>
>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> + Spark-Dev
>>>
>>> For a Spark job on YARN accessing hbase table, added all hbase client
>>> jars into spark.yarn.dist.files, NodeManager when launching container i.e
>>> executor, does localization and brings all hbase-client jars into executor
>>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase
>>> client jars, when i checked launch container.sh , Classpath does not have
>>> $PWD/* and hence all the hbase client jars are ignored.
>>>
>>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <
>>> prabhujose.ga...@gmail.com> wrote:
>>>
 Hi All,

  When i do count on a Hbase table from Spark Shell which runs as
 yarn-client mode, the job fails at count().

 MASTER=yarn-client ./spark-shell

 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor,
 TableName}
 import org.apache.hadoop.hbase.client.HBaseAdmin
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat

 val conf = HBaseConfiguration.create()
 conf.set(TableInputFormat.INPUT_TABLE,"spark")

 val hBaseRDD = sc.newAPIHadoopRDD(conf,
 classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
 hBaseRDD.count()


 Tasks throw below exception, the actual exception is swallowed, a bug
 JDK-7172206. After installing hbase client on all NodeManager machines, the
 Spark job ran fine. So I confirmed that the issue is with executor
 classpath.

 But i am searching for some other way of including hb

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

Is there anything in the works, or are there tasks already filed, to do the 
back-porting?

Just curious.

Thanks,
Ben

> On Mar 13, 2016, at 3:46 PM, Ted Yu  wrote:
> 
> class HFileWriterImpl (in standalone file) is only present in master branch.
> It is not in branch-1.
> 
> compressionByName() resides in class with @InterfaceAudience.Private which 
> got moved in master branch.
> 
> So looks like there is some work to be done for backporting to branch-1 :-)
> 
> On Sun, Mar 13, 2016 at 1:35 PM, Benjamin Kim  > wrote:
> Ted,
> 
> I did as you said, but it looks like that HBaseContext relies on some 
> differences in HBase itself.
> 
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
>  error: object HFileWriterImpl is not a member of package 
> org.apache.hadoop.hbase.io.hfile
> [ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig, 
> HFileContextBuilder, HFileWriterImpl}
> [ERROR]^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
>  error: not found: value HFileWriterImpl
> [ERROR] val hfileCompression = HFileWriterImpl
> [ERROR]^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
>  error: not found: value HFileWriterImpl
> [ERROR] val defaultCompression = HFileWriterImpl
> [ERROR]  ^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
>  error: value COMPARATOR is not a member of object 
> org.apache.hadoop.hbase.CellComparator
> [ERROR] 
> .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
> 
> So… back to my original question… do you know when these incompatibilities 
> were introduced? If so, I can pulled that version at time and try again.
> 
> Thanks,
> Ben
> 
>> On Mar 13, 2016, at 12:42 PM, Ted Yu > > wrote:
>> 
>> Benjamin:
>> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
>> subtree into hbase 1.0 root dir and add the following to root pom.xml:
>> hbase-spark
>> 
>> Then you would be able to build the module yourself.
>> 
>> hbase-spark module uses APIs which are compatible with hbase 1.0
>> 
>> Cheers
>> 
>> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim > > wrote:
>> Hi Ted,
>> 
>> I see that you’re working on the hbase-spark module for hbase. I recently 
>> packaged the SparkOnHBase project and gave it a test run. It works like a 
>> charm on CDH 5.4 and 5.5. All I had to do was add 
>> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
>> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars 
>> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the 
>> easy-to-use HBaseContext for HBase operations. Now, I want to use the latest 
>> in Dataframes. Since the new functionality is only in the hbase-spark 
>> module, I want to know how to get it and package it for CDH 5.5, which still 
>> uses HBase 1.0.0. Can you tell me what version of hbase master is still 
>> backwards compatible?
>> 
>> By the way, we are using Spark 1.6 if it matters.
>> 
>> Thanks,
>> Ben
>> 
>>> On Feb 10, 2016, at 2:34 AM, Ted Yu >> > wrote:
>>> 
>>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>>> 
>>> Cheers
>>> 
>>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph >> > wrote:
>>> + Spark-Dev
>>> 
>>> For a Spark job on YARN accessing hbase table, added all hbase client jars 
>>> into spark.yarn.dist.files, NodeManager when launching container i.e 
>>> executor, does localization and brings all hbase-client jars into executor 
>>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase 
>>> client jars, when i checked launch container.sh , Classpath does not have 
>>> $PWD/* and hence all the hbase client jars are ignored.
>>> 
>>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>>> 
>>> Thanks,
>>> Prabhu Joseph 
>>> 
>>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph >> > wrote:
>>> Hi All,
>>> 
>>>  When i do count on a Hbase table from Spark Shell which runs as 
>>> yarn-client mode, the job fails at count().
>>> 
>>> MASTER=yarn-client ./spark-shell
>>> 
>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
>>> TableName}
>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>  
>>> val conf = HBaseConfiguration.create()
>>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>>> 
>>> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
>>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classO

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Ted Yu
The backport would be done under HBASE-14160.

FYI

On Sun, Mar 13, 2016 at 4:14 PM, Benjamin Kim  wrote:

> Ted,
>
> Is there anything in the works or are there tasks already to do the
> back-porting?
>
> Just curious.
>
> Thanks,
> Ben
>
> On Mar 13, 2016, at 3:46 PM, Ted Yu  wrote:
>
> class HFileWriterImpl (in standalone file) is only present in master
> branch.
> It is not in branch-1.
>
> compressionByName() resides in class with @InterfaceAudience.Private which
> got moved in master branch.
>
> So looks like there is some work to be done for backporting to branch-1 :-)
>
> On Sun, Mar 13, 2016 at 1:35 PM, Benjamin Kim  wrote:
>
>> Ted,
>>
>> I did as you said, but it looks like that HBaseContext relies on some
>> differences in HBase itself.
>>
>> [ERROR]
>> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
>> error: object HFileWriterImpl is not a member of package
>> org.apache.hadoop.hbase.io.hfile
>> [ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig,
>> HFileContextBuilder, HFileWriterImpl}
>> [ERROR]^
>> [ERROR]
>> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
>> error: not found: value HFileWriterImpl
>> [ERROR] val hfileCompression = HFileWriterImpl
>> [ERROR]^
>> [ERROR]
>> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
>> error: not found: value HFileWriterImpl
>> [ERROR] val defaultCompression = HFileWriterImpl
>> [ERROR]  ^
>> [ERROR]
>> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
>> error: value COMPARATOR is not a member of object
>> org.apache.hadoop.hbase.CellComparator
>> [ERROR]
>> .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
>>
>> So… back to my original question… do you know when these
>> incompatibilities were introduced? If so, I can pulled that version at time
>> and try again.
>>
>> Thanks,
>> Ben
>>
>> On Mar 13, 2016, at 12:42 PM, Ted Yu  wrote:
>>
>> Benjamin:
>> Since hbase-spark is in its own module, you can pull the whole
>> hbase-spark subtree into hbase 1.0 root dir and add the following to root
>> pom.xml:
>> hbase-spark
>>
>> Then you would be able to build the module yourself.
>>
>> hbase-spark module uses APIs which are compatible with hbase 1.0
>>
>> Cheers
>>
>> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim 
>> wrote:
>>
>>> Hi Ted,
>>>
>>> I see that you’re working on the hbase-spark module for hbase. I
>>> recently packaged the SparkOnHBase project and gave it a test run. It works
>>> like a charm on CDH 5.4 and 5.5. All I had to do was
>>> add /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the
>>> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars
>>> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the
>>> easy-to-use HBaseContext for HBase operations. Now, I want to use the
>>> latest in Dataframes. Since the new functionality is only in the
>>> hbase-spark module, I want to know how to get it and package it for CDH
>>> 5.5, which still uses HBase 1.0.0. Can you tell me what version of hbase
>>> master is still backwards compatible?
>>>
>>> By the way, we are using Spark 1.6 if it matters.
>>>
>>> Thanks,
>>> Ben
>>>
>>> On Feb 10, 2016, at 2:34 AM, Ted Yu  wrote:
>>>
>>> Have you tried adding hbase client jars to spark.executor.extraClassPath
>>> ?
>>>
>>> Cheers
>>>
>>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <
>>> prabhujose.ga...@gmail.com> wrote:
>>>
 + Spark-Dev

 For a Spark job on YARN accessing hbase table, added all hbase client
 jars into spark.yarn.dist.files, NodeManager when launching container i.e
 executor, does localization and brings all hbase-client jars into executor
 CWD, but still the executor tasks fail with ClassNotFoundException of hbase
 client jars, when i checked launch container.sh , Classpath does not have
 $PWD/* and hence all the hbase client jars are ignored.

 Is spark.yarn.dist.files not for adding jars into the executor
 classpath.

 Thanks,
 Prabhu Joseph

 On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <
 prabhujose.ga...@gmail.com> wrote:

> Hi All,
>
>  When i do count on a Hbase table from Spark Shell which runs as
> yarn-client mode, the job fails at count().
>
> MASTER=yarn-client ./spark-shell
>
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor,
> TableName}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>
> val hBaseRDD = sc.newAPIHadoopRDD(conf,
> classOf[TableInputFormat],classOf[org.a

Re: Hive on Spark performance

2016-03-13 Thread Mich Talebzadeh
It depends on the version of Hive on the Spark engine.

As far as I am aware, the latest version of Hive that I am using (Hive 2)
has improvements compared to the previous versions of Hive (0.14, 1.2.1) on
the Spark engine.

As of today I have managed to use Hive 2.0 on Spark 1.3.1. So it is
not the latest Spark, but it is pretty good.

What specific concerns do you have in mind?

HTH


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 13 March 2016 at 23:27, sjayatheertha  wrote:

> Just curious if you could share your experience on the performance of
> spark in your company? How much data do you process? And what's the latency
> you are getting with spark engine?
>
> Vidya


append rows to dataframe

2016-03-13 Thread Divya Gehlot
Hi,

Please bear with me for asking such a naive question.
I have a list of conditions (dynamic SQLs) sitting in an HBase table.
I need to iterate through those dynamic SQLs and add the data to DataFrames.
As we know, DataFrames are immutable; when I try to iterate in a for loop as
shown below, I get only the last dynamic SQL's result set.

var dffiltered : DataFrame = sqlContext.emptyDataFrame
 for ( i <- 0 to (dfFilterSQLs.length - 1)) {
 println("Condition="+dfFilterSQLs(i))
 dffiltered =
dfresult.filter(dfFilterSQLs(i)).select("Col1","Col2","Col3","Col4","Col5")
  dffiltered.show
  }


How can I keep appending data to the DataFrame and get a final result
covering all the SQL conditions?

Thanks in advance for the help.

Thanks,
Divya


Re: append rows to dataframe

2016-03-13 Thread Ted Yu
Have you tried the unionAll() method of DataFrame?

On Sun, Mar 13, 2016 at 8:44 PM, Divya Gehlot 
wrote:

> Hi,
>
> Please bear me for asking such a naive question
> I have list of conditions (dynamic sqls) sitting in hbase table .
> I need to iterate through those dynamic sqls and add the data to
> dataframes.
> As we know dataframes are immutable ,when I try to iterate in for loop as
> shown below I get only last dynamic sql result set .
>
> var dffiltered : DataFrame = sqlContext.emptyDataFrame
>  for ( i <- 0 to (dfFilterSQLs.length - 1)) {
>  println("Condition="+dfFilterSQLs(i))
>  dffiltered =
> dfresult.filter(dfFilterSQLs(i)).select("Col1","Col2","Col3","Col4","Col5")
>   dffiltered.show
>   }
>
>
> How can I keep on appending data to dataframe and get the final result
> having all the sql conditions.
>
> Thanks in advance for the help.
>
> Thanks,
> Divya
>


Can someone fix this download URL?

2016-03-13 Thread Akhil Das
http://d3kbcqa49mib13.cloudfront.net/spark-1.6.1-bin-hadoop2.6.tgz

There's a broken link for Spark 1.6.1 prebuilt hadoop 2.6 direct download.


Thanks
Best Regards


Re: append rows to dataframe

2016-03-13 Thread Ted Yu
dffiltered = dffiltered.unionAll(
  dfresult.filter(dfFilterSQLs(i)).select("Col1","Col2","Col3","Col4","Col5"))
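
A minimal sketch of the full loop (dfresult and dfFilterSQLs are from your
snippet; this assumes every filtered result has the same five columns):

import org.apache.spark.sql.DataFrame

var dffiltered: DataFrame = null
for (cond <- dfFilterSQLs) {
  val part = dfresult.filter(cond).select("Col1", "Col2", "Col3", "Col4", "Col5")
  // union the current result with what has been accumulated so far
  dffiltered = if (dffiltered == null) part else dffiltered.unionAll(part)
}
dffiltered.show()

Starting from null rather than sqlContext.emptyDataFrame avoids a schema
mismatch on the first unionAll, since the empty DataFrame has no columns.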

FYI

On Sun, Mar 13, 2016 at 8:50 PM, Ted Yu  wrote:

> Have you tried unionAll() method of DataFrame ?
>
> On Sun, Mar 13, 2016 at 8:44 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>>
>> Please bear me for asking such a naive question
>> I have list of conditions (dynamic sqls) sitting in hbase table .
>> I need to iterate through those dynamic sqls and add the data to
>> dataframes.
>> As we know dataframes are immutable ,when I try to iterate in for loop as
>> shown below I get only last dynamic sql result set .
>>
>> var dffiltered : DataFrame = sqlContext.emptyDataFrame
>>  for ( i <- 0 to (dfFilterSQLs.length - 1)) {
>>  println("Condition="+dfFilterSQLs(i))
>>  dffiltered =
>> dfresult.filter(dfFilterSQLs(i)).select("Col1","Col2","Col3","Col4","Col5")
>>   dffiltered.show
>>   }
>>
>>
>> How can I keep on appending data to dataframe and get the final result
>> having all the sql conditions.
>>
>> Thanks in advance for the help.
>>
>> Thanks,
>> Divya
>>
>
>


Hive Query on Spark fails with OOM

2016-03-13 Thread Prabhu Joseph
Hi All,

A Hive join query which runs fine (and faster) in MapReduce takes a lot of time
with Spark and finally fails with an OOM.

*Query:  hivejoin.py*

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf().setAppName("Hive_Join")
sc = SparkContext(conf=conf)
hiveCtx = HiveContext(sc)
hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a INNER
JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
a.instance_id");
results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
print results


*Data Study:*

Number of rows:

Table A has 1002093508 rows
Table B has 5371668 rows
Table C has 1000 rows

No data skewness:

item_id in B is unique and A has multiple rows with the same item_id, so after
the first INNER JOIN the result set is still 1002093508 rows.

instance_id in C is unique and A has multiple rows with the same instance_id
(the maximum number of rows sharing one instance_id is 250).

The Spark job runs with 90 executors, each with 2 cores and 6GB memory. YARN
allotted all the requested resources immediately and no other job is running
on the cluster.

spark.storage.memoryFraction 0.6
spark.shuffle.memoryFraction 0.2
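
For reference, a job with the resources described above would be submitted
roughly like this (the master/deploy mode is an assumption):

spark-submit --master yarn-client \
  --num-executors 90 --executor-cores 2 --executor-memory 6g \
  --conf spark.storage.memoryFraction=0.6 \
  --conf spark.shuffle.memoryFraction=0.2 \
  hivejoin.py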

Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle write
500GB of intermediate data

Stage 3 - does shuffle read of 500GB data, tasks has PROCESS_LOCAL and
output of 400GB is shuffled

Stage 4 - tasks fails with OOM on reading the shuffled output data when it
reached 40GB data itself

First of all, which kinds of Hive queries get better performance on Spark
than on MapReduce, and which Hive queries won't perform well on Spark?

How do we calculate the optimal executor heap size and the number of
executors for a given input data size? We don't ask the Spark executors to
cache any data, so why do the Stage 3 tasks show PROCESS_LOCAL? And why does
Stage 4 fail right after reading just 40GB of data -- is it caching data in
memory?

In a Spark job, some stages need a lot of memory for shuffle and some need a
lot of memory for cache. So when a Spark executor has a lot of memory
available for cache and does not use it, but a stage needs to do a lot of
shuffle, will the executor only use the configured shuffle fraction, or will
it also use the free memory that was set aside for cache?


Thanks,
Prabhu Joseph


spark streaming web ui remains completed jobs as active jobs

2016-03-13 Thread t7s

 

I am sure these jobs completed, according to the log.

For example, job 9816.
This is the info about job 9816 in the log:
[2016-03-14 07:15:05,088] INFO Job 9816 finished: transform at
ErrorStreaming2.scala:396, took 8.218500 s
(org.apache.spark.scheduler.DAGScheduler)

The stages in job 9816 are completed too, according to the log.

But job 9816 is still listed as an active job in the web UI. Why?
How can I clear these remaining jobs?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-web-ui-remains-completed-jobs-as-active-jobs-tp26473.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.6.1 : SPARK-12089 : java.lang.NegativeArraySizeException

2016-03-13 Thread Ravindra Rawat
Greetings,

I am getting the following exception when joining a few Parquet files.
The SPARK-12089 description has details of the overflow condition, which is
marked as fixed in 1.6.1. I recall seeing another issue related to CSV
files producing the same exception.

Any pointers on how to debug this or possible workarounds? Google
searches and JIRA comments point to either a > 2GB record size (less
likely) or RDD sizes being too large.

I had upgraded to Spark 1.6.1 due to serialization errors from
Catalyst while reading Parquet files.

Related JIRA Issue => https://issues.apache.org/jira/browse/SPARK-12089

Related PR => https://github.com/apache/spark/pull/10142


java.lang.NegativeArraySizeException
at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:45)
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:196)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Thanks.

-- 
Regards
Ravindra


spark streaming web ui remains completed jobs as active jobs

2016-03-13 Thread t7s

 

I am sure these jobs completed, according to the log.

For example, job 9816.
This is the info about job 9816 in the log:
[2016-03-14 07:15:05,088] INFO Job 9816 finished: transform at
ErrorStreaming2.scala:396, took 8.218500 s
(org.apache.spark.scheduler.DAGScheduler)

The stages in job 9816 are completed too, according to the log.

But job 9816 is still listed as an active job in the web UI. Why?
How can I clear these remaining jobs?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-web-ui-remains-completed-jobs-as-active-jobs-tp26474.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.6.1 : SPARK-12089 : java.lang.NegativeArraySizeException

2016-03-13 Thread Ted Yu
Here is the related code:

final int length = totalSize() + neededSize;
if (buffer.length < length) {
  // This will not happen frequently, because the buffer is re-used.
  final byte[] tmp = new byte[length * 2];

Looks like length was positive (since it was bigger than buffer.length) but
length * 2 became negative.
We just need to allocate length bytes instead of length * 2 bytes.
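
As a quick illustration of the overflow (the numbers are made up, just to show
the sign flip):

val length = 1200000000   // a row buffer a bit over 1 GB
val doubled = length * 2  // Int overflow: evaluates to -1894967296
// new Array[Byte](doubled) would then throw NegativeArraySizeException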

On Sun, Mar 13, 2016 at 10:39 PM, Ravindra Rawat 
wrote:

> Greetings,
>
> I am getting following exception on joining a few parquet files. SPARK-12089 
> description has details of the overflow condition which is marked as fixed in 
> 1.6.1. I recall seeing another issue related to csv files creating same 
> exception.
>
> Any pointers on how to debug this or possible workarounds? Google searches 
> and JIRA comments point to either a > 2GB record size (less likely) or RDD 
> sizes being too large.
>
> I had upgraded to Spark 1.6.1 due to Serialization errors from Catalyst while 
> reading Parquet files.
>
> Related JIRA Issue => https://issues.apache.org/jira/browse/SPARK-12089
>
> Related PR => https://github.com/apache/spark/pull/10142
>
>
> java.lang.NegativeArraySizeException
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:45)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:196)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> Thanks.
>
> --
> Regards
> Ravindra
>


Re: Correct way to use spark streaming with apache zeppelin

2016-03-13 Thread Chris Miller
Cool! Thanks for sharing.


--
Chris Miller

On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist  wrote:

> Below is a link to an example which Silvio Fiorito put together
> demonstrating how to link Zeppelin with Spark Stream for real-time charts.
> I think the original thread was pack in early November 2015, subject: Real
> time chart in Zeppelin, if you care to try to find it.
>
> https://gist.github.com/granturing/a09aed4a302a7367be92
>
> HTH.
>
> -Todd
>
> On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller 
> wrote:
>
>> I'm pretty new to all of this stuff, so bear with me.
>>
>> Zeppelin isn't really intended for realtime dashboards as far as I know.
>> Its reporting features (tables, graphs, etc.) are more for displaying the
>> results from the output of something. As far as I know, there isn't really
>> anything to "watch" a dataset and have updates pushed to the Zeppelin UI.
>>
>> As for Spark, unless you're doing a lot of processing that you didn't
>> mention here, I don't think it's a good fit just for this.
>>
>> If it were me (just off the top of my head), I'd just build a simple web
>> service that uses websockets to push updates to the client which could then
>> be used to update graphs, tables, etc. The data itself -- that is, the
>> accumulated totals -- you could store in something like Redis. When an
>> order comes in, just add that quantity and price to the existing value and
>> trigger your code to push out an updated value to any clients via the
>> websocket. You could use something like a Redis pub/sub channel to trigger
>> the web app to notify clients of an update.
>>
>> There are about 5 million other ways you could design this, but I would
>> just keep it as simple as possible. I just threw one idea out...
>>
>> Good luck.
>>
>>
>> --
>> Chris Miller
>>
>> On Sat, Mar 12, 2016 at 6:58 PM, trung kien  wrote:
>>
>>> Thanks Chris and Mich for replying.
>>>
>>> Sorry for not explaining my problem clearly. Yes, I am talking about a
>>> flexible dashboard when I mention Zeppelin.
>>>
>>> Here is the problem i am having:
>>>
>>> I am running a commercial website where we sell many products and we
>>> have many branches in many places. We have a lot of realtime transactions
>>> and want to analyze them in realtime.
>>>
>>> We don't want to aggregate every single transaction each time we do
>>> analytics (each transaction has BranchID, ProductID, Qty, Price). So we
>>> maintain intermediate data which contains: BranchID, ProductID, totalQty,
>>> totalDollar
>>>
>>> Ideally, we have 2 tables:
>>>    Transaction (BranchID, ProductID, Qty, Price, Timestamp)
>>>
>>> And an intermediate table Stats, which is just the sum of every transaction
>>> grouped by BranchID and ProductID (I am using Spark Streaming to calculate
>>> this table in realtime).
>>>
>>> My thinking is that doing statistics (realtime dashboard) on the Stats
>>> table is much easier, and this table is also not much effort to maintain.
>>>
>>> I'm just wondering, what's the best way to store the Stats table (a
>>> database or a parquet file)?
>>> What exactly are you trying to do? Zeppelin is for interactive analysis
>>> of a dataset. What do you mean "realtime analytics" -- do you mean build a
>>> report or dashboard that automatically updates as new data comes in?
>>>
>>>
>>> --
>>> Chris Miller
>>>
>>> On Sat, Mar 12, 2016 at 3:13 PM, trung kien  wrote:
>>>
 Hi all,

 I've just viewed some Zeppelin videos. The integration between
 Zeppelin and Spark is really amazing, and I want to use it for my
 application.

 In my app, I will have a Spark Streaming app to do some basic realtime
 aggregation (intermediate data). Then I want to use Zeppelin to do some
 realtime analytics on the intermediate data.

 My question is: what's the most efficient storage engine for storing
 realtime intermediate data? Is a parquet file somewhere suitable?

>>>
>>>
>>
>


Re: Spark SQL is not returning records for HIVE transactional tables on HDP

2016-03-13 Thread @Sanjiv Singh
Hi All,
We are using the following for Spark SQL:


   - Hive :1.2.1
   - Spark : 1.3.1
   - Hadoop :2.7.1

Let me know if you need other details to debug the issue.


Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Sun, Mar 13, 2016 at 1:07 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> Thanks for the input. I use Hive 2 and still have this issue.
>
>
>
>1. Hive version 2
>2. Hive on Spark engine 1.3.1
>3. Spark 1.5.2
>
>
> I have added the Hive user group to this as well, so hopefully we may get
> some resolution.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 12 March 2016 at 19:25, Timur Shenkao  wrote:
>
>> Hi,
>>
>> I have suffered from Hive Streaming and transactions enough, so I can share
>> my experience with you.
>>
>> 1) It's not a problem of Spark. It happens because of "peculiarities" /
>> bugs of Hive Streaming. Hive Streaming and transactions are very raw
>> technologies. If you look at Hive JIRA, you'll see several critical bugs
>> concerning Hive Streaming and transactions. Some of them are resolved in Hive
>> 2+ only. But Cloudera & Hortonworks ship their distributions with outdated
>> & buggy Hive.
>> So use Hive 2+. Earlier versions of Hive didn't run compaction at all.
>>
>> 2) In Hive 1.1, I issued the following lines:
>> ALTER TABLE default.foo COMPACT 'MAJOR';
>> SHOW COMPACTIONS;
>>
>> My manual compaction was shown, but it was never carried out.
>>
>> 3) If you use Hive Streaming, it's not recommended (or even forbidden) to
>> insert rows into Hive Streaming tables manually. Only the process that
>> writes to such a table should insert incoming rows, sequentially. Otherwise
>> you'll get unpredictable behaviour.
>>
>> 4) Ordinary Hive tables are directories with text, ORC, etc. files.
>> Hive Streaming / transactional tables are directories that have numerous
>> subdirectories with a "delta" prefix. Moreover, there are files with a
>> "flush_length" suffix in some delta subdirectories. "flush_length" files are 8
>> bytes long. The presence of a "flush_length" file in a subdirectory means
>> that Hive is writing updates to that subdirectory right now. When Hive fails
>> or is restarted, it begins to write into a new delta subdirectory with a new
>> "flush_length" file. And the old "flush_length" file (that was used before
>> the failure) still remains.
>> One of the goals of compaction is to delete outdated "flush_length" files.
>> Not every application / library can read such a directory structure or knows
>> the details of the Hive Streaming / transactions implementation. Most
>> software solutions still expect ordinary Hive tables as input.
>> When they encounter subdirectories or the special "flush_length" files,
>> applications / libraries either "see nothing" (return 0 or an empty result
>> set) or stumble over the "flush_length" files (return unexplainable errors).
>>
>> For instance, Facebook Presto couldn't read subdirectories by default unless
>> you activate special parameters. But it stumbles over "flush_length" files,
>> as Presto expects valid ORC files, not 8-byte-length files, in the directories.
>>
>> So, I don't advise you to use Hive Streaming and transactions right now in
>> real production systems (24/7/365) with hundreds of millions of events a
>> day.
>>
>> On Sat, Mar 12, 2016 at 11:24 AM, @Sanjiv Singh 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am facing this issue on an HDP setup, on which COMPACTION is required
>>> (once) for transactional tables before Spark SQL can fetch records.
>>> On the other hand, the Apache setup doesn't require compaction even once.
>>>
>>> Maybe something got triggered on the metastore after compaction, and Spark
>>> SQL started recognizing delta files.
>>>
>>> Let me know if you need other details to get to the root cause.
>>>
>>> Try this:
>>>
>>> See the complete scenario:
>>>
>>> hive> create table default.foo(id int) clustered by (id) into 2 buckets
>>> STORED AS ORC TBLPROPERTIES ('transactional'='true');
>>> hive> insert into default.foo values(10);
>>>
>>> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
>>> because data is still in delta files
>>>
>>> Now run major compaction:
>>>
>>> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>>>
>>> scala> sqlContext.table("default.foo").count // Gives 1
>>>
>>> hive> insert into foo values(20);
>>>
>>> scala> sqlContext.table("default.foo").count // Gives 2, no compaction required.
>>>
>>>
>>>
>>>
>>> Regards
>>> Sanjiv Singh
>>> Mob :  +091 9990-447-339
>>>
>>
>>
>


Re: Kafka + Spark streaming, RDD partitions not processed in parallel

2016-03-13 Thread Mukul Gupta
Sorry for the late reply. I am new to Java and it took me a while to set things
up.

Yes, you are correct that the kafka client libs need not be specifically added. I
didn't realize that. I removed the dependency and the code still compiled. However,
upon execution, I encountered the same issue as before.

Following is the link to repository:
https://github.com/guptamukul/sparktest.git


From: Cody Koeninger 
Sent: 11 March 2016 23:04
To: Mukul Gupta
Cc: user@spark.apache.org
Subject: Re: Kafka + Spark streaming, RDD partitions not processed in parallel

Why are you including a specific dependency on Kafka?  Spark's
external streaming kafka module already depends on kafka.

Can you link to an actual repo with build file etc?
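
For reference, a minimal build.sbt sketch of what that looks like -- the versions
and the Scala 2.10 artifacts are assumptions for illustration, not taken from this
thread:

  // spark-streaming-kafka already pulls in the Kafka client it needs
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"            % "1.6.0" % "provided",
    "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
    // no explicit org.apache.kafka dependency required
  )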

On Fri, Mar 11, 2016 at 11:21 AM, Mukul Gupta  wrote:
> Please note that while building the jar of the code below, I used Spark 1.6.0 +
> Kafka 0.9.0.0 libraries.
> I also tried the Spark 1.5.0 + Kafka 0.9.0.1 combination, but encountered the
> same issue.
>
> I could not use the ideal combination Spark 1.6.0 + Kafka 0.9.0.1 (which
> matches the Spark and Kafka versions installed on my machine) because when
> doing so, I get the following error at run time:
> Exception in thread "main" java.lang.ClassCastException: 
> kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
>
> package sparktest;
>
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.HashSet;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import kafka.serializer.StringDecoder;
> import scala.Tuple2;
>
> public class SparkTest {
>
> public static void main(String[] args) {
>
> if (args.length < 5) {
> System.err.println("Usage: SparkTest <kafkaBroker> <sparkMaster> <topics> <consumerGroupID> <durationSec>");
> System.exit(1);
> }
>
> String kafkaBroker = args[0];
> String sparkMaster = args[1];
> String topics = args[2];
> String consumerGroupID = args[3];
> String durationSec = args[4];
>
> int duration = 0;
>
> try {
> duration = Integer.parseInt(durationSec);
> } catch (Exception e) {
> System.err.println("Illegal duration");
> System.exit(1);
> }
>
> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>
> SparkConf conf = new SparkConf().setMaster(sparkMaster).setAppName("DirectStreamDemo");
>
> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(duration));
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", kafkaBroker);
> kafkaParams.put("group.id", consumerGroupID);
>
> // Direct stream: one RDD partition per Kafka topic partition
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,
> String.class, String.class, StringDecoder.class, StringDecoder.class,
> kafkaParams, topicsSet);
>
> JavaDStream<String> processed = messages.map(new Function<Tuple2<String, String>, String>() {
>
> @Override
> public String call(Tuple2<String, String> arg0) throws Exception {
> // Simulate per-record processing time
> Thread.sleep(7000);
> return arg0._2;
> }
> });
>
> processed.print(90);
>
> try {
> jssc.start();
> jssc.awaitTermination();
> } catch (Exception e) {
>
> } finally {
> jssc.close();
> }
> }
> }
>
>
> 
> From: Cody Koeninger 
> Sent: 11 March 2016 20:42
> To: Mukul Gupta
> Cc: user@spark.apache.org
> Subject: Re: Kafka + Spark streaming, RDD partitions not processed in parallel
>
> Can you post your actual code?
>
> On Thu, Mar 10, 2016 at 9:55 PM, Mukul Gupta  wrote:
>> Hi All, I was running the following test.
>> Setup: 9 VMs running spark workers with 1 spark executor each; 1 VM running
>> kafka and the spark master. Spark version is 1.6.0, Kafka version is 0.9.0.1.
>> Spark is using its own resource manager and is not running over YARN.
>> Test: I created a kafka topic with 3 partitions. Next I used
>> "KafkaUtils.createDirectStream" to get a DStream:
>> JavaPairInputDStream stream = KafkaUtils.createDirectStream(…);
>> JavaDStream stream1 = stream.map(func1);
>> stream1.print();
>> where func1 just contains a sleep followed by returning the value.
>> Observation: The first RDD partition, corresponding to partition 1 of kafka,
>> was processed on one of the spark executors. Once processing finished,
>> then RDD partitions corresponding to remaining two kaf

Re: Strange behavior of collectNeighbors API in GraphX

2016-03-13 Thread Zhaokang Wang
After further debugging this issue, I find that the bug is related to
the triplets view of a graph in GraphX.
If a graph is generated by outer joining two other graphs via the
outerJoinVertices operation, the graph's triplets view and its vertices view
may be inconsistent. In the following example, dataGraph's triplets view will
be inconsistent with its vertices view.

> // purGraph is a toy graph with edges: 2->1, 3->1, 2->3.
> val purGraph = Graph(dataVertex, dataEdge).persist()
> purGraph.triplets.count()
> val inNeighborGraph = purGraph.collectNeighbors(EdgeDirection.In)
> val dataGraph = purGraph.outerJoinVertices(inNeighborGraph)((vid,
> property, inNeighborList) => {...
> })
>
> // dataGraph's vertices view and triplets view will be inconsistent.
> dataGraph.vertices.foreach {...}
> dataGraph.triplets.foreach {...}

I found a similar issue on JIRA: SPARK-6378. I have given more
details and a code demo in the issue's comments. If you are interested in
this issue, you can follow it there. Thanks a lot.

Regards,
Zhaokang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behavior-of-collectNeighbors-API-in-GraphX-tp26459p26468.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Save the model produced after training with ALS.

2016-03-13 Thread Shishir Anshuman
hello,

I am using the sample code

for ALS algorithm implementation. I want to save the model produced after
training in a separate file. The 'modelPath' in model.save() stores some
metadata.
I am new to Apache Spark; please suggest how to get the model back after
training on the data-set.

Thank you.
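
For what it's worth, a minimal Scala sketch of saving and reloading an MLlib ALS
model from the spark-shell -- the toy ratings and the path are assumptions for
illustration only, not taken from the sample code referenced above:

  import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

  // Toy ratings purely for illustration: Rating(userId, productId, rating)
  val ratings = sc.parallelize(Seq(Rating(1, 10, 5.0), Rating(1, 20, 3.0), Rating(2, 10, 4.0)))

  val model = ALS.train(ratings, 10, 10, 0.01)   // rank = 10, iterations = 10, lambda = 0.01

  // save() writes a directory at modelPath containing metadata plus the factor data
  model.save(sc, "hdfs:///tmp/als-model")        // hypothetical path

  // Later, the whole model can be reloaded from that same path
  val sameModel = MatrixFactorizationModel.load(sc, "hdfs:///tmp/als-model")

In other words, modelPath ends up holding the full model (the metadata plus the
user/product factors), and load() reconstructs it from that directory.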


Re: Correct way to use spark streaming with apache zeppelin

2016-03-13 Thread trung kien
Thanks all for actively sharing your experience.

@Chris: using something like Redis is something I am trying to figure out.
I have a lot of transactions, so I can't trigger an update event for
every single transaction.
I'm looking at Spark Streaming because it provides batch processing (e.g. I
can update the cache every 5 seconds). In addition, Spark can scale pretty
well and I don't have to worry about losing data.

Now I have the cache with the following information:
 * Date
 * BranchID
 * ProductID
   TotalQty
   TotalDollar

Fields marked with * form the key; note that I have history data as well (by day).

Now I want to use Zeppelin for querying against the cache (while the cache is
updating).
I don't need Zeppelin to update automatically (I can hit the run button
myself :) ).
Just curious if parquet is the right solution for us?
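
If it helps, a rough Scala sketch of maintaining such running totals with Spark
Streaming's updateStateByKey -- the field names follow the cache described above,
but the Txn type, the input DStream and the checkpoint path are assumptions for
illustration:

  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream

  case class Txn(date: String, branchId: String, productId: String, qty: Long, dollar: Double)

  // ssc and txns are assumed to exist already (e.g. txns parsed from a Kafka direct stream)
  def runningTotals(ssc: StreamingContext,
                    txns: DStream[Txn]): DStream[((String, String, String), (Long, Double))] = {
    ssc.checkpoint("/tmp/stats-checkpoint")   // stateful ops need checkpointing; path is hypothetical

    txns
      .map(t => ((t.date, t.branchId, t.productId), (t.qty, t.dollar)))
      .updateStateByKey[(Long, Double)] { (batch: Seq[(Long, Double)], state: Option[(Long, Double)]) =>
        val (q0, d0) = state.getOrElse((0L, 0.0))
        // add this batch's quantities and dollars onto the running totals
        Some((q0 + batch.map(_._1).sum, d0 + batch.map(_._2).sum))
      }
  }

Each batch, the resulting DStream holds the current (TotalQty, TotalDollar) per
(Date, BranchID, ProductID), which could then be written out via foreachRDD to
whatever store Zeppelin ends up querying.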



On Sun, Mar 13, 2016 at 3:25 PM, Chris Miller 
wrote:

> Cool! Thanks for sharing.
>
>
> --
> Chris Miller
>
> On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist  wrote:
>
>> Below is a link to an example which Silvio Fiorito put together
>> demonstrating how to link Zeppelin with Spark Stream for real-time charts.
>> I think the original thread was back in early November 2015, subject: Real
>> time chart in Zeppelin, if you care to try to find it.
>>
>> https://gist.github.com/granturing/a09aed4a302a7367be92
>>
>> HTH.
>>
>> -Todd
>>
>> On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller 
>> wrote:
>>
>>> I'm pretty new to all of this stuff, so bear with me.
>>>
>>> Zeppelin isn't really intended for realtime dashboards as far as I know.
>>> Its reporting features (tables, graphs, etc.) are more for displaying the
>>> results from the output of something. As far as I know, there isn't really
>>> anything to "watch" a dataset and have updates pushed to the Zeppelin UI.
>>>
>>> As for Spark, unless you're doing a lot of processing that you didn't
>>> mention here, I don't think it's a good fit just for this.
>>>
>>> If it were me (just off the top of my head), I'd just build a simple web
>>> service that uses websockets to push updates to the client which could then
>>> be used to update graphs, tables, etc. The data itself -- that is, the
>>> accumulated totals -- you could store in something like Redis. When an
>>> order comes in, just add that quantity and price to the existing value and
>>> trigger your code to push out an updated value to any clients via the
>>> websocket. You could use something like a Redis pub/sub channel to trigger
>>> the web app to notify clients of an update.
>>>
>>> There are about 5 million other ways you could design this, but I would
>>> just keep it as simple as possible. I just threw one idea out...
>>>
>>> Good luck.
>>>
>>>
>>> --
>>> Chris Miller
>>>
>>> On Sat, Mar 12, 2016 at 6:58 PM, trung kien  wrote:
>>>
 Thanks Chris and Mich for replying.

 Sorry for not explaining my problem clearly. Yes, I am talking about a
 flexible dashboard when I mention Zeppelin.

 Here is the problem i am having:

 I am running a commercial website where we sell many products and we
 have many branches in many places. We have a lot of realtime transactions
 and want to analyze them in realtime.

 We don't want to aggregate every single transaction each time we do
 analytics (each transaction has BranchID, ProductID, Qty, Price). So we
 maintain intermediate data which contains: BranchID, ProductID, totalQty,
 totalDollar

 Ideally, we have 2 tables:
    Transaction (BranchID, ProductID, Qty, Price, Timestamp)

 And an intermediate table Stats, which is just the sum of every transaction
 grouped by BranchID and ProductID (I am using Spark Streaming to calculate
 this table in realtime).

 My thinking is that doing statistics (realtime dashboard) on the Stats
 table is much easier, and this table is also not much effort to maintain.

 I'm just wondering, what's the best way to store the Stats table (a
 database or a parquet file)?
 What exactly are you trying to do? Zeppelin is for interactive analysis
 of a dataset. What do you mean "realtime analytics" -- do you mean build a
 report or dashboard that automatically updates as new data comes in?


 --
 Chris Miller

 On Sat, Mar 12, 2016 at 3:13 PM, trung kien  wrote:

> Hi all,
>
> I've just viewed some Zeppelin videos. The integration between
> Zeppelin and Spark is really amazing, and I want to use it for my
> application.
>
> In my app, I will have a Spark Streaming app to do some basic realtime
> aggregation (intermediate data). Then I want to use Zeppelin to do some
> realtime analytics on the intermediate data.
>
> My question is: what's the most efficient storage engine for storing
> realtime intermediate data? Is a parquet file somewhere suitable?
>


>>>
>>
>


-- 
Thanks
Kien