sortBy transformation shows as a job

2016-01-05 Thread Soumitra Kumar
Fellows,
I have a simple code.
sc.parallelize (Array (1, 4, 3, 2), 2).sortBy (i=>i).foreach (println)
This results in 2 jobs (sortBy, foreach) in Spark's application master UI.
I thought there was a one-to-one relationship between RDD actions and jobs. Here,
the only action is foreach, so there should be only one job.
Please help me understand.
Thanks,-Soumitra.

Re: Use Spark Streaming for Batch?

2015-02-22 Thread Soumitra Kumar
See if https://issues.apache.org/jira/browse/SPARK-3660 helps you. My patch
has been accepted, and this enhancement is scheduled for 1.3.0.

This lets you specify an initial RDD for the updateStateByKey operation. Let me
know if you need any more information.
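
For reference, a hedged sketch of how the initial-state variant is expected to look
with that enhancement (based on SPARK-3660; `ssc` and `wordDstream` are illustrative,
and the exact 1.3.0 overload may differ):

import org.apache.spark.HashPartitioner

// Seed the state with initialRDD before the first batch is processed.
val initialRDD = ssc.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))
val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
val stateDStream = wordDstream.updateStateByKey[Int](
  updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), initialRDD)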

On Sun, Feb 22, 2015 at 5:21 PM, Tobias Pfeiffer  wrote:

> Hi,
>
> On Sat, Feb 21, 2015 at 1:05 AM, craigv 
>> wrote:
>> > /Might it be possible to perform "large batches" processing on HDFS time
>> > series data using Spark Streaming?/
>> >
>> > 1.I understand that there is not currently an InputDStream that could do
>> > what's needed.  I would have to create such a thing.
>> > 2. Time is a problem.  I would have to use the timestamps on our events
>> for
>> > any time-based logic and state management
>> > 3. The "batch duration" would become meaningless in this scenario.
>> Could I
>> > just set it to something really small (say 1 second) and then let it
>> "fall
>> > behind", processing the data as quickly as it could?
>>
>
> So, if it is not an issue for you if everything is processed in one batch,
> you can use streamingContext.textFileStream(hdfsDirectory). This will
> create a DStream that has a huge RDD with all the data in the first batch and
> then empty batches afterwards. You can use a small batch size; it should not
> be a problem.
> An alternative would be to write some code that creates one RDD per file
> in your HDFS directory, create a Queue of those RDDs and then use
> streamingContext.queueStream(), possibly with the oneAtATime=true parameter
> (which will process only one RDD per batch).
>
> However, to do window computations etc with the timestamps embedded *in*
> your data will be a major effort, as in: You cannot use the existing
> windowing functionality from Spark Streaming. If you want to read more
> about that, there have been a number of discussions about that topic on
> this list; maybe you can look them up.
>
> Tobias
>
>
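
For reference, a hedged sketch of the queueStream alternative described in the quoted
message above (paths and variable names are illustrative):

import scala.collection.mutable.Queue

// One RDD per HDFS file; oneAtATime = true processes a single queued RDD per batch.
val files = Seq("hdfs:///data/file-0001", "hdfs:///data/file-0002")
val rddQueue = Queue(files.map(f => ssc.sparkContext.textFile(f)): _*)
val stream = ssc.queueStream(rddQueue, oneAtATime = true)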


Re: Error reporting/collecting for users

2015-01-27 Thread Soumitra Kumar
It is a Streaming application, so how/when do you plan to access the
accumulator on the driver?

On Tue, Jan 27, 2015 at 6:48 PM, Tobias Pfeiffer  wrote:

> Hi,
>
> thanks for your mail!
>
> On Wed, Jan 28, 2015 at 11:44 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> That seems reasonable to me. Are you having any problems doing it this
>> way?
>>
>
> Well, actually I haven't done that yet. The idea of using accumulators to
> collect errors just came while writing the email, but I thought I'd just
> keep writing and see if anyone has any other suggestions ;-)
>
> Thanks
> Tobias
>
>


Re: Stop streaming context gracefully when SIGTERM is passed

2014-12-15 Thread Soumitra Kumar
Hi Adam,

I have following scala actor based code to do graceful shutdown:

class TimerActor(val timeout: Long, val who: Actor) extends Actor {
  def act {
    // Fire SHUTDOWN at the target actor once `timeout` ms pass without a message.
    reactWithin(timeout) {
      case TIMEOUT => who ! SHUTDOWN
    }
  }
}

class SSCReactor(val ssc: StreamingContext) extends Actor with Logging {
  def act {
    react {
      case SHUTDOWN =>
        logger.info(s"Shutting down gracefully ...")
        // Stop the SparkContext too, and wait for in-flight batches to finish.
        ssc.stop(true, true)
    }
  }
}
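
A hedged sketch of how these actors might be wired up (SHUTDOWN is assumed to be a
user-defined case object, TIMEOUT comes from scala.actors, and `timeoutMs` is
illustrative):

case object SHUTDOWN  // assumed shutdown message shared by both actors above

val reactor = new SSCReactor(ssc)
reactor.start()
// Ask the reactor to stop the StreamingContext gracefully after `timeoutMs` ms of inactivity.
new TimerActor(timeoutMs, reactor).start()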

I see following message:

14/10/22 01:40:49 INFO SSCReactor: Shutting down gracefully ...
14/10/22 01:40:49 INFO JobGenerator: Stopping JobGenerator gracefully
14/10/22 01:40:49 INFO JobGenerator: Waiting for all received blocks to be
consumed for job generation
14/10/22 01:40:49 INFO JobGenerator: Waited for all received blocks to be
consumed for job generation

-Soumitra.


On Mon, Dec 15, 2014 at 1:32 PM, Budde, Adam  wrote:
>
>  Hi all,
>
>  We are using Spark Streaming ETL a large volume of time series datasets.
> In our current design, each dataset we ETL will have a corresponding Spark
> Streaming context + process running on our cluster. Each of these processes
> will be passed configuration options specifying the data source to process
> as well as various tuning parameters such as the number of Receiver objects
> to use, batch interval size, number of partitions, etc.
>
>  Since the volume of data we're ingesting for each dataset will fluctuate
> over time, we'd like to be able to regularly send a SIGTERM to the Spark
> Streaming process handling the ETL, have that process gracefully complete
> processing any in-flight data, and restart the process with updated
> configuration options. The most obvious solution seems to be to call the
> stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided
> by StreamingContext in a shutdown hook, but this approach doesn't seem to
> be working for me. Here's a rough idea of what my code looks like:
>
>  > val ssc = new StreamingContext(conf, Seconds(15))
> >
> > ...
> >
> > // Add shutdown hook to exit gracefully upon termination.
> > Runtime.getRuntime().addShutdownHook(new Thread() with Logging {
> >   override def run() = {
> > logInfo("Exiting gracefully...")
> > ssc.stop(true, true)
> >   }
> > })
> >
> > ...
> >
> > ssc.start()
> > ssc.awaitTermination()
>
>  Whenever I try to kill the process, I don't see the "Exiting
> gracefully…" log message I've added. I tried grokking through the Spark
> source code to see if some other shutdown hook might be squashing the hook
> I've added by causing the process to exit before this hook is invoked, but
> I haven't found anything that would cause concern yet. Does anybody have
> any advice or insight on this? I'm a bit of a novice when it comes to the
> JVM and I'm afraid that I'm reaching the limits of my diagnostic abilities
> here.
>
>  Thanks,
> Adam
>


Re: Question about textFileStream

2014-11-10 Thread Soumitra Kumar
The entire file is read in a single window (batch).

On Mon, Nov 10, 2014 at 9:20 AM, Saiph Kappa  wrote:

> Hi,
>
> In my application I am doing something like this "new
> StreamingContext(sparkConf, Seconds(10)).textFileStream("logs/")", and I
> get some unknown exceptions when I copy a file with about 800 MB to that
> folder ("logs/"). I have a single worker running with 512 MB of memory.
>
> Anyone can tell me if every 10 seconds spark reads parts of that big file,
> or if it attempts to read the entire file in a single window? How does it
> work?
>
> Thanks.
>
>


Print dependency graph as DOT file

2014-10-16 Thread Soumitra Kumar
Hello,

Is there a way to print the dependency graph of a complete program, or of an
RDD/DStream, as a DOT file? It would be very helpful to have such a thing.

Thanks,
-Soumitra.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to name a DStream

2014-10-16 Thread Soumitra Kumar
Hello,

I am debugging my code to find out what else to cache.

The following is a line in the log:

14/10/16 12:00:01 INFO TransformedDStream: Persisting RDD 6 for time 
141348600 ms to StorageLevel(true, true, false, false, 1) at time 
141348600 ms

Is there a way to name a DStream? RDD has a name method, but DStream does not.

Please let me know if there is a way to map the DStream to a location in my source.

Thanks,
-Soumitra.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to add HBase dependencies and conf with spark-submit?

2014-10-16 Thread Soumitra Kumar
Great, it worked.

I don't have an answer as to what is special about SPARK_CLASSPATH vs --jars; I
just found the working setting through trial and error.

- Original Message -
From: "Fengyun RAO" 
To: "Soumitra Kumar" 
Cc: user@spark.apache.org, u...@hbase.apache.org
Sent: Thursday, October 16, 2014 12:50:01 AM
Subject: Re: How to add HBase dependencies and conf with spark-submit?





Thanks, Soumitra Kumar,

I didn't know why you put hbase-protocol.jar in SPARK_CLASSPATH while adding
hbase-protocol.jar, hbase-common.jar, hbase-client.jar, and htrace-core.jar in
--jars, but it did work.

Actually, I put all these four jars in SPARK_CLASSPATH along with the HBase conf
directory.


2014-10-15 22:39 GMT+08:00 Soumitra Kumar < kumar.soumi...@gmail.com > : 


I am writing to HBase, following are my options: 

export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar 

spark-submit \ 
--jars 
/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
 \ 



- Original Message - 
From: "Fengyun RAO" < raofeng...@gmail.com > 
To: user@spark.apache.org , u...@hbase.apache.org 
Sent: Wednesday, October 15, 2014 6:29:21 AM 
Subject: Re: How to add HBase dependencies and conf with spark-submit? 


+user@hbase 



2014-10-15 20:48 GMT+08:00 Fengyun RAO < raofeng...@gmail.com > : 



We use Spark 1.1, and HBase 0.98.1-cdh5.1.0, and need to read and write an 
HBase table in Spark program. 



I notice there are:

spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage
the extra classpath, in addition to the deprecated SPARK_CLASSPATH.

The problem is: what classpath or jars should we append?
I can simply add the whole `hbase classpath`, which is huge,
but this leads to dependency conflicts, e.g. HBase uses guava-12 while Spark
uses guava-14.




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to add HBase dependencies and conf with spark-submit?

2014-10-15 Thread Soumitra Kumar
I am writing to HBase, following are my options:

export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar

spark-submit \
--jars 
/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar,/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar,/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
 \

- Original Message -
From: "Fengyun RAO" 
To: user@spark.apache.org, u...@hbase.apache.org
Sent: Wednesday, October 15, 2014 6:29:21 AM
Subject: Re: How to add HBase dependencies and conf with spark-submit?


+user@hbase 



2014-10-15 20:48 GMT+08:00 Fengyun RAO < raofeng...@gmail.com > : 



We use Spark 1.1, and HBase 0.98.1-cdh5.1.0, and need to read and write an 
HBase table in Spark program. 



I notice there are:

spark.driver.extraClassPath and spark.executor.extraClassPath properties to manage
the extra classpath, in addition to the deprecated SPARK_CLASSPATH.

The problem is: what classpath or jars should we append?
I can simply add the whole `hbase classpath`, which is huge,
but this leads to dependency conflicts, e.g. HBase uses guava-12 while Spark
uses guava-14.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka->HDFS to store as Parquet format

2014-10-07 Thread Soumitra Kumar
Currently I am not doing anything; if anything changes, I start from scratch.

In general, I doubt there are many options to account for schema changes. If you
are reading the files using Impala, it may tolerate schema changes that are
append-only. Otherwise, existing Parquet files have to be migrated to the new schema.

- Original Message -
From: "Buntu Dev" 
To: "Soumitra Kumar" 
Cc: u...@spark.incubator.apache.org
Sent: Tuesday, October 7, 2014 10:18:16 AM
Subject: Re: Kafka->HDFS to store as Parquet format


Thanks for the info Soumitra.. its a good start for me. 


Just wanted to know how you are managing schema changes/evolution as 
parquetSchema is provided to setSchema in the above sample code. 


On Tue, Oct 7, 2014 at 10:09 AM, Soumitra Kumar < kumar.soumi...@gmail.com > 
wrote: 


I have used it to write Parquet files as: 

val job = new Job
val conf = job.getConfiguration
conf.set(ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name())
ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(parquetSchema))
rdd.saveAsNewAPIHadoopFile(rddToFileName(outputDir, em, time), classOf[Void],
  classOf[Group], classOf[ExampleOutputFormat], conf)



- Original Message - 
From: "bdev" < buntu...@gmail.com > 
To: u...@spark.incubator.apache.org 
Sent: Tuesday, October 7, 2014 9:51:40 AM 
Subject: Re: Kafka->HDFS to store as Parquet format 

After a bit of looking around, I found saveAsNewAPIHadoopFile could be used 
to specify the ParquetOutputFormat. Has anyone used it to convert JSON to 
Parquet format or any pointers are welcome, thanks! 



-- 
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-HDFS-to-store-as-Parquet-format-tp15768p15852.html
 
Sent from the Apache Spark User List mailing list archive at Nabble.com. 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka->HDFS to store as Parquet format

2014-10-07 Thread Soumitra Kumar
I have used it to write Parquet files as:

val job = new Job
val conf = job.getConfiguration
// Enable Snappy compression for the Parquet output.
conf.set(ParquetOutputFormat.COMPRESSION, CompressionCodecName.SNAPPY.name())
// Attach the Parquet message-type schema (parquetSchema is a schema string) to the job.
ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(parquetSchema))
// Write this batch's RDD of Group records as Parquet files.
rdd.saveAsNewAPIHadoopFile(rddToFileName(outputDir, em, time), classOf[Void],
  classOf[Group], classOf[ExampleOutputFormat], conf)

- Original Message -
From: "bdev" 
To: u...@spark.incubator.apache.org
Sent: Tuesday, October 7, 2014 9:51:40 AM
Subject: Re: Kafka->HDFS to store as Parquet format

After a bit of looking around, I found saveAsNewAPIHadoopFile could be used
to specify the ParquetOutputFormat. Has anyone used it to convert JSON to
Parquet format or any pointers are welcome, thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-HDFS-to-store-as-Parquet-format-tp15768p15852.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to initialize updateStateByKey operation

2014-09-23 Thread Soumitra Kumar
I thought I did a good job ;-)

OK, so what is the best way to initialize the updateStateByKey operation? I have
counts from a previous spark-submit run, and want to load them in the next
spark-submit job.

- Original Message -
From: "Soumitra Kumar" 
To: "spark users" 
Sent: Sunday, September 21, 2014 10:43:01 AM
Subject: How to initialize updateStateByKey operation

I started with StatefulNetworkWordCount to have a running count of words seen.

I have a file 'stored.count' which contains the word counts.

$ cat stored.count
a 1
b 2

I want to initialize StatefulNetworkWordCount with the values in the 'stored.count'
file; how do I do that?

I looked at the paper 'EECS-2012-259.pdf' by Matei et al. In figure 2, it would be
useful to have an initial RDD feeding into 'counts' at 't = 1', as below.

                           initial
                              |
t = 1: pageView -> ones -> counts
                              |
t = 2: pageView -> ones -> counts
...

I have also attached the modified figure 2 of 
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf .

I managed to hack the Spark code to achieve this, and am attaching the modified files.

Essentially, I added an argument 'initial : RDD[(K, S)]' to the updateStateByKey
method, as:
def updateStateByKey[S: ClassTag](
    initial: RDD[(K, S)],
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)]
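
A hedged usage sketch of this proposed method (hypothetical API from the patch
described above; `wordDstream` is illustrative, and 'stored.count' uses the
"word count" format shown earlier):

// Load the previously saved counts as the initial state.
val initial: RDD[(String, Int)] = ssc.sparkContext.textFile("stored.count").map { line =>
  val Array(word, count) = line.split(" ")
  (word, count.toInt)
}
val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
val counts = wordDstream.updateStateByKey[Int](initial, updateFunc, new HashPartitioner(2))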

If it sounds interesting to a larger crowd, I would be happy to clean up the code
and volunteer to push it into the codebase. I don't know the procedure for that, though.

-Soumitra.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to initialize updateStateByKey operation

2014-09-21 Thread Soumitra Kumar
I started with StatefulNetworkWordCount to have a running count of words seen.

I have a file 'stored.count' which contains the word counts.

$ cat stored.count
a 1
b 2

I want to initialize StatefulNetworkWordCount with the values in the 'stored.count'
file; how do I do that?

I looked at the paper 'EECS-2012-259.pdf' by Matei et al. In figure 2, it would be
useful to have an initial RDD feeding into 'counts' at 't = 1', as below.

                           initial
                              |
t = 1: pageView -> ones -> counts
                              |
t = 2: pageView -> ones -> counts
...

I have also attached the modified figure 2 of 
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf .

I managed to hack the Spark code to achieve this, and am attaching the modified files.

Essentially, I added an argument 'initial : RDD[(K, S)]' to the updateStateByKey
method, as:
def updateStateByKey[S: ClassTag](
    initial: RDD[(K, S)],
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)]

If it sounds interesting to a larger crowd, I would be happy to clean up the code
and volunteer to push it into the codebase. I don't know the procedure for that, though.

-Soumitra.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.dstream

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{Time, Duration}

/**
 * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
 * these functions.
 */
class PairDStreamFunctions[K, V](self: DStream[(K,V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
  extends Serializable
{
  private[streaming] def ssc = self.ssc

  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
    new HashPartitioner(numPartitions)
  }

  /**
   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   */
  def groupByKey(): DStream[(K, Iterable[V])] = {
    groupByKey(defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
   * generate the RDDs with `numPartitions` partitions.
   */
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
    groupByKey(defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   */
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Iterable[V])]]
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
   * with Spark's default number of partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
    reduceByKey(reduceFunc, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
   * with `numPartitions` partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
    reduceByKey(reduceFunc, defaultPartitioner(numPart

Re: Bulk-load to HBase

2014-09-19 Thread Soumitra Kumar
I successfully did this once.

RDD map to RDD [(ImmutableBytesWritable, KeyValue)]
then
val conf = HBaseConfiguration.create()
val job = new Job (conf, "CEF2HFile")
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable]);
job.setMapOutputValueClass (classOf[KeyValue]);
val table = new HTable(conf, "output")
HFileOutputFormat.configureIncrementalLoad (job, table);
saveAsNewAPIHadoopFile("hdfs://localhost.localdomain:8020/user/cloudera/spark", 
classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

Then I do
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 
/user/cloudera/spark output
to load the HFiles to hbase.
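
The same load can also be driven programmatically (a hedged sketch; assumes the
HBase 0.98-era API and the `conf` and `table` objects from the snippet above):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Equivalent of the CLI step above, invoked from the driver.
new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/user/cloudera/spark"), table)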

- Original Message -
From: "Ted Yu" 
To: "Aniket Bhatnagar" 
Cc: "innowireless TaeYun Kim" , "user" 

Sent: Friday, September 19, 2014 2:29:51 PM
Subject: Re: Bulk-load to HBase


Please see http://hbase.apache.org/book.html#completebulkload 
LoadIncrementalHFiles has a main() method. 


On Fri, Sep 19, 2014 at 5:41 AM, Aniket Bhatnagar < aniket.bhatna...@gmail.com 
> wrote: 



Agreed that the bulk import would be faster. In my case, I wasn't expecting a 
lot of data to be uploaded to HBase and also, I didn't want to take the pain of 
importing generated HFiles into HBase. Is there a way to invoke HBase HFile 
import batch script programmatically? 




On 19 September 2014 17:58, innowireless TaeYun Kim < 
taeyun@innowireless.co.kr > wrote: 






In fact, it seems that Put can be used by HFileOutputFormat, so the Put object
itself may not be the problem.

The problem is that TableOutputFormat uses the Put object in the normal way
(it goes through the normal write path), while HFileOutputFormat uses it to directly
build the HFile.





From: innowireless TaeYun Kim [mailto: taeyun@innowireless.co.kr ] 
Sent: Friday, September 19, 2014 9:20 PM 


To: user@spark.apache.org 
Subject: RE: Bulk-load to HBase 







Thank you for the example code. 



Currently I use foreachPartition() + Put(), but your example code can be used 
to clean up my code. 



BTW, since the data uploaded by Put() goes through normal HBase write path, it 
can be slow. 

So, it would be nice if bulk-load could be used, since it bypasses the write 
path. 



Thanks. 



From: Aniket Bhatnagar [ mailto:aniket.bhatna...@gmail.com ] 
Sent: Friday, September 19, 2014 9:01 PM 
To: innowireless TaeYun Kim 
Cc: user 
Subject: Re: Bulk-load to HBase 




I have been using saveAsNewAPIHadoopDataset but I use TableOutputFormat instead 
of HFileOutputFormat. But, hopefully this should help you: 






val hbaseZookeeperQuorum = s"$zookeeperHost:$zookeeperPort:$zookeeperHbasePath"

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum)
conf.set(TableOutputFormat.QUORUM_ADDRESS, hbaseZookeeperQuorum)
conf.set(TableOutputFormat.QUORUM_PORT, zookeeperPort.toString)
conf.setClass("mapreduce.outputformat.class",
  classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]])
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

val rddToSave: RDD[(Array[Byte], Array[Byte], Array[Byte])] = ... // Some RDD that contains row key, column qualifier and data

val putRDD = rddToSave.map(tuple => {
  val (rowKey, column, data) = tuple
  val put: Put = new Put(rowKey)
  put.add(COLUMN_FAMILY_RAW_DATA_BYTES, column, data)

  (new ImmutableBytesWritable(rowKey), put)
})

putRDD.saveAsNewAPIHadoopDataset(conf)








On 19 September 2014 16:52, innowireless TaeYun Kim < 
taeyun@innowireless.co.kr > wrote: 



Hi, 



Sorry, I just found saveAsNewAPIHadoopDataset. 

Then, Can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there any 
example code for that? 



Thanks. 





From: innowireless TaeYun Kim [mailto: taeyun@innowireless.co.kr ] 
Sent: Friday, September 19, 2014 8:18 PM 
To: user@spark.apache.org 
Subject: RE: Bulk-load to HBase 





Hi, 



After reading several documents, it seems that saveAsHadoopDataset cannot use 
HFileOutputFormat. 

It's because the saveAsHadoopDataset method uses JobConf, so it belongs to the
old Hadoop API, while HFileOutputFormat is a member of the mapreduce package, which
is for the new Hadoop API.



Am I right? 

If so, is there another method to bulk-load to HBase from RDD? 



Thanks. 





From: innowireless TaeYun Kim [ mailto:taeyun@innowireless.co.kr ] 
Sent: Friday, September 19, 2014 7:17 PM 
To: user@spark.apache.org 
Subject: Bulk-load to HBase 



Hi, 



Is there a way to bulk-load to HBase from RDD? 

HBase offers HFileOutputFormat class for bulk loading by MapReduce job, but I 
cannot figure out how to use it with saveAsHadoopDataset. 



Thanks. 




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming and ReactiveMongo

2014-09-19 Thread Soumitra Kumar
onStart should be non-blocking. You may try to create a thread in onStart 
instead.
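
A minimal sketch of that suggestion, applied to the receiver quoted below (names come
from that code; the cursor-handling body is elided):

protected def onStart() = {
  blocksGenerator.start()
  // Run the blocking ReactiveMongo tailable-cursor loop off the receiver's calling thread.
  new Thread("mongo-oplog-reader") {
    override def run() {
      // open the driver/connection/collection and push documents into blocksGenerator here
    }
  }.start()
}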

- Original Message -
From: "t1ny" 
To: u...@spark.incubator.apache.org
Sent: Friday, September 19, 2014 1:26:42 AM
Subject: Re: Spark Streaming and ReactiveMongo

Here's what we've tried so far as a first example of a custom Mongo receiver:

class MongoStreamReceiver(host: String)
  extends NetworkReceiver[String] {

  protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_AND_DISK_SER_2)

  protected def onStart() = {
blocksGenerator.start()

val driver = new MongoDriver
val connection = driver.connection(List("m01-pdp2"))
val db = connection.db("local")
val collection = db.collection[BSONCollection]("oplog.rs")

val query = BSONDocument("op" -> "i")

val enumerator =
  collection.
find(query).
options(QueryOpts().tailable.awaitData).
cursor[BSONDocument].
enumerate()

val processor: Iteratee[BSONDocument, Unit] =
  Iteratee.foreach { doc =>
blocksGenerator += BSONDocument.pretty(doc)
  }

enumerator |>>> processor
  }

  protected def onStop() {
blocksGenerator.stop()
  }
}
However this code doesn't run, probably because of serialization issues (no
logs to confirm this though, just no data in the stream...)

Note that if we comment out the ReactiveMongo-related code and put something
like this instead, the code runs fine :
for (i <- 0 until 1000) {
  blocksGenerator += "hello world"
  Thread.sleep(1000)
}
The Java socket example (found  here
  )
works fine as well.

Any hints ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-ReactiveMongo-tp14568p14661.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-18 Thread Soumitra Kumar
2.10.jar
>> \
>> --driver-memory 2G \
>> --executor-memory 16G \
>> --executor-cores 16 \
>> --num-executors 10 \
>> --spark.serializer org.apache.spark.serializer.KryoSerializer \
>> --spark.rdd.compress true \
>> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
>> --spark.akka.threads 64 \
>> --spark.akka.frameSize 500 \
>> --spark.task.maxFailures 64 \
>> --spark.scheduler.mode FAIR \
>> --spark.yarn.executor.memoryOverhead 4096 \
>> --spark.yarn.driver.memoryOverhead 1024 \
>> --spark.shuffle.consolidateFiles true \
>> --spark.default.parallelism 528 \
>>>logs/normRunLog-$run.log \
>> 2>logs/normRunLogError-$run.log & \
>> echo $! > logs/current-run.pid
>>
>> Some code optimizations (or, goof ups that I fixed). I did not
>> scientifically measure the impact of each but I think they helped:
>> - Made all my classes and objects serializable and then use Kryo (as
>> you see above)
>> - I map one receive task for each kafka partition
>> - Instead of doing a "union" on all the incoming streams and then
>> repartition() I now repartition() each incoming stream and process
>> them separately. I believe this reduces shuffle.
>> - Reduced number of repartitions. I was doing 128 after doing a
>> "union" on all incoming dStreams. I now repartition each of the five
>> streams separately (in a loop) to 24.
>> - For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
>> - Process data per partition instead of per RDD: dataout.foreachRDD(
>> rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
>> - Specific to kafka: when I create "new Producer", make sure I "close"
>> it else I had a ton of "too many files open" errors :)
>> - Use immutable objects as far as possible. If I use mutable objects
>> within a method/class then I turn them into immutable before passing
>> onto another class/method.
>> - For logging, create a LogService object that I then use for other
>> object/class declarations. Once instantiated, I can make "logInfo"
>> calls from within other Objects/Methods/Classes and output goes to the
>> "stderr" file in the Yarn container logs. Good for debugging stream
>> processing logic.
>>
>> Currently, my processing delay is lower than my dStream time window so
>> all is good. I get a ton of these errors in my driver logs:
>> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
>> threw an exception
>>
>> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>>
>> Best I understand and have been told, this does not affect data
>> integrity but may cause un-necessary recomputes.
>>
>> Hope this helps,
>>
>> Tim
>>
>>
>> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
>>  wrote:
>>> Hmm, no response to this thread!
>>>
>>> Adding to it, please share experiences of building an enterprise grade 
>>> product based on Spark Streaming.
>>>
>>> I am exploring Spark Streaming for enterprise software and am cautiously 
>>> optimistic about it. I see huge potential to improve debuggability of Spark.
>>>
>>> - Original Message -
>>> From: "Tim Smith" 
>>> To: "spark users" 
>>> Sent: Friday, September 12, 2014 10:09:53 AM
>>> Subject: Stable spark streaming app
>>>
>>> Hi,
>>>
>>> Anyone have a stable streaming app running in "production"? Can you
>>> share some overview of the app and setup like number of nodes, events
>>> per second, broad stream processing workflow, config highlights etc?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-17 Thread Soumitra Kumar
umber of repartitions. I was doing 128 after doing a
"union" on all incoming dStreams. I now repartition each of the five
streams separately (in a loop) to 24.
- For each RDD, I set storagelevel to "MEMORY_AND_DISK_SER"
- Process data per partition instead of per RDD: dataout.foreachRDD(
rdd => rdd.foreachPartition(rec => { myFunc(rec) }) )
- Specific to kafka: when I create "new Producer", make sure I "close"
it else I had a ton of "too many files open" errors :)
- Use immutable objects as far as possible. If I use mutable objects
within a method/class then I turn them into immutable before passing
onto another class/method.
- For logging, create a LogService object that I then use for other
object/class declarations. Once instantiated, I can make "logInfo"
calls from within other Objects/Methods/Classes and output goes to the
"stderr" file in the Yarn container logs. Good for debugging stream
processing logic.

Currently, my processing delay is lower than my dStream time window so
all is good. I get a ton of these errors in my driver logs:
14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener
threw an exception

These seem related to: https://issues.apache.org/jira/browse/SPARK-2316

Best I understand and have been told, this does not affect data
integrity but may cause un-necessary recomputes.

Hope this helps,

Tim


On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar
 wrote:
> Hmm, no response to this thread!
>
> Adding to it, please share experiences of building an enterprise grade 
> product based on Spark Streaming.
>
> I am exploring Spark Streaming for enterprise software and am cautiously 
> optimistic about it. I see huge potential to improve debuggability of Spark.
>
> - Original Message -
> From: "Tim Smith" 
> To: "spark users" 
> Sent: Friday, September 12, 2014 10:09:53 AM
> Subject: Stable spark streaming app
>
> Hi,
>
> Anyone have a stable streaming app running in "production"? Can you
> share some overview of the app and setup like number of nodes, events
> per second, broad stream processing workflow, config highlights etc?
>
> Thanks,
>
> Tim
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-17 Thread Soumitra Kumar
Hmm, no response to this thread!

Adding to it: please share experiences of building an enterprise-grade product
based on Spark Streaming.

I am exploring Spark Streaming for enterprise software and am cautiously
optimistic about it. I see huge potential to improve the debuggability of Spark.

- Original Message -
From: "Tim Smith" 
To: "spark users" 
Sent: Friday, September 12, 2014 10:09:53 AM
Subject: Stable spark streaming app

Hi,

Anyone have a stable streaming app running in "production"? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to initialize StateDStream

2014-09-13 Thread Soumitra Kumar
Thanks for the pointers. I meant a previous run of spark-submit.

For 1: this would add a bit more computation to every batch.

For 2: it's a good idea, but it may be inefficient to retrieve each value.

In general, for a generic state machine, the initialization and input
sequence are critical for correctness.




On Sat, Sep 13, 2014 at 12:17 PM, qihong  wrote:

> I'm not sure what you mean by "previous run". Is it previous batch? or
> previous run of spark-submit?
>
> If it's "previous batch" (spark streaming creates a batch every batch
> interval), then there's nothing to do.
>
> If it's previous run of spark-submit (assuming you are able to save the
> result somewhere), then I can think of two possible ways to do it:
>
> 1. read saved result as RDD (just do this once), and join the RDD with each
> RDD of the stateStream.
>
> 2. add extra logic to updateFunction: when the previous state is None (one
> of two Option type values), you get save state for the given key from saved
> result somehow, then your original logic to create new state object based
> on
> Seq[V] and previous state. note that you need use this version of
> updateFunction: "updateFunc: (Iterator[(K, Seq[V], Option[S])]) =>
> Iterator[(K, S)]", which make key available to the update function.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initialize-StateDStream-tp14113p14176.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to initialize StateDStream

2014-09-13 Thread Soumitra Kumar
I had looked at that.
If I have a set of saved word counts from a previous run and want to load
them in the next run, what is the best way to do it?

I am thinking of hacking the Spark code to have an initial RDD in
StateDStream, and using that for the first batch.

On Fri, Sep 12, 2014 at 11:04 PM, qihong  wrote:

> there's no need to initialize StateDStream. Take a look at example
> StatefulNetworkWordCount.scala, it's part of spark source code.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initialize-StateDStream-tp14113p14146.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How to initialize StateDStream

2014-09-12 Thread Soumitra Kumar
Hello,

How do I initialize StateDStream used in updateStateByKey?

-Soumitra.


Re: Spark Streaming and database access (e.g. MySQL)

2014-09-07 Thread Soumitra Kumar
I have the following code:

stream foreachRDD { rdd =>
  // Skip empty batches.
  if (rdd.take(1).size == 1) {
    rdd foreachPartition { iterator =>
      // One connection per partition, opened on the worker.
      initDbConnection()
      iterator foreach { record =>
        // write the record to the db
      }
      closeDbConnection()
    }
  }
}

On Sun, Sep 7, 2014 at 1:26 PM, Sean Owen  wrote:

> ... I'd call out that last bit as actually tricky: "close off the driver"
>
> See this message for the right-est way to do that, along with the
> right way to open DB connections remotely instead of trying to
> serialize them:
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E
>
> On Sun, Sep 7, 2014 at 4:19 PM, Mayur Rustagi 
> wrote:
> > Standard pattern is to initialize the mysql jdbc driver in your
> mappartition
> > call , update database & then close off the driver.
> > Couple of gotchas
> > 1. New driver initiated for all your partitions
> > 2. If the effect(inserts & updates) is not idempotent, so if your server
> > crashes, Spark will replay updates to mysql & may cause data corruption.
> >
> >
> > Regards
> > Mayur
> >
> > Mayur Rustagi
> > Ph: +1 (760) 203 3257
> > http://www.sigmoidanalytics.com
> > @mayur_rustagi
> >
> >
> > On Sun, Sep 7, 2014 at 11:54 AM, jchen  wrote:
> >>
> >> Hi,
> >>
> >> Has someone tried using Spark Streaming with MySQL (or any other
> >> database/data store)? I can write to MySQL at the beginning of the
> driver
> >> application. However, when I am trying to write the result of every
> >> streaming processing window to MySQL, it fails with the following error:
> >>
> >> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> >> not
> >> serializable: java.io.NotSerializableException:
> >> com.mysql.jdbc.JDBC4PreparedStatement
> >>
> >> I think it is because the statement object should be serializable, in
> >> order
> >> to be executed on the worker node. Has someone tried the similar cases?
> >> Example code will be very helpful. My intension is to execute
> >> INSERT/UPDATE/DELETE/SELECT statements for each sliding window.
> >>
> >> Thanks,
> >> JC
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-database-access-e-g-MySQL-tp13644.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming: DStream - zipWithIndex

2014-08-28 Thread Soumitra Kumar
Yes, that is an option.

I started with a function of batch time and index to generate the id as a Long. This
may be faster than generating a UUID, with the added benefit of sorting by time.
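
A hedged sketch of that id scheme (illustrative only; assumes fewer than 2^20 records
per batch):

val withIds = dstream.transform { (rdd, time) =>
  // Pack the batch time (ms) into the high bits and the per-record index into the low 20 bits.
  rdd.zipWithIndex.map { case (record, idx) =>
    ((time.milliseconds << 20) | (idx & 0xFFFFFL), record)
  }
}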

- Original Message -
From: "Tathagata Das" 
To: "Soumitra Kumar" 
Cc: "Xiangrui Meng" , user@spark.apache.org
Sent: Thursday, August 28, 2014 2:19:38 AM
Subject: Re: Spark Streaming: DStream - zipWithIndex


If just want arbitrary unique id attached to each record in a dstream (no 
ordering etc), then why not create generate and attach an UUID to each record? 





On Wed, Aug 27, 2014 at 4:18 PM, Soumitra Kumar < kumar.soumi...@gmail.com > 
wrote: 



I see a issue here. 


If rdd.id is 1000 then rdd.id * 1e9.toLong would be BIG. 


I wish there was DStream mapPartitionsWithIndex. 





On Wed, Aug 27, 2014 at 3:04 PM, Xiangrui Meng < men...@gmail.com > wrote: 


You can use RDD id as the seed, which is unique in the same spark 
context. Suppose none of the RDDs would contain more than 1 billion 
records. Then you can use 

rdd.zipWithUniqueId().mapValues(uid => rdd.id * 1e9.toLong + uid) 

Just a hack .. 

On Wed, Aug 27, 2014 at 2:59 PM, Soumitra Kumar 


< kumar.soumi...@gmail.com > wrote: 
> So, I guess zipWithUniqueId will be similar. 
> 
> Is there a way to get unique index? 
> 
> 
> On Wed, Aug 27, 2014 at 2:39 PM, Xiangrui Meng < men...@gmail.com > wrote: 
>> 
>> No. The indices start at 0 for every RDD. -Xiangrui 
>> 
>> On Wed, Aug 27, 2014 at 2:37 PM, Soumitra Kumar 
>> < kumar.soumi...@gmail.com > wrote: 
>> > Hello, 
>> > 
>> > If I do: 
>> > 
>> > DStream transform { 
>> > rdd.zipWithIndex.map { 
>> > 
>> > Is the index guaranteed to be unique across all RDDs here? 
>> > 
>> > } 
>> > } 
>> > 
>> > Thanks, 
>> > -Soumitra. 
> 
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming: DStream - zipWithIndex

2014-08-27 Thread Soumitra Kumar
I see an issue here.

If rdd.id is 1000, then rdd.id * 1e9.toLong would be BIG.

I wish there were a DStream mapPartitionsWithIndex.


On Wed, Aug 27, 2014 at 3:04 PM, Xiangrui Meng  wrote:

> You can use RDD id as the seed, which is unique in the same spark
> context. Suppose none of the RDDs would contain more than 1 billion
> records. Then you can use
>
> rdd.zipWithUniqueId().mapValues(uid => rdd.id * 1e9.toLong + uid)
>
> Just a hack ..
>
> On Wed, Aug 27, 2014 at 2:59 PM, Soumitra Kumar
>  wrote:
> > So, I guess zipWithUniqueId will be similar.
> >
> > Is there a way to get unique index?
> >
> >
> > On Wed, Aug 27, 2014 at 2:39 PM, Xiangrui Meng  wrote:
> >>
> >> No. The indices start at 0 for every RDD. -Xiangrui
> >>
> >> On Wed, Aug 27, 2014 at 2:37 PM, Soumitra Kumar
> >>  wrote:
> >> > Hello,
> >> >
> >> > If I do:
> >> >
> >> > DStream transform {
> >> > rdd.zipWithIndex.map {
> >> >
> >> > Is the index guaranteed to be unique across all RDDs here?
> >> >
> >> > }
> >> > }
> >> >
> >> > Thanks,
> >> > -Soumitra.
> >
> >
>


Re: Spark Streaming: DStream - zipWithIndex

2014-08-27 Thread Soumitra Kumar
Thanks.

Just to double check, rdd.id would be unique for a batch in a DStream?


On Wed, Aug 27, 2014 at 3:04 PM, Xiangrui Meng  wrote:

> You can use RDD id as the seed, which is unique in the same spark
> context. Suppose none of the RDDs would contain more than 1 billion
> records. Then you can use
>
> rdd.zipWithUniqueId().mapValues(uid => rdd.id * 1e9.toLong + uid)
>
> Just a hack ..
>
> On Wed, Aug 27, 2014 at 2:59 PM, Soumitra Kumar
>  wrote:
> > So, I guess zipWithUniqueId will be similar.
> >
> > Is there a way to get unique index?
> >
> >
> > On Wed, Aug 27, 2014 at 2:39 PM, Xiangrui Meng  wrote:
> >>
> >> No. The indices start at 0 for every RDD. -Xiangrui
> >>
> >> On Wed, Aug 27, 2014 at 2:37 PM, Soumitra Kumar
> >>  wrote:
> >> > Hello,
> >> >
> >> > If I do:
> >> >
> >> > DStream transform {
> >> > rdd.zipWithIndex.map {
> >> >
> >> > Is the index guaranteed to be unique across all RDDs here?
> >> >
> >> > }
> >> > }
> >> >
> >> > Thanks,
> >> > -Soumitra.
> >
> >
>


Re: Spark Streaming: DStream - zipWithIndex

2014-08-27 Thread Soumitra Kumar
So, I guess zipWithUniqueId will be similar.

Is there a way to get a unique index?


On Wed, Aug 27, 2014 at 2:39 PM, Xiangrui Meng  wrote:

> No. The indices start at 0 for every RDD. -Xiangrui
>
> On Wed, Aug 27, 2014 at 2:37 PM, Soumitra Kumar
>  wrote:
> > Hello,
> >
> > If I do:
> >
> > DStream transform {
> > rdd.zipWithIndex.map {
> >
> > Is the index guaranteed to be unique across all RDDs here?
> >
> > }
> > }
> >
> > Thanks,
> > -Soumitra.
>


Spark Streaming: DStream - zipWithIndex

2014-08-27 Thread Soumitra Kumar
Hello,

If I do:

DStream transform {
rdd.zipWithIndex.map {

Is the index guaranteed to be unique across all RDDs here?

}
}

Thanks,
-Soumitra.


Re: Shared variable in Spark Streaming

2014-08-08 Thread Soumitra Kumar
I want to keep track of the events processed in a batch.

How come 'globalCount' works for a DStream? I think a similar construct won't
work for an RDD; that's why there are accumulators.
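
For comparison, a hedged sketch of the accumulator-based count mentioned above (the
value is updated on the workers and read on the driver after each batch's action):

val eventCount = ssc.sparkContext.accumulator(0L)

dstream.foreachRDD { rdd =>
  rdd.foreach(_ => eventCount += 1)               // updated on the workers
  println("events so far: " + eventCount.value)   // read on the driver after the action
}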


On Fri, Aug 8, 2014 at 12:52 AM, Tathagata Das 
wrote:

> Do you mean that you want a continuously updated count as more
> events/records are received in the DStream (remember, DStream is a
> continuous stream of data)? Assuming that is what you want, you can use a
> global counter
>
> var globalCount = 0L
>
> dstream.count().foreachRDD(rdd => { globalCount += rdd.first() } )
>
> This globalCount variable will reside in the driver and will keep being
> updated after every batch.
>
> TD
>
>
> On Thu, Aug 7, 2014 at 10:16 PM, Soumitra Kumar 
> wrote:
>
>> Hello,
>>
>> I want to count the number of elements in the DStream, like RDD.count() .
>> Since there is no such method in DStream, I thought of using DStream.count
>> and use the accumulator.
>>
>> How do I do DStream.count() to count the number of elements in a DStream?
>>
>> How do I create a shared variable in Spark Streaming?
>>
>> -Soumitra.
>>
>
>


Shared variable in Spark Streaming

2014-08-07 Thread Soumitra Kumar
Hello,

I want to count the number of elements in the DStream, like RDD.count().
Since there is no such method in DStream, I thought of using DStream.count
and an accumulator.

How do I do DStream.count() to count the number of elements in a DStream?

How do I create a shared variable in Spark Streaming?

-Soumitra.


Re: HBase row count

2014-02-25 Thread Soumitra Kumar
Found the issue: the splits in HBase were not uniform, so one job was
taking 90% of the time.

BTW, is there a way to save the details available on port 4040 after the job is
finished?


On Tue, Feb 25, 2014 at 7:26 AM, Nick Pentreath wrote:

> It's tricky really since you may not know upfront how much data is in
> there. You could possibly take a look at how much data is in the HBase
> tables to get an idea.
>
> It may take a bit of trial and error, like running out of memory trying to
> cache the dataset, and checking the Spark UI on port 4040 to see how much
> is cached and how much memory still remains available, etc etc. You should
> also take a look at http://spark.apache.org/docs/latest/tuning.html for
> ideas around memory and serialization tuning.
>
> Broadly speaking, what you want to try to do is filter as much data as
> possible first and cache the subset of data on which you'll be performing
> multiple passes or computations. For example, based on your code above, you
> may in fact only wish to cache the data that is the "interesting" fields
> RDD. It all depends on what you're trying to achieve.
>
> If you will only be doing one pass through the data anyway (like running a
> count every time on the full dataset) then caching is not going to help you.
>
>
> On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar 
> wrote:
>
>> Thanks Nick.
>>
>> How do I figure out if the RDD fits in memory?
>>
>>
>> On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath > > wrote:
>>
>>> cache only caches the data on the first action (count) - the first time
>>> it still needs to read the data from the source. So the first time you call
>>> count it will take the same amount of time whether cache is enabled or not.
>>> The second time you call count on a cached RDD, you should see that it
>>> takes a lot less time (assuming that the data fit in memory).
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
>>>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>>>
>>>> My expectation is that with cache enabled, there should not be any
>>>> penalty of 'hBaseRDD.count' call.
>>>>
>>>>
>>>>
>>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>>>> nick.pentre...@gmail.com> wrote:
>>>>
>>>>> Yes, you''re initiating a scan for each count call. The normal way to
>>>>> improve this would be to use cache(), which is what you have in your
>>>>> commented out line:
>>>>> // hBaseRDD.cache()
>>>>>
>>>>> If you uncomment that line, you should see an improvement overall.
>>>>>
>>>>> If caching is not an option for some reason (maybe data is too large),
>>>>> then you can implement an overall count in your readFields method using
>>>>> accumulators:
>>>>>
>>>>> val count = sc.accumulator(0L)
>>>>> ...
>>>>> In your flatMap function do count += 1 for each row (regardless of
>>>>> whether "interesting" or not).
>>>>>
>>>>> In your main method after doing an action (e.g. count in your case),
>>>>> call val totalCount = count.value.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>>>> kumar.soumi...@gmail.com> wrote:
>>>>>
>>>>>> I have a code which reads an HBase table, and counts number of rows
>>>>>> containing a field.
>>>>>>
>>>>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>>>>> RDD[List[Array[Byte]]] = {
>>>>>> return rdd.flatMap(kv => {
>>>>>> // Set of interesting keys for this use case
>>>>>> val keys = List ("src")
>>>>>> var data = List[Array[Byte]]()
>>>>>> var usefulRow = false
>>>>>>
>>>>>> val cf = Bytes.toBytes ("cf")
>>>>>> keys.foreach {key =>
>>>>>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>>> if (col != null)
>>>>>> usefulRow = true
>>>>>> data = data :+ col
>>>>>> }
>>>>>>
>>>>>> if (usefulRow)
>>>>>> Some(data)
>>>>>> else
>>>>>> None
>>>>>> })
>>>>>> }
>>>>>>
>>>>>> def main(args: Array[String]) {
>>>>>> val hBaseRDD = init(args)
>>>>>> // hBaseRDD.cache()
>>>>>>
>>>>>> println(" Initial row count " + hBaseRDD.count())
>>>>>> println(" Rows with interesting fields " +
>>>>>> readFields(hBaseRDD).count())
>>>>>>   }
>>>>>>
>>>>>>
>>>>>> I am running on a one mode CDH installation.
>>>>>>
>>>>>> As it is it takes around 2.5 minutes. But if I comment out
>>>>>> 'println(" Initial row count " + hBaseRDD.count())', it takes around
>>>>>> 1.5 minutes.
>>>>>>
>>>>>> Is it doing HBase scan twice, for both 'count' calls? How do I
>>>>>> improve it?
>>>>>>
>>>>>> Thanks,
>>>>>> -Soumitra.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: HBase row count

2014-02-25 Thread Soumitra Kumar
Thanks Nick.

How do I figure out if the RDD fits in memory?


On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath wrote:

> cache only caches the data on the first action (count) - the first time it
> still needs to read the data from the source. So the first time you call
> count it will take the same amount of time whether cache is enabled or not.
> The second time you call count on a cached RDD, you should see that it
> takes a lot less time (assuming that the data fit in memory).
>
>
> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar 
> wrote:
>
>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>
>> My expectation is that with cache enabled, there should not be any
>> penalty of 'hBaseRDD.count' call.
>>
>>
>>
>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Yes, you''re initiating a scan for each count call. The normal way to
>>> improve this would be to use cache(), which is what you have in your
>>> commented out line:
>>> // hBaseRDD.cache()
>>>
>>> If you uncomment that line, you should see an improvement overall.
>>>
>>> If caching is not an option for some reason (maybe data is too large),
>>> then you can implement an overall count in your readFields method using
>>> accumulators:
>>>
>>> val count = sc.accumulator(0L)
>>> ...
>>> In your flatMap function do count += 1 for each row (regardless of
>>> whether "interesting" or not).
>>>
>>> In your main method after doing an action (e.g. count in your case),
>>> call val totalCount = count.value.
>>>
>>>
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
>>>> I have a code which reads an HBase table, and counts number of rows
>>>> containing a field.
>>>>
>>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>>> RDD[List[Array[Byte]]] = {
>>>> return rdd.flatMap(kv => {
>>>> // Set of interesting keys for this use case
>>>> val keys = List ("src")
>>>> var data = List[Array[Byte]]()
>>>> var usefulRow = false
>>>>
>>>> val cf = Bytes.toBytes ("cf")
>>>> keys.foreach {key =>
>>>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>> if (col != null)
>>>> usefulRow = true
>>>> data = data :+ col
>>>> }
>>>>
>>>> if (usefulRow)
>>>> Some(data)
>>>> else
>>>> None
>>>> })
>>>> }
>>>>
>>>> def main(args: Array[String]) {
>>>> val hBaseRDD = init(args)
>>>> // hBaseRDD.cache()
>>>>
>>>> println(" Initial row count " + hBaseRDD.count())
>>>> println(" Rows with interesting fields " +
>>>> readFields(hBaseRDD).count())
>>>>   }
>>>>
>>>>
>>>> I am running on a one mode CDH installation.
>>>>
>>>> As it is it takes around 2.5 minutes. But if I comment out
>>>> 'println(" Initial row count " + hBaseRDD.count())', it takes around
>>>> 1.5 minutes.
>>>>
>>>> Is it doing HBase scan twice, for both 'count' calls? How do I improve
>>>> it?
>>>>
>>>> Thanks,
>>>> -Soumitra.
>>>>
>>>
>>>
>>
>


Re: HBase row count

2014-02-24 Thread Soumitra Kumar
I did try with 'hBaseRDD.cache()', but I don't see any improvement.

My expectation is that with cache enabled, there should not be any penalty
from the 'hBaseRDD.count' call.



On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath
wrote:

> Yes, you''re initiating a scan for each count call. The normal way to
> improve this would be to use cache(), which is what you have in your
> commented out line:
> // hBaseRDD.cache()
>
> If you uncomment that line, you should see an improvement overall.
>
> If caching is not an option for some reason (maybe data is too large),
> then you can implement an overall count in your readFields method using
> accumulators:
>
> val count = sc.accumulator(0L)
> ...
> In your flatMap function do count += 1 for each row (regardless of
> whether "interesting" or not).
>
> In your main method after doing an action (e.g. count in your case), call val
> totalCount = count.value.
>
>
>
>
> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar 
> wrote:
>
>> I have a code which reads an HBase table, and counts number of rows
>> containing a field.
>>
>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>> RDD[List[Array[Byte]]] = {
>> return rdd.flatMap(kv => {
>> // Set of interesting keys for this use case
>> val keys = List ("src")
>> var data = List[Array[Byte]]()
>> var usefulRow = false
>>
>> val cf = Bytes.toBytes ("cf")
>> keys.foreach {key =>
>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>> if (col != null)
>> usefulRow = true
>> data = data :+ col
>> }
>>
>> if (usefulRow)
>> Some(data)
>> else
>> None
>> })
>> }
>>
>> def main(args: Array[String]) {
>> val hBaseRDD = init(args)
>> // hBaseRDD.cache()
>>
>> println(" Initial row count " + hBaseRDD.count())
>> println(" Rows with interesting fields " +
>> readFields(hBaseRDD).count())
>>   }
>>
>>
>> I am running on a one mode CDH installation.
>>
>> As it is it takes around 2.5 minutes. But if I comment out 'println("
>> Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.
>>
>> Is it doing HBase scan twice, for both 'count' calls? How do I improve it?
>>
>> Thanks,
>> -Soumitra.
>>
>
>


HBase row count

2014-02-24 Thread Soumitra Kumar
I have code which reads an HBase table and counts the number of rows
containing a field.

def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
  return rdd.flatMap(kv => {
    // Set of interesting keys for this use case
    val keys = List("src")
    var data = List[Array[Byte]]()
    var usefulRow = false

    val cf = Bytes.toBytes("cf")
    keys.foreach { key =>
      val col = kv._2.getValue(cf, Bytes.toBytes(key))
      if (col != null)
        usefulRow = true
      data = data :+ col
    }

    if (usefulRow)
      Some(data)
    else
      None
  })
}

def main(args: Array[String]) {
  val hBaseRDD = init(args)
  // hBaseRDD.cache()

  println(" Initial row count " + hBaseRDD.count())
  println(" Rows with interesting fields " + readFields(hBaseRDD).count())
}


I am running on a one-node CDH installation.

As it is, it takes around 2.5 minutes. But if I comment out 'println("
Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.

Is it doing the HBase scan twice, once for each 'count' call? How do I improve it?

Thanks,
-Soumitra.
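
For reference, a hedged sketch of the accumulator-based single-pass variant suggested
in the replies above (`sc` is the SparkContext), so the table is scanned only once:

val totalRows = sc.accumulator(0L)

val interesting = hBaseRDD.flatMap { kv =>
  totalRows += 1  // counts every row seen during the single scan
  val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
  if (col != null) Some(List(col)) else None
}

println(" Rows with interesting fields " + interesting.count())  // the action triggers the scan
println(" Initial row count " + totalRows.value)                 // read after the action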

