PySpark read from HBase

2016-08-12 Thread Bin Wang
Hi there,

I have lots of raw data in several Hive tables, and we built a workflow to
"join" those records together and restructure them into HBase. It was done
using plain MapReduce to generate HFiles, which are then incrementally
bulk-loaded into HBase to guarantee the best performance.

However, we need to do some time series analysis for each record in
HBase, but the implementation was done in Python (pandas, scikit-learn),
which would be pretty time-consuming to reproduce in Java or Scala.

I am thinking PySpark is probably the best approach, if it works.
Can PySpark read from an HFile directory? Or can it read from HBase in
parallel?
I don't see many examples out there, so any help or guidance will be
appreciated.
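
For what it's worth, the closest thing I have found so far is Spark's own
hbase_inputformat.py example, which goes through sc.newAPIHadoopRDD and the
converter classes shipped in the Spark examples jar. Below is a rough sketch of
what I am hoping would work; the ZooKeeper quorum and table name are
placeholders, and I have not verified this against our CDH setup:

from pyspark import SparkContext

sc = SparkContext(appName="HBaseScanSketch")

# Placeholders: point these at the real ZooKeeper quorum and HBase table.
conf = {"hbase.zookeeper.quorum": "zk-host",
        "hbase.mapreduce.inputtable": "my_table"}

# These converters live in the Spark examples jar, so that jar (plus the
# HBase client jars) needs to be on the driver and executor classpath.
key_conv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
val_conv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

# Each record comes back as (row key, stringified Result), scanned in parallel
# across the table's regions.
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=key_conv,
    valueConverter=val_conv,
    conf=conf)
print(rdd.take(1))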

Also, we are using Cloudera Hadoop, so there might be a slight delay in
picking up the latest Spark release.

Best regards,

Bin


How to close connection in mapPartitions?

2015-10-23 Thread Bin Wang
I use mapPartitions to open connections to Redis. I write it like this:

val seqs = lines.mapPartitions { lines =>
  val cache = new RedisCache(redisUrl, redisPort)
  val result = lines.map(line => Parser.parseBody(line, cache))
  cache.redisPool.close
  result
}

But it seems the pool is closed before I use it. Am I doing anything wrong?
Here is the error:

java.lang.IllegalStateException: Pool not open
at 
org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
at 
org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
at com.redis.RedisClientPool.withClient(Pool.scala:34)
at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
at 
com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
at 
com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
at 
com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
at 
com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
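
Update: I think I see what is going on: Iterator.map is lazy, so Parser.parseBody
only runs when the downstream stage pulls elements, which is after the pool has
already been closed. Something like the following might avoid that (untested
sketch; note it materializes each partition in memory with toList, which may or
may not be acceptable for our partition sizes):

val seqs = lines.mapPartitions { iter =>
  val cache = new RedisCache(redisUrl, redisPort)
  // Force the lazy map to run while the connection is still open.
  val result = iter.map(line => Parser.parseBody(line, cache)).toList
  cache.redisPool.close()
  result.iterator
}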


Re: How to close connection in mapPartitions?

2015-10-23 Thread Bin Wang
BTW, "lines" is a DStream.

Bin Wang <wbi...@gmail.com> wrote on Fri, Oct 23, 2015, at 2:16 PM:

> I use mapPartitions to open connections to Redis, I write it like this:
>
> val seqs = lines.mapPartitions { lines =>
>   val cache = new RedisCache(redisUrl, redisPort)
>   val result = lines.map(line => Parser.parseBody(line, cache))
>   cache.redisPool.close
>   result
> }
>
> But it seems the pool is closed before I use it. Am I doing anything
> wrong? Here is the error:
>
> java.lang.IllegalStateException: Pool not open
>   at 
> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>   at 
> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>   at com.redis.RedisClientPool.withClient(Pool.scala:34)
>   at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>   at 
> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>   at 
> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>   at 
> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>   at 
> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Does Spark auto-invoke StreamingContext.stop when it receives a kill signal?

2015-09-23 Thread Bin Wang
I'd like the Spark application to stop gracefully when it receives a kill
signal, so I added this code:

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  println("Application stopped")
}

But the application is not stopped gracefully:

15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
...
15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook

Does Spark automatically invoke StreamingContext.stop for me?
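
Update: from the log it looks like Spark now installs its own shutdown hook,
which calls stop(stopGracefully=false) before my hook runs. If I am reading the
docs right, the supported way (I believe since 1.4) is to drop the custom hook
and set spark.streaming.stopGracefullyOnShutdown instead; a minimal sketch, with
the app name and batch interval as placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MyStreamingApp")  // placeholder
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(300))
// ... build the DStream graph ...
ssc.start()
ssc.awaitTermination()  // no sys.ShutdownHookThread; Spark's own hook stops gracefully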


Is it possible to merge delayed batches in streaming?

2015-09-23 Thread Bin Wang
I'm using Spark Streaming and there may be some delays between batches. I'd
like to know whether it is possible to merge delayed batches into one batch for
processing.

For example, the interval is set to 5 min but the first batch takes 1 hour,
so many batches are delayed. At the end of processing each batch,
I save the data into a database. So if all the delayed batches were merged
into one big batch, it would save a lot of resources. I'd like to know if that is
possible. Thanks.


Re: How to recover DStream from checkpoint directory?

2015-09-17 Thread Bin Wang
In my understanding, I have only three options here to keep the DStream state
across redeploys (yes, I'm using updateStateByKey):

1. Use checkpoint.
2. Use my own database.
3. Use both.

But none of these options are great:

1. Use checkpointing: I cannot load the checkpoint after a code change, or I
need to keep the structure of the classes unchanged, which seems impossible in
a project under active development.
2. Use my own database: there may be a failure between reading the data
from Kafka and saving the DStream state to the database, so data may be lost.
3. Use both: will the data be loaded twice? How do I know in which
situation I should use which one?

The power of checkpointing seems to be very limited. Is there any plan to
support loading a checkpoint after classes have changed, as the discussion you
pointed me to brought up?



Akhil Das <ak...@sigmoidanalytics.com> wrote on Thu, Sep 17, 2015, at 3:26 PM:

> Any kind of changes to the jvm classes will make it fail. By checkpointing
> the data you mean using checkpoint with updateStateByKey? Here's a similar
> discussion happened earlier which will clear your doubts i guess
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>
> Thanks
> Best Regards
>
> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang <wbi...@gmail.com> wrote:
>
>> And here is another question. If I load the DStream from database every
>> time I start the job, will the data be loaded when the job is failed and
>> auto restart? If so, both the checkpoint data and database data are loaded,
>> won't this a problem?
>>
>>
>>
>> Bin Wang <wbi...@gmail.com> wrote on Wed, Sep 16, 2015, at 8:40 PM:
>>
>>> Will StreamingContex.getOrCreate do this work?What kind of code change
>>> will make it cannot load?
>>>
>>> Akhil Das <ak...@sigmoidanalytics.com> wrote on Wed, Sep 16, 2015, at 20:20:
>>>
>>>> You can't really recover from checkpoint if you alter the code. A
>>>> better approach would be to use some sort of external storage (like a db or
>>>> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
>>>> new code they can be easily recovered.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang <wbi...@gmail.com> wrote:
>>>>
>>>>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>>>>
>>>>> Because I stores state in DStream, I'd like the state to be recovered
>>>>> when I restart the application and deploy new code.
>>>>>
>>>>
>>>>
>


Re: How to recover DStream from checkpoint directory?

2015-09-17 Thread Bin Wang
Thanks Adrian, the hint to use updateStateByKey with an initialRDD helps a lot!
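
For anyone who finds this thread later, here is roughly the shape of what I
ended up with. It is only a sketch: loadStateFromDb and the (String, Long)
state type are placeholders, not our real code.

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

def withRestoredState(ssc: StreamingContext,
                      pairs: DStream[(String, Long)],
                      loadStateFromDb: () => Seq[(String, Long)]): DStream[(String, Long)] = {
  // Preload the last state saved to the database into an RDD...
  val initialState = ssc.sparkContext.parallelize(loadStateFromDb())
  val updateFunc = (values: Seq[Long], state: Option[Long]) =>
    Some(values.sum + state.getOrElse(0L))
  // ...and hand it to the updateStateByKey overload that accepts an initial RDD.
  pairs.updateStateByKey(updateFunc,
    new HashPartitioner(ssc.sparkContext.defaultParallelism), initialState)
}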

Adrian Tanase <atan...@adobe.com> wrote on Thu, Sep 17, 2015, at 4:50 PM:

> This section in the streaming guide makes your options pretty clear
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
>
> 1. Use 2 versions in parallel, drain the queue up to a point and start
>    fresh in the new version, only processing events from that point forward.
>    1. Note that “up to a point” is specific to your state management
>       logic; it might mean “user sessions started after 4 am”, NOT “events
>       received after 4 am”.
> 2. Graceful shutdown and saving data to DB, followed by checkpoint
>    cleanup / new checkpoint dir.
>    1. On restart, you need to use the updateStateByKey that takes an
>       initialRdd with the values preloaded from DB.
>    2. By cleaning the checkpoint in between upgrades, data is loaded
>       only once.
>
> Hope this helps,
> -adrian
>
> From: Bin Wang
> Date: Thursday, September 17, 2015 at 11:27 AM
> To: Akhil Das
> Cc: user
> Subject: Re: How to recovery DStream from checkpoint directory?
>
> In my understand, here I have only three options to keep the DStream state
> between redeploys (yes, I'm using updateStateByKey):
>
> 1. Use checkpoint.
> 2. Use my own database.
> 3. Use both.
>
> But none of  these options are great:
>
> 1. Use checkpoint: I cannot load it after code change. Or I need to keep
> the structure of the classes, which seems to be impossible in a developing
> project.
> 2. Use my own database: there may be failure between the program read data
> from Kafka and save the DStream to database. So there may have data lose.
> 3. Use both: Will the data load two times? How can I know in which
> situation I should use the which one?
>
> The power of checkpoint seems to be very limited. Is there any plan to
> support checkpoint while class is changed, like the discussion you gave me
> pointed out?
>
>
>
> Akhil Das <ak...@sigmoidanalytics.com> wrote on Thu, Sep 17, 2015, at 3:26 PM:
>
>> Any kind of changes to the jvm classes will make it fail. By
>> checkpointing the data you mean using checkpoint with updateStateByKey?
>> Here's a similar discussion happened earlier which will clear your doubts i
>> guess
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang <wbi...@gmail.com> wrote:
>>
>>> And here is another question. If I load the DStream from database every
>>> time I start the job, will the data be loaded when the job is failed and
>>> auto restart? If so, both the checkpoint data and database data are loaded,
>>> won't this a problem?
>>>
>>>
>>>
>>> Bin Wang <wbi...@gmail.com> wrote on Wed, Sep 16, 2015, at 8:40 PM:
>>>
>>>> Will StreamingContex.getOrCreate do this work?What kind of code change
>>>> will make it cannot load?
>>>>
>>>> Akhil Das <ak...@sigmoidanalytics.com> wrote on Wed, Sep 16, 2015, at 20:20:
>>>>
>>>>> You can't really recover from checkpoint if you alter the code. A
>>>>> better approach would be to use some sort of external storage (like a db 
>>>>> or
>>>>> zookeeper etc) to keep the state (the indexes etc) and then when you 
>>>>> deploy
>>>>> new code they can be easily recovered.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang <wbi...@gmail.com> wrote:
>>>>>
>>>>>> I'd like to know if there is a way to recovery dstream from
>>>>>> checkpoint.
>>>>>>
>>>>>> Because I stores state in DStream, I'd like the state to be recovered
>>>>>> when I restart the application and deploy new code.
>>>>>>
>>>>>
>>>>>
>>


Re: How to recover DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
Will StreamingContext.getOrCreate do this work? What kind of code change will
make it unable to load?

Akhil Das <ak...@sigmoidanalytics.com> wrote on Wed, Sep 16, 2015, at 20:20:

> You can't really recover from checkpoint if you alter the code. A better
> approach would be to use some sort of external storage (like a db or
> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
> new code they can be easily recovered.
>
> Thanks
> Best Regards
>
> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang <wbi...@gmail.com> wrote:
>
>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>
>> Because I stores state in DStream, I'd like the state to be recovered
>> when I restart the application and deploy new code.
>>
>
>


How to recover DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
I'd like to know if there is a way to recover a DStream from a checkpoint.

Because I store state in the DStream, I'd like the state to be recovered when
I restart the application and deploy new code.


Re: How to recover DStream from checkpoint directory?

2015-09-16 Thread Bin Wang
And here is another question: if I load the DStream state from the database every
time I start the job, will the data also be loaded when the job fails and
auto-restarts? If so, both the checkpoint data and the database data are loaded;
won't that be a problem?



Bin Wang <wbi...@gmail.com> wrote on Wed, Sep 16, 2015, at 8:40 PM:

> Will StreamingContex.getOrCreate do this work?What kind of code change
> will make it cannot load?
>
> Akhil Das <ak...@sigmoidanalytics.com> wrote on Wed, Sep 16, 2015, at 20:20:
>
>> You can't really recover from checkpoint if you alter the code. A better
>> approach would be to use some sort of external storage (like a db or
>> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
>> new code they can be easily recovered.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang <wbi...@gmail.com> wrote:
>>
>>> I'd like to know if there is a way to recovery dstream from checkpoint.
>>>
>>> Because I stores state in DStream, I'd like the state to be recovered
>>> when I restart the application and deploy new code.
>>>
>>
>>


How to clear Kafka offsets in Spark Streaming?

2015-09-14 Thread Bin Wang
Hi,

I'm using Spark Streaming with Kafka and I need to clear the offsets and
re-compute everything. I deleted the checkpoint directory in HDFS and reset the
Kafka offsets with "kafka-run-class kafka.tools.ImportZkOffsets". I can
confirm the offset is set to 0 in Kafka:

~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group
adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181
Group             Topic       Pid  Offset  logSize  Lag      Owner
adhoc_data_spark  adhoc_data  0    0       5280743  5280743  none

But when I restart Spark Streaming, the offset is reset to logSize. I
cannot figure out why that is; can anybody help? Thanks.
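
(A guess on my side rather than something confirmed on the list: since the
receiver-based createStream goes through Kafka's high-level consumer, I am going
to try passing kafkaParams with auto.offset.reset set to smallest, so that it
starts from the beginning of the log instead of the end whenever it decides the
stored offset is not usable. Rough sketch with our quorum/group/topic from above:)

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect" -> "szq1.appadhoc.com:2181",
  "group.id" -> "adhoc_data_spark",
  "auto.offset.reset" -> "smallest")

val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("adhoc_data" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)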


Re: Data lost in Spark Streaming

2015-09-13 Thread Bin Wang
There are some error logs in the executor and I don't know if they are related:

15/09/11 10:54:05 WARN ipc.Client: Exception encountered while connecting to the server:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:05 WARN yarn.ApplicationMaster: Reporter thread fails 4
time(s) in a row.
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
AMRMToken from appattempt_1440495451668_0258_01
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.allocate(Unknown Source)
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278)
at
org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:174)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:323)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy21.allocate(Unknown Source)
at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
... 9 more

...

15/09/11 10:54:10 WARN ipc.Client: Exception encountered while connecting to the server:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:10 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 12, (reason: Exception was thrown 5 time(s) from Reporter thread.)
15/09/11 10:54:10 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
15/09/11 10:54:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1
receivers
15/09/11 10:54:12 ERROR scheduler.ReceiverTracker: Deregistered receiver
for stream 0: Stopped by driver

Tathagata Das <t...@databricks.com> wrote on Sun, Sep 13, 2015, at 4:05 PM:

> Maybe the driver got restarted. See the log4j logs of the driver before it
> restarted.
>
> On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang <wbi...@gmail.com> wrote:
>
>> I'm using spark streaming 1.4.0 and have a DStream that have all the data
>> it received. But today the history data in the DStream seems to be lost
>> suddenly. And the application UI also lost the streaming process time and
>> all the related data. Could any give some hint to debug this? Thanks.
>>
>>
>>
>


Data lost in Spark Streaming

2015-09-11 Thread Bin Wang
I'm using Spark Streaming 1.4.0 and have a DStream that holds all the data
it has received. But today the historical data in the DStream seems to have been
lost suddenly, and the application UI also lost the streaming processing time and
all the related data. Could anyone give some hints on how to debug this? Thanks.


Re: Will multiple filters on the same RDD be optimized to one filter?

2015-07-16 Thread Bin Wang
What if I use both rdd1 and rdd2 later?
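
In that case I will probably just cache the shared parent, so the map is not
recomputed for each filter; a rough sketch of what I mean (same input/rdd names
as in the code below):

val rdd = input.map(_.value).cache()  // map runs once; both filters reuse the cached result
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)
// ... use f1 and f2 ...
rdd.unpersist()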

Raghavendra Pandey raghavendra.pan...@gmail.com wrote on Thu, Jul 16, 2015, at 4:08 PM:

 If you cache rdd it will save some operations. But anyway filter is a lazy
 operation. And it runs based on what you will do later on with rdd1 and
 rdd2...

 Raghavendra
 On Jul 16, 2015 1:33 PM, Bin Wang wbi...@gmail.com wrote:

 If I write code like this:

 val rdd = input.map(_.value)
 val f1 = rdd.filter(_ == 1)
 val f2 = rdd.filter(_ == 2)
 ...

 Then the DAG of the execution may be this:

  - Filter - ...
 Map
  - Filter - ...

 But the two filters is operated on the same RDD, which means it could be
 done by just scan the RDD once. Does spark have this kind optimization for
 now?




Will multiple filters on the same RDD be optimized to one filter?

2015-07-16 Thread Bin Wang
If I write code like this:

val rdd = input.map(_.value)
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)
...

Then the DAG of the execution may be this:

 - Filter - ...
Map
 - Filter - ...

But the two filters operate on the same RDD, which means it could be
done by scanning the RDD just once. Does Spark have this kind of optimization
for now?


Spark Streaming Hangs on Start

2015-07-09 Thread Bin Wang
I'm using Spark Streaming with Kafka and submit it to a YARN cluster in
yarn-cluster mode, but it hangs at ssc.start(). The Kafka config
is right since it can show some events in the Streaming tab of the web UI.

The attached file is a screenshot of the Jobs tab of the web UI. The code
in the main class is:

object StatCounter {

  val config = ConfigFactory.load()
  val redisUrl = config.getString("redis.url")
  val redisPort = config.getInt("redis.port")
  val zkQuorum = config.getString("kafka.zkQuorum")
  val group = config.getString("kafka.group")
  val topic = config.getString("kafka.topic")
  val threadNum = config.getInt("kafka.threadNum")

  val cache = new RedisCache(redisUrl, redisPort)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(config.getString("spark.name"))
      .set("spark.cassandra.connection.host", config.getString("cassandra.host"))

    val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval")))
    ssc.checkpoint(config.getString("spark.checkpoint"))
    val storage = new CassandraStorage("adhoc_data", ssc)

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2)

    val logs = lines.flatMap(line => Parser.parseBody(line, cache))
    Counter.count(logs, storage)

    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


Re: Spark Streaming Hangs on Start

2015-07-09 Thread Bin Wang
Thanks for the help. I set --executor-cores and it works now. I had used
--total-executor-cores and didn't realize it needed to change.
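
For the archive, the working submit command now looks roughly like this (the
assembly jar name is a placeholder; the point is that executor-cores is greater
than 1, so the receiver does not starve the map tasks):

spark-submit \
  --master yarn-cluster \
  --num-executors 2 \
  --executor-cores 2 \
  --executor-memory 2g \
  --class com.appadhoc.data.main.StatCounter \
  statcounter-assembly.jar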

Tathagata Das t...@databricks.com wrote on Fri, Jul 10, 2015, at 3:11 AM:

 1. There will be a long running job with description start() as that is
 the jobs that is running the receivers. It will never end.

 2. You need to set the number of cores given to the Spark executors by the
 YARN container. That is SparkConf spark.executor.cores,  --executor-cores
 in spark-submit. Since it is by default 1, your only container has one core
 which is occupied by the receiver, leaving no cores to run the map tasks.
 So the map stage is blocked

 3.  Note these log lines. Especially 15/07/09 18:29:00 INFO
 receiver.ReceiverSupervisorImpl: Received stop signal . I think somehow
 your streaming context is being shutdown too early which is causing the
 KafkaReceiver to stop. Something your should debug.


 15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Starting
 15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] Added fetcher for partitions 
 ArrayBuffer([[adhoc_data,0], initOffset 53 to broker 
 id:42,host:szq1.appadhoc.com,port:9092] )
 15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with 
 curMem=96628, maxMem=16669841817
 15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 
 stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB)
 15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 
 replicated to only 0 peer(s) instead of 1 peers
 15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600
 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal
 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
 15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], 
 ZKConsumerConnector shutting down
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] Stopping leader finder thread
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
  Shutting down
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
  Stopped
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread],
  Shutdown completed
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] Stopping all fetchers
 15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Shutting down
 15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket 
 error: java.nio.channels.ClosedByInterruptException
 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Stopped
 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: 
 [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42],
  Shutdown completed
 15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager: 
 [ConsumerFetcherManager-1436437633199] All connections stopped
 15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event 
 thread.
 15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed
 15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down
 15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: 
 [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], 
 ZKConsumerConnector shutdown completed in 74 ms
 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering 
 receiver 0






Re: How to submit streaming application and exit

2015-07-08 Thread Bin Wang
Thanks. Actually I've found the way. I'm using spark-submit to submit the
job to a YARN cluster with --master yarn-cluster (so the spark-submit
process is not the driver). So I can set
spark.yarn.submit.waitAppCompletion to false so that the process will
exit after the job is submitted.
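
For reference, the submit command ends up looking roughly like this (the class
and jar names are placeholders):

spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --class com.example.StreamingApp \
  streaming-app-assembly.jar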

ayan guha guha.a...@gmail.com wrote on Wed, Jul 8, 2015, at 12:26 PM:

 spark-submit is nothing but a process in your OS, so you should be able to
 submit it in background and exit. However, your spark-submit process itself
 is the driver for your spark streaming application, so it will not exit for
 the lifetime of the streaming app.

 On Wed, Jul 8, 2015 at 1:13 PM, Bin Wang wbi...@gmail.com wrote:

 I'm writing a streaming application and want to use spark-submit to
 submit it to a YARN cluster. I'd like to submit it in a client node and
 exit spark-submit after the application is running. Is it possible?




 --
 Best Regards,
 Ayan Guha



How to submit streaming application and exit

2015-07-07 Thread Bin Wang
I'm writing a streaming application and want to use spark-submit to submit
it to a YARN cluster. I'd like to submit it from a client node and have
spark-submit exit after the application is running. Is it possible?


Problem Running Spark Example HBase Code Using spark-submit

2015-06-25 Thread Bin Wang
I am trying to run the Spark example code HBaseTest from the command line using
spark-submit instead of run-example, so that I can learn more about how to run
Spark code in general.

However, it gives me a ClassNotFoundException for htrace since I am using CDH 5.4. I
successfully located the htrace jar file but I am having a hard time adding
it to the path.

This is the final spark-submit command I have, but I still get the class-not-found
error. Can anyone help me with this?

#!/bin/bash
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark

/bin/bash $SPARK_HOME/bin/spark-submit \
--master yarn-client \
--class org.apache.spark.examples.HBaseTest \
--driver-class-path
/etc/hbase/conf:$SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar
\
--jars
$SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar
\
$SPARK_HOME/examples/lib/*.jar \
myhbasetablename

Note:
htrace-core-3.0.4.jar, htrace-core-3.1.0-incubating.jar, htrace-core.jar
are all located under '
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/'.
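
(A guess from my side rather than a confirmed fix: --jars expects a
comma-separated list of explicit jar paths, and the colon-and-wildcard string
above is most likely passed through literally instead of being expanded like a
classpath. So naming the htrace jar directly might be enough, with any other
HBase client jars added the same way, comma-separated. The examples jar name
below is a placeholder for whatever the actual jar under examples/lib is called.)

#!/bin/bash
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
HTRACE_JAR=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

$SPARK_HOME/bin/spark-submit \
  --master yarn-client \
  --class org.apache.spark.examples.HBaseTest \
  --driver-class-path /etc/hbase/conf:$HTRACE_JAR \
  --jars $HTRACE_JAR \
  $SPARK_HOME/examples/lib/spark-examples.jar \
  myhbasetablename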


Re: Specify Python interpreter

2015-05-12 Thread Bin Wang
Hi Felix and Tomas,

Thanks a lot for your information. I figured out that the environment variable
PYSPARK_PYTHON is the secret key.

My current approach is to start the iPython notebook server on the namenode:

export PYSPARK_PYTHON=/opt/local/anaconda/bin/ipython
/opt/local/anaconda/bin/ipython notebook --profile=mypysparkprofile

In my iPython notebook, I have the flexibility to manually start my
SparkContext like this:

import os
import sys

os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jre-1.7.0-openjdk.x86_64/"
sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python")
sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip")

from pyspark import SparkContext, SparkConf
conf = (SparkConf().setMaster("spark://datafireball1:7077")
        .setAppName("SparkApplication")
        .set("spark.executor.memory", "16g")
        .set("spark.ui.killEnabled", "true"))
sc = SparkContext(conf=conf)

This really works out for me, and I am using the latest iPython notebook to
interactively write Spark applications.

If you have a better Python solution that can offer a better workflow for
interactive Spark development, please share.

Bin


On Tue, May 12, 2015 at 1:20 AM, Tomas Olsson tomas.ols...@mdh.se wrote:

 Hi,
 You can try

 PYSPARK_DRIVER_PYTHON=/path/to/ipython
 PYSPARK_DRIVER_PYTHON_OPTS="notebook" /path/to/pyspark


 /Tomas

  On 11 May 2015, at 22:17, Bin Wang binwang...@gmail.com wrote:
 
  Hey there,
 
  I have installed a python interpreter in certain location, say
 /opt/local/anaconda.
 
  Is there anything that I can specify the Python interpreter while
 developing in iPython notebook? Maybe a property in the while creating the
 Sparkcontext?
 
 
  I know that I can put #!/opt/local/anaconda at the top of my Python
 code and use spark-submit to distribute it to the cluster. However, since I
 am using iPython notebook, this is not available as an option.
 
  Best,
 
  Bin




Specify Python interpreter

2015-05-11 Thread Bin Wang
Hey there,

I have installed a Python interpreter in a certain location, say
/opt/local/anaconda.

Is there any way that I can specify the Python interpreter while
developing in the iPython notebook? Maybe a property to set while creating the
SparkContext?


I know that I can put #!/opt/local/anaconda at the top of my Python code
and use spark-submit to distribute it to the cluster. However, since I am
using iPython notebook, this is not available as an option.

Best,

Bin


Spark on top of YARN Compression in iPython notebook

2015-05-10 Thread Bin Wang
I started an AWS cluster (1 master + 3 core nodes) and downloaded the prebuilt Spark
binary. I downloaded the latest Anaconda Python and started an iPython
notebook server by running the command below:

ipython notebook --port  --profile nbserver --no-browser

Then I tried to develop a Spark application running on top of YARN
interactively in the iPython notebook.

Here is the code that I have written:

import sys
import os

sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python')
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip')
os.environ["YARN_CONF_DIR"] = "/home/hadoop/conf"
os.environ["SPARK_HOME"] = "/home/hadoop/bwang/spark-1.3.1-bin-hadoop2.4"

from pyspark import SparkContext, SparkConf
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("Spark ML")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
data = sc.textFile("hdfs://ec2-xx.xx.xx..compute-1.amazonaws.com:8020/data/*")
data.count()

The code works all the way until the count, where it shows
"com.hadoop.compression.lzo.LzoCodec not found".
Here is the full log: http://www.wepaste.com/sparkcompression/

I did some searching, and it basically comes down to Spark not being able to
access the LzoCodec library.

I have tried to use os.environ to set SPARK_CLASSPATH and
SPARK_LIBRARY_PATH to include hadoop-lzo.jar, which is located at
/home/hadoop/.versions/2.4.0-amzn-4/share/hadoop/common/lib/hadoop-lzo.jar
in the AWS Hadoop distribution. However, it is still not working.

Can anyone show me how to solve this problem?
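
(A guess rather than a verified fix: classpath settings applied after the JVM
gateway has started may simply be ignored, so I am going to try setting the env
variable before the SparkContext is created and also shipping the jar to the
executors through SparkConf, reusing the jar path above:)

lzo_jar = "/home/hadoop/.versions/2.4.0-amzn-4/share/hadoop/common/lib/hadoop-lzo.jar"
# Must be set before the SparkContext (and its JVM gateway) is created.
os.environ["SPARK_CLASSPATH"] = lzo_jar
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("Spark ML")
        .set("spark.executor.memory", "2g")
        .set("spark.executor.extraClassPath", lzo_jar))  # make the codec visible to executors
sc = SparkContext(conf=conf)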


Anaconda iPython notebook working with CDH Spark

2014-12-28 Thread Bin Wang
Hi there,

I have a cluster with CDH 5.1 running on top of Red Hat 6.5, where the default
Python version is 2.6. I am trying to set up a proper iPython notebook
environment to develop Spark applications using PySpark.

Here is a tutorial that I have been following:
http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/
However, it turned out that the author was using iPython 1, whereas we have the
latest Anaconda Python 2.7 installed on our name node. When I finished following
the tutorial, I could connect to the Spark cluster, but whenever I tried to
distribute the work, it errored out, and Google tells me it is caused by the
difference in Python versions across the cluster.

Here are a few thoughts that I am planning to try:
(1) remove the Anaconda Python from the namenode and install an iPython
version that is compatible with Python 2.6, or
(2) install Anaconda Python on every node and make it the
default Python version across the whole cluster (however, I am not sure if
this plan will totally screw up the existing environment, since some running
services are built on Python 2.6...).

Let me know which would be the proper way to set up an iPython notebook
environment.
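
(For reference, a third direction I am considering is pinning the interpreter
per job instead of changing the system default, by exporting PYSPARK_PYTHON
before launching; the path and profile name below are just examples, and the
Anaconda interpreter would have to exist at the same path on every node:)

export PYSPARK_PYTHON=/opt/local/anaconda/bin/python
ipython notebook --profile=pyspark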

Best regards,

Bin


Re: Spark and HBase

2014-04-08 Thread Bin Wang
Hi Flavio,

I am actually attending the 2014 Apache Conf, where I heard about a
project called Apache Phoenix, which fully leverages HBase and is supposed to
be 1000x faster than Hive. And it is not memory-bound, which is something that
sets a limit for Spark. It is still in the incubator, and the stats
functions Spark has already implemented are still on its roadmap. I am not
sure whether it will be good, but it might be something interesting to check
out.

/usr/bin


On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier pomperma...@okkam.it wrote:

 Hi to everybody,

 in these days I looked a bit at the recent evolution of the big data
 stacks and it seems that HBase is somehow fading away in favour of
 Spark+HDFS. Am I correct?
 Do you think that Spark and HBase should work together or not?

 Best regards,
 Flavio



Re: Missing Spark URL after starting the master

2014-03-04 Thread Bin Wang
Hi Mayur,

I am using CDH4.6.0p0.26, and the latest Cloudera Spark parcel is Spark
0.9.0 CDH4.6.0p0.50.
As I mentioned, somehow the Cloudera Spark version doesn't contain the
run-example shell scripts. However, it is automatically configured and it
is pretty easy to set up across the cluster...

Thanks,
Bin


On Tue, Mar 4, 2014 at 10:59 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 I have on cloudera vm
 http://docs.sigmoidanalytics.com/index.php/How_to_Install_Spark_on_Cloudera_VM
 which version are you trying to setup on cloudera.. also which cloudera
 version are you using...


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Mon, Mar 3, 2014 at 4:29 PM, Bin Wang binwang...@gmail.com wrote:

 Hi Ognen/Mayur,

 Thanks for the reply and it is good to know how easy it is to setup Spark
 on AWS cluster.

 My situation is a bit different from yours, our company already have a
 cluster and it really doesn't make that much sense not to use them. That is
 why I have been going through this. I really wish there are some
 tutorials teaching you how to set up Spark Cluster on baremetal CDH cluster
 or .. some way to tweak the CDH Spark distribution, so it is up to date.

 Ognen, of course it will be very helpful if you can 'history | grep
 spark... ' and document the work that you have done since you've already
 made it!

 Bin



 On Mon, Mar 3, 2014 at 2:06 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

  I should add that in this setup you really do not need to look for the
 printout of the master node's IP - you set it yourself a priori. If anyone
 is interested, let me know, I can write it all up so that people can follow
 some set of instructions. Who knows, maybe I can come up with a set of
 scripts to automate it all...

 Ognen



 On 3/3/14, 3:02 PM, Ognen Duzlevski wrote:

 I have a Standalone spark cluster running in an Amazon VPC that I set up
 by hand. All I did was provision the machines from a common AMI image (my
 underlying distribution is Ubuntu), I created a sparkuser on each machine
 and I have a /home/sparkuser/spark folder where I downladed spark. I did
 this on the master only, I did sbt/sbt assemble and I set up the
 conf/spark-env.sh to point to the master which is an IP address (in my case
 10.10.0.200, the port is the default 7077). I also set up the slaves file
 in the same subdirectory to have all 16 ip addresses of the worker nodes
 (in my case 10.10.0.201-216). After sbt/sbt assembly was done on master, I
 then did cd ~/; tar -czf spark.tgz spark/ and I copied the resulting tgz
 file to each worker using the same sparkuser account and unpacked the
 .tgz on each slave (this will effectively replicate everything from master
 to all slaves - you can script this so you don't do it by hand).

 Your AMI should have the distribution's version of Java and git
 installed by the way.

 All you have to do then is sparkuser@spark-master
 spark/sbin/start-all.sh (for 0.9, in 0.8.1 it is spark/bin/start-all.sh)
 and it will all automagically start :)

 All my Amazon nodes come with 4x400 Gb of ephemeral space which I have
 set up into a 1.6TB RAID0 array on each node and I am pooling this into an
 HDFS filesystem which is operated by a namenode outside the spark cluster
 while all the datanodes are the same nodes as the spark workers. This
 enables replication and extremely fast access since ephemeral is much
 faster than EBS or anything else on Amazon (you can do even better with SSD
 drives on this setup but it will cost ya).

 If anyone is interested I can document our pipeline set up - I came up
 with it myself and do not have a clue as to what the industry standards are
 since I could not find any written instructions anywhere online about how
 to set up a whole data analytics pipeline from the point of ingestion to
 the point of analytics (people don't want to share their secrets? or am I
 just in the dark and incapable of using Google properly?). My requirement
 was that I wanted this to run within a VPC for added security and
 simplicity, the Amazon security groups get really old quickly. Added bonus
 is that you can use a VPN as an entry into the whole system and your
 cluster instantly becomes local to you in terms of IPs etc. I use OpenVPN
 since I don't like Cisco nor Juniper (the only two options Amazon provides
 for their VPN gateways).

 Ognen


 On 3/3/14, 1:00 PM, Bin Wang wrote:

 Hi there,

  I have a CDH cluster set up, and I tried using the Spark parcel come
 with Cloudera Manager, but it turned out they even don't have the
 run-example shell command in the bin folder. Then I removed it from the
 cluster and cloned the incubator-spark into the name node of my cluster,
 and built from source there successfully with everything as default.

  I ran a few examples and everything seems work fine in the local mode.
 Then I am thinking about scale it to my cluster, which is what

Spark Streaming Maven Build

2014-03-04 Thread Bin Wang
Hi there,

I tried the Kafka WordCount example; it works perfectly and the code is
pretty straightforward to understand.

Can anyone show me how to start my own Maven project around the
KafkaWordCount example with minimum effort?

1. What should the pom file look like (including jar-plugin? assembly-plugin? etc.)?
2. mvn install, or mvn clean install, or mvn install compile assembly:single?
3. After you have a jar file, how do you execute it instead
of using bin/run-example?


To answer those people who might ask what I have done:
(Here is a derivative of the KafkaWordCount example that I have written
to do an IP-count example, where the input data from Kafka is actually a JSON
string:
https://github.com/biwa7636/binwangREPO
I had such bad luck getting it to work. So if anyone can copy the code of
the WordCount example, build it using Maven, and get it working, please
share your pom and those Maven commands; I will be very appreciative!)

Best regards and let me know if you need further info.

Bin