PySpark read from HBase
Hi there, I have lots of raw data in several Hive tables, and we built a workflow to "join" those records together and restructure them into HBase. It was done using plain MapReduce to generate HFiles, which were then bulk-loaded incrementally into HBase to guarantee the best performance. Now we need to do some time series analysis for each record in HBase, but the implementation was done in Python (pandas, scikit-learn), which would be pretty time-consuming to reproduce in Java or Scala. I think PySpark is probably the best approach if it works. Can PySpark read from an HFile directory? Or can it read from HBase in parallel? I don't see many examples out there, so any help or guidance will be appreciated. Also, we are using Cloudera Hadoop, so we may lag slightly behind the latest Spark release. Best regards, Bin
How to close connection in mapPartitions?
I use mapPartitions to open connections to Redis. I wrote it like this:

val seqs = lines.mapPartitions { lines =>
  val cache = new RedisCache(redisUrl, redisPort)
  val result = lines.map(line => Parser.parseBody(line, cache))
  cache.redisPool.close
  result
}

But it seems the pool is closed before I use it. Am I doing anything wrong? Here is the error:

java.lang.IllegalStateException: Pool not open
    at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
    at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
    at com.redis.RedisClientPool.withClient(Pool.scala:34)
    at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
    at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
    at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
    at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
    at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
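The symptom is consistent with the laziness of Scala's Iterator.map: `lines.map(...)` in the code above only describes the work, so `cache.redisPool.close` runs before any line is parsed. The stack trace points the same way, since parseBody is called from Iterator.next inside ExternalSorter.insertAll, after the mapPartitions function has already returned. The plain-Python sketch below reproduces the effect and one common fix; `FakePool` is a hypothetical stand-in for the RedisCache pool, and the same reasoning applies to a PySpark mapPartitions function, which also receives and returns an iterator.

```python
class FakePool:
    """Hypothetical stand-in for a connection pool such as RedisCache's pool."""

    def __init__(self):
        self.is_open = True

    def lookup(self, key):
        # Mirrors "Pool not open": using the pool after close() fails.
        if not self.is_open:
            raise RuntimeError("Pool not open")
        return key.upper()

    def close(self):
        self.is_open = False


def parse_partition_broken(lines, pool):
    result = map(pool.lookup, lines)  # lazy: nothing has been looked up yet
    pool.close()                      # runs before the caller consumes `result`
    return result


def parse_partition(lines, pool):
    """Yield parsed lines lazily and close the pool once the partition is exhausted."""
    try:
        for line in lines:
            yield pool.lookup(line)
    finally:
        pool.close()
```

In the Scala version, a simple alternative is to force evaluation before closing, e.g. `val result = lines.map(line => Parser.parseBody(line, cache)).toList`, then close the pool and return `result.iterator`; this holds the whole partition in memory. With the generator approach, the `finally` clause runs only when the iterator is exhausted or explicitly closed.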
Re: How to close connection in mapPartitions?
BTW, "lines" is a DStream.

On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang <wbi...@gmail.com> wrote:
> I use mapPartitions to open connections to Redis, I write it like this:
> [...]
Does Spark automatically invoke StreamingContext.stop when it receives a kill signal?
I'd like the Spark application to stop gracefully when it receives a kill signal, so I added this code:

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  println("Application stopped")
}

But the application is not stopped gracefully:

15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
...
15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook

Does Spark automatically invoke StreamingContext.stop for me?
Is it possible to merge delayed batches in streaming?
I'm using Spark Streaming, and there may be some delays between batches. I'd like to know whether it is possible to merge delayed batches into one batch for processing. For example, the interval is set to 5 minutes but the first batch takes 1 hour, so many batches are delayed. At the end of processing each batch, I save the data into a database, so if all the delayed batches were merged into one big batch, it would save a lot of resources. I'd like to know if this is possible. Thanks.
Re: How to recover DStream from checkpoint directory?
As I understand it, I have only three options to keep the DStream state between redeploys (yes, I'm using updateStateByKey):

1. Use checkpoints.
2. Use my own database.
3. Use both.

But none of these options is great:

1. Use checkpoints: I cannot load them after a code change, unless I keep the structure of the classes unchanged, which seems impossible in a project under active development.
2. Use my own database: a failure may happen between the program reading data from Kafka and saving the DStream to the database, so data may be lost.
3. Use both: will the data be loaded twice? How can I know which one to use in which situation?

The power of checkpoints seems to be very limited. Is there any plan to support recovering from a checkpoint after the classes have changed, as the discussion you pointed me to mentions?

On Thu, Sep 17, 2015 at 3:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Any kind of change to the JVM classes will make it fail. By checkpointing
> the data, do you mean using checkpoint with updateStateByKey? Here's a similar
> discussion that happened earlier, which I guess will clear your doubts:
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>
> Thanks
> Best Regards
>
> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang <wbi...@gmail.com> wrote:
>> [...]
Re: How to recover DStream from checkpoint directory?
Thanks Adrian, the hint to use updateStateByKey with an initialRdd helps a lot!

On Thu, Sep 17, 2015 at 4:50 PM, Adrian Tanase <atan...@adobe.com> wrote:
> This section in the streaming guide makes your options pretty clear:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
> 1. Use 2 versions in parallel: drain the queue up to a point and start
>    fresh in the new version, only processing events from that point forward.
>    1. Note that "up to a point" is specific to your state management
>       logic; it might mean "user sessions started after 4 am", NOT "events
>       received after 4 am".
> 2. Graceful shutdown and saving data to the DB, followed by checkpoint
>    cleanup / a new checkpoint dir.
>    1. On restart, you need to use the updateStateByKey that takes an
>       initialRdd with the values preloaded from the DB.
>    2. By cleaning the checkpoint in between upgrades, data is loaded
>       only once.
>
> Hope this helps,
> -adrian
>
> From: Bin Wang
> Date: Thursday, September 17, 2015 at 11:27 AM
> To: Akhil Das
> Cc: user
> Subject: Re: How to recovery DStream from checkpoint directory?
>
> [...]
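Adrian's second option (save the state to a DB at graceful shutdown, start with a clean checkpoint directory, and seed updateStateByKey with an initial RDD) can be illustrated without Spark. The sketch below is a plain-Python model of updateStateByKey's per-key semantics, not Spark's implementation: the update function is called for every key that has new values or existing state, and returning None drops the key. The names `update_state_by_key`, `running_count`, and `initial_state` are hypothetical; the dict `initial_state` plays the role of the values preloaded from the DB.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple


def update_state_by_key(
    state: Dict[str, int],
    batch: Iterable[Tuple[str, int]],
    update: Callable[[List[int], Optional[int]], Optional[int]],
) -> Dict[str, int]:
    """Model one batch step: group new values by key, then apply `update` per key."""
    grouped: Dict[str, List[int]] = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)

    new_state: Dict[str, int] = {}
    for key in set(state) | set(grouped):
        updated = update(grouped.get(key, []), state.get(key))
        if updated is not None:  # returning None removes the key from the state
            new_state[key] = updated
    return new_state


def running_count(new_values: List[int], current: Optional[int]) -> Optional[int]:
    # Count events per key, starting from the preloaded value when there is one.
    return (current or 0) + len(new_values)


# Hypothetical snapshot saved to the DB at graceful shutdown; it stands in
# for the initial RDD passed to updateStateByKey on restart.
initial_state = {"user-a": 10}
```

The snapshot is read once when the job starts, and every later batch only updates the in-memory state; in Spark, that seeding is done through the updateStateByKey overload that accepts an initial RDD.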
Re: How to recover DStream from checkpoint directory?
Will StreamingContext.getOrCreate do this work? What kind of code change will make it unable to load?

On Wed, Sep 16, 2015 at 8:20 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> You can't really recover from checkpoint if you alter the code. A better
> approach would be to use some sort of external storage (like a DB or
> ZooKeeper, etc.) to keep the state (the indexes, etc.), and then when you
> deploy new code it can be easily recovered.
>
> Thanks
> Best Regards
>
> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang <wbi...@gmail.com> wrote:
>> [...]
How to recover DStream from checkpoint directory?
I'd like to know if there is a way to recover a DStream from a checkpoint. Because I store state in a DStream, I'd like the state to be recovered when I restart the application and deploy new code.
Re: How to recover DStream from checkpoint directory?
And here is another question: if I load the DStream from the database every time I start the job, will the data also be loaded when the job fails and restarts automatically? If so, both the checkpoint data and the database data will be loaded; won't this be a problem?

On Wed, Sep 16, 2015 at 8:40 PM, Bin Wang <wbi...@gmail.com> wrote:
> [...]
How to clear Kafka offset in Spark streaming?
Hi, I'm using Spark Streaming with Kafka, and I need to clear the offset and recompute everything. I deleted the checkpoint directory in HDFS and reset the Kafka offset with "kafka-run-class kafka.tools.ImportZkOffsets". I can confirm that the offset is set to 0 in Kafka:

~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181
Group             Topic       Pid  Offset  logSize  Lag      Owner
adhoc_data_spark  adhoc_data  0    0       5280743  5280743  none

But when I restart Spark Streaming, the offset is reset to logSize, and I cannot figure out why. Can anybody help? Thanks.
Re: Data lost in spark streaming
There are some error logs in the executor, and I don't know whether they are related:

15/09/11 10:54:05 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:05 WARN yarn.ApplicationMaster: Reporter thread fails 4 time(s) in a row.
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1440495451668_0258_01
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
    at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
    at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
    at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy22.allocate(Unknown Source)
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278)
    at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:174)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:323)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
    at org.apache.hadoop.ipc.Client.call(Client.java:1468)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy21.allocate(Unknown Source)
    at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
    ... 9 more
...
15/09/11 10:54:10 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01
15/09/11 10:54:10 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 5 time(s) from Reporter thread.)
15/09/11 10:54:10 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
15/09/11 10:54:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 receivers
15/09/11 10:54:12 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver

On Sun, Sep 13, 2015 at 4:05 PM, Tathagata Das <t...@databricks.com> wrote:
> Maybe the driver got restarted. See the log4j logs of the driver before it
> restarted.
>
> On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang <wbi...@gmail.com> wrote:
>> [...]
Data lost in spark streaming
I'm using Spark Streaming 1.4.0 and have a DStream that holds all the data it has received. But today the history data in the DStream seems to have been lost suddenly, and the application UI also lost the streaming processing times and all related data. Could anyone give me some hints on how to debug this? Thanks.
Re: Will multiple filters on the same RDD be optimized into one filter?
What if I use both rdd1 and rdd2 later?

On Thu, Jul 16, 2015 at 4:08 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
> If you cache rdd, it will save some operations. But anyway, filter is a lazy
> operation, and it runs based on what you do later with rdd1 and rdd2...
>
> Raghavendra
>
> On Jul 16, 2015 1:33 PM, Bin Wang <wbi...@gmail.com> wrote:
>> [...]
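Raghavendra's point about caching can be modelled in plain Python: when f1 and f2 are each consumed by a separate action, the map stage is evaluated once per action unless its output is cached. The sketch counts evaluations of a hypothetical `CountingSource` that stands in for `input.map(_.value)`. Caching saves the repeated map, but each filter still makes its own pass over the cached data, so the two filters are not fused into a single scan.

```python
class CountingSource:
    """Hypothetical stand-in for `input.map(_.value)`; counts how often it is evaluated."""

    def __init__(self, values):
        self.values = list(values)
        self.evaluations = 0

    def compute(self):
        self.evaluations += 1
        return iter(self.values)


def two_filters(source, cache=False):
    """Run the two filters as two separate 'actions' over the same parent."""
    if cache:
        materialized = list(source.compute())  # like rdd.cache(): evaluate the map once

        def parent():
            return iter(materialized)
    else:
        parent = source.compute  # each action re-evaluates the map stage

    f1 = [x for x in parent() if x == 1]
    f2 = [x for x in parent() if x == 2]
    return f1, f2
```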
Will multiple filters on the same RDD be optimized into one filter?
If I write code like this:

val rdd = input.map(_.value)
val f1 = rdd.filter(_ == 1)
val f2 = rdd.filter(_ == 2)
...

then the DAG of the execution may look like this:

      /-> Filter -> ...
Map --
      \-> Filter -> ...

But the two filters operate on the same RDD, which means both could be done with a single scan of the RDD. Does Spark currently have this kind of optimization?
Spark Streaming Hangs on Start
I'm using Spark Streaming with Kafka and submit the application to a YARN cluster in yarn-cluster mode, but it hangs at StreamingContext.start(). The Kafka config is right, since some events show up in the Streaming tab of the web UI. The attached file is a screenshot of the Jobs tab of the web UI. The code in the main class is:

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StatCounter {
  val config = ConfigFactory.load()
  val redisUrl = config.getString("redis.url")
  val redisPort = config.getInt("redis.port")
  val zkQuorum = config.getString("kafka.zkQuorum")
  val group = config.getString("kafka.group")
  val topic = config.getString("kafka.topic")
  val threadNum = config.getInt("kafka.threadNum")
  val cache = new RedisCache(redisUrl, redisPort)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(config.getString("spark.name"))
      .set("spark.cassandra.connection.host", config.getString("cassandra.host"))
    val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval")))
    ssc.checkpoint(config.getString("spark.checkpoint"))
    val storage = new CassandraStorage("adhoc_data", ssc)
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2)
    val logs = lines.flatMap(line => Parser.parseBody(line, cache))
    Counter.count(logs, storage)
    sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
Re: Spark Streaming Hangs on Start
Thanks for the help. I set --executor-cores and it works now. I had been using --total-executor-cores and didn't realize that it had changed.

On Fri, Jul 10, 2015 at 3:11 AM, Tathagata Das <t...@databricks.com> wrote:
> 1. There will be a long-running job with the description "start()", as that
>    is the job that runs the receivers. It will never end.
> 2. You need to set the number of cores given to the Spark executors by the
>    YARN container. That is the SparkConf setting spark.executor.cores, or
>    --executor-cores in spark-submit. Since it is 1 by default, your only
>    container has one core, which is occupied by the receiver, leaving no
>    cores to run the map tasks. So the map stage is blocked.
> 3. Note these log lines, especially "15/07/09 18:29:00 INFO
>    receiver.ReceiverSupervisorImpl: Received stop signal". I think somehow
>    your streaming context is being shut down too early, which is causing the
>    KafkaReceiver to stop. That is something you should debug.
>
> 15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Starting
> 15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Added fetcher for partitions ArrayBuffer([[adhoc_data,0], initOffset 53 to broker id:42,host:szq1.appadhoc.com,port:9092] )
> 15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with curMem=96628, maxMem=16669841817
> 15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB)
> 15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 replicated to only 0 peer(s) instead of 1 peers
> 15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600
> *15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal*
> 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
> 15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], ZKConsumerConnector shutting down
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Stopping leader finder thread
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Shutting down
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Stopped
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Shutdown completed
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Stopping all fetchers
> 15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Shutting down
> 15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
> 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Stopped
> 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Shutdown completed
> 15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] All connections stopped
> 15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
> 15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed
> 15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down
> 15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], ZKConsumerConnector shutdown completed in 74 ms
> 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
> 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
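Tathagata's second point comes down to simple arithmetic: each receiver permanently occupies one core, so batch processing can only make progress when the executors have more cores in total than there are receivers. A minimal sketch with a hypothetical helper name, assuming every task (including a receiver) uses one core, which matches Spark's default spark.task.cpus of 1:

```python
def cores_for_batch_tasks(num_executors: int, cores_per_executor: int, num_receivers: int) -> int:
    """Cores left for batch tasks after each receiver has taken one core.

    Simplified model: assumes one core per task and ignores scheduling details.
    """
    total = num_executors * cores_per_executor
    if num_receivers > total:
        raise ValueError("not enough cores to run every receiver")
    return total - num_receivers
```

With a single one-core container and one Kafka receiver, the helper returns 0, which matches the blocked map stage; raising --executor-cores to 4 on that container would leave 3 cores for processing.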
Re: How to submit streaming application and exit
Thanks. Actually, I've found the way. I'm using spark-submit to submit the job to a YARN cluster with --master yarn-cluster (in which the spark-submit process is not the driver), so I can set spark.yarn.submit.waitAppCompletion to false, and the process will exit after the job is submitted.

On Wed, Jul 8, 2015 at 12:26 PM, ayan guha <guha.a...@gmail.com> wrote:
> spark-submit is nothing but a process in your OS, so you should be able to
> submit it in the background and exit. However, your spark-submit process
> itself is the driver for your Spark Streaming application, so it will not
> exit for the lifetime of the streaming app.
>
> On Wed, Jul 8, 2015 at 1:13 PM, Bin Wang <wbi...@gmail.com> wrote:
>> [...]
>
> --
> Best Regards,
> Ayan Guha
How to submit streaming application and exit
I'm writing a streaming application and want to use spark-submit to submit it to a YARN cluster. I'd like to submit it from a client node and have spark-submit exit once the application is running. Is that possible?
Problem Running Spark Example HBase Code Using Spark-Submit
I am trying to run the Spark example code HBaseTest from the command line using spark-submit instead of run-example, so that I can learn more about how to run Spark code in general. However, it reports CLASS_NOT_FOUND for htrace, since I am using CDH5.4. I successfully located the htrace jar file, but I am having a hard time adding it to the classpath. This is the final spark-submit command I have, but I still get the class-not-found error. Can anyone help me with this?

#!/bin/bash
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark

/bin/bash $SPARK_HOME/bin/spark-submit \
  --master yarn-client \
  --class org.apache.spark.examples.HBaseTest \
  --driver-class-path /etc/hbase/conf:$SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar \
  --jars $SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar \
  $SPARK_HOME/examples/lib/*.jar \
  myhbasetablename

Note: htrace-core-3.0.4.jar, htrace-core-3.1.0-incubating.jar, and htrace-core.jar are all located under /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/.
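One likely issue in the command above is the separator: spark-submit documents --jars as a comma-separated list, whereas the command joins two glob patterns with ':'. Separately, in a Java classpath such as the --driver-class-path value, a directory wildcard has to be written as dir/* rather than dir/*.jar. A minimal sketch of a hypothetical helper that expands directories into the comma-separated form; the directory passed in is an assumption to adapt to your parcel layout:

```python
import glob
import os


def jars_arg(*directories):
    """Return a comma-separated list of every *.jar file in the given directories.

    spark-submit's --jars expects commas between entries, unlike a Java
    classpath (e.g. --driver-class-path), which uses ':' on Unix.
    """
    jars = []
    for directory in directories:
        jars.extend(sorted(glob.glob(os.path.join(directory, "*.jar"))))
    return ",".join(jars)
```

For example, `jars_arg("/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib")` would produce a --jars value that includes the htrace jars listed in the note.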
Re: Specify Python interpreter
Hi Felix and Tomas,

Thanks a lot for your information. I figured out that the environment variable PYSPARK_PYTHON is the key. My current approach is to start an iPython notebook on the namenode:

export PYSPARK_PYTHON=/opt/local/anaconda/bin/ipython
/opt/local/anaconda/bin/ipython notebook --profile=mypysparkprofile

In my iPython notebook, I have the flexibility to start my SparkContext manually, like this:

import os
import sys

os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jre-1.7.0-openjdk.x86_64/"
sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python")
sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip")

from pyspark import SparkContext, SparkConf

conf = (SparkConf().setMaster("spark://datafireball1:7077")
        .setAppName("SparkApplication")
        .set("spark.executor.memory", "16g")
        .set("spark.ui.killEnabled", "true"))
sc = SparkContext(conf=conf)

This really works for me, and I am using the latest iPython notebook to write Spark applications interactively. If you have a better Python solution that offers a better workflow for interactive Spark development, please share.

Bin

On Tue, May 12, 2015 at 1:20 AM, Tomas Olsson <tomas.ols...@mdh.se> wrote:
> Hi,
>
> You can try
> PYSPARK_DRIVER_PYTHON=/path/to/ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" /path/to/pyspark
>
> /Tomas
>
> On 11 May 2015, at 22:17, Bin Wang <binwang...@gmail.com> wrote:
>> [...]
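Tomas's suggestion and the PYSPARK_PYTHON finding can be combined when launching the notebook from a script: PYSPARK_PYTHON selects the interpreter used by the workers (and by the driver unless overridden), while PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS choose the driver-side front end. A minimal sketch that only builds the environment; the helper name is hypothetical, and the Anaconda paths are assumptions that must exist on every node:

```python
import os


def pyspark_notebook_env(python_path, ipython_path, base_env=None):
    """Return a copy of the environment with PySpark's interpreter variables set."""
    env = dict(os.environ if base_env is None else base_env)
    env["PYSPARK_PYTHON"] = python_path           # interpreter used by the executors
    env["PYSPARK_DRIVER_PYTHON"] = ipython_path   # driver-side front end
    env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
    return env
```

The returned dict can be passed as `env=` to `subprocess.Popen` together with the path to the pyspark launcher (`/path/to/pyspark` in Tomas's example) to start the notebook with these settings.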
Specify Python interpreter
Hey there, I have installed a Python interpreter in a certain location, say /opt/local/anaconda. Is there any way I can specify the Python interpreter while developing in an iPython notebook? Maybe a property set while creating the SparkContext? I know that I can put #!/opt/local/anaconda at the top of my Python code and use spark-submit to distribute it to the cluster. However, since I am using an iPython notebook, this is not an option. Best, Bin
Spark on top of YARN Compression in iPython notebook
I started an AWS cluster (1 master + 3 core nodes) and downloaded the prebuilt Spark binary. I downloaded the latest Anaconda Python and started an iPython notebook server by running the command below:

ipython notebook --port --profile nbserver --no-browser

Then I tried to develop a Spark application running on top of YARN interactively in the iPython notebook. Here is the code that I have written:

import sys
import os

# Put this Spark distribution's Python libraries on sys.path before importing pyspark.
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python')
sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip')
os.environ["YARN_CONF_DIR"] = "/home/hadoop/conf"
os.environ["SPARK_HOME"] = "/home/hadoop/bwang/spark-1.3.1-bin-hadoop2.4"

from pyspark import SparkContext, SparkConf

conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("Spark ML")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
data = sc.textFile("hdfs://ec2-xx.xx.xx..compute-1.amazonaws.com:8020/data/*")
data.count()

The code works all the way until the count, where it shows "com.hadoop.compression.lzo.LzoCodec not found". The full log is here: http://www.wepaste.com/sparkcompression/

I did some searching, and the problem is basically that Spark cannot access the LzoCodec library. I have tried using os.environ to set SPARK_CLASSPATH and SPARK_LIBRARY_PATH to include hadoop-lzo.jar, which is located at ./home/hadoop/.versions/2.4.0-amzn-4/share/hadoop/common/lib/hadoop-lzo.jar in AWS Hadoop. However, it is still not working. Can anyone show me how to solve this problem?
Anaconda iPython notebook working with CDH Spark
Hi there, I have a cluster with CDH5.1 running on top of Red Hat 6.5, where the default Python version is 2.6. I am trying to set up a proper iPython notebook environment to develop Spark applications using pyspark. Here (http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/) is a tutorial that I have been following. However, it turned out that the author was using iPython 1, whereas we have the latest Anaconda Python 2.7 installed on our name node. When I finished following the tutorial, I could connect to the Spark cluster, but whenever I tried to distribute work, it errored out, and Google tells me the cause is the difference in Python versions across the cluster. Here are the options I am planning to try: (1) remove Anaconda Python from the name node and install an iPython version that is compatible with Python 2.6; or (2) install Anaconda Python on every node and make it the default Python version across the whole cluster (however, I am not sure whether this would totally screw up the existing environment, since some running services are built on Python 2.6...). Let me know which is the proper way to set up an iPython notebook environment. Best regards, Bin
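A quick way to confirm the version mismatch, sketched here assuming an existing SparkContext sc (the helper name python_versions is hypothetical): run a few trivial tasks and compare the interpreter each executor reports with the driver's. If this job itself fails during serialization, that is also consistent with a mismatch.

```python
import sys

def python_versions(sc, num_tasks=8):
    """Return (driver_version, sorted list of distinct executor versions)."""
    # Spread trivial tasks across the cluster; each one reports the (major, minor)
    # version of the interpreter that executed it on the worker.
    worker_versions = (
        sc.parallelize(range(num_tasks), num_tasks)
        .map(lambda _: tuple(sys.version_info[:2]))
        .distinct()
        .collect()
    )
    return tuple(sys.version_info[:2]), sorted(worker_versions)
```

If the versions differ, a variant of plan (2) that may avoid disturbing existing services is to install Anaconda on every node but leave the system Python as the default, pointing PySpark at Anaconda through the PYSPARK_PYTHON environment variable.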
Re: Spark and HBase
Hi Flavio, I am actually attending the 2014 Apache Conf right now, and I heard about a project called Apache Phoenix, which fully leverages HBase and is supposed to be 1000x faster than Hive. It is also not memory-bound, whereas memory sets a limit for Spark. It is still incubating, and the statistics functions Spark has already implemented are still on Phoenix's roadmap. I am not sure whether it will be good, but it might be something interesting to check out. /usr/bin On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi to everybody, in these days I looked a bit at the recent evolution of the big data stacks and it seems that HBase is somehow fading away in favour of Spark+HDFS. Am I correct? Do you think that Spark and HBase should work together or not? Best regards, Flavio
Re: Missing Spark URL after starting the master
Hi Mayur, I am using CDH4.6.0p0.26, and the latest Cloudera Spark parcel is Spark 0.9.0 CDH4.6.0p0.50. As I mentioned, the Cloudera Spark distribution somehow doesn't contain the run-example shell scripts. However, it is configured automatically and is pretty easy to set up across the cluster. Thanks, Bin On Tue, Mar 4, 2014 at 10:59 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: I have set it up on the Cloudera VM: http://docs.sigmoidanalytics.com/index.php/How_to_Install_Spark_on_Cloudera_VM Which Spark version are you trying to set up on Cloudera, and which Cloudera version are you using? Mayur Rustagi On Mon, Mar 3, 2014 at 4:29 PM, Bin Wang binwang...@gmail.com wrote: Hi Ognen/Mayur, Thanks for the reply; it is good to know how easy it is to set up Spark on an AWS cluster. My situation is a bit different from yours: our company already has a cluster, and it doesn't make much sense not to use it. That is why I have been going through this. I really wish there were some tutorials on how to set up a Spark cluster on a bare-metal CDH cluster, or some way to tweak the CDH Spark distribution so it is up to date. Ognen, of course it would be very helpful if you could 'history | grep spark...' and document the work you have done, since you've already made it work! Bin On Mon, Mar 3, 2014 at 2:06 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: I should add that in this setup you really do not need to look for the printout of the master node's IP; you set it yourself a priori. If anyone is interested, let me know and I can write it all up so that people can follow a set of instructions. Who knows, maybe I can come up with a set of scripts to automate it all... Ognen On 3/3/14, 3:02 PM, Ognen Duzlevski wrote: I have a standalone Spark cluster running in an Amazon VPC that I set up by hand.
All I did was provision the machines from a common AMI image (my underlying distribution is Ubuntu). I created a sparkuser on each machine and a /home/sparkuser/spark folder where I downloaded Spark. On the master only, I ran sbt/sbt assembly and set up conf/spark-env.sh to point to the master, which is an IP address (in my case 10.10.0.200; the port is the default 7077). I also set up the slaves file in the same subdirectory to list all 16 IP addresses of the worker nodes (in my case 10.10.0.201-216). After sbt/sbt assembly was done on the master, I ran cd ~/; tar -czf spark.tgz spark/ and copied the resulting .tgz file to each worker using the same sparkuser account, then unpacked it on each slave (this effectively replicates everything from the master to all slaves; you can script this so you don't do it by hand). Your AMI should have the distribution's version of Java and git installed, by the way. All you have to do then is run spark/sbin/start-all.sh as sparkuser on spark-master (for 0.9; in 0.8.1 it is spark/bin/start-all.sh) and it will all automagically start :) All my Amazon nodes come with 4x400 GB of ephemeral space, which I have set up as a 1.6 TB RAID0 array on each node. I pool this into an HDFS filesystem operated by a namenode outside the Spark cluster, while all the datanodes are the same nodes as the Spark workers. This enables replication and extremely fast access, since ephemeral storage is much faster than EBS or anything else on Amazon (you can do even better with SSD drives in this setup, but it will cost ya). If anyone is interested I can document our pipeline setup. I came up with it myself and have no clue what the industry standards are, since I could not find any written instructions anywhere online about how to set up a whole data analytics pipeline from the point of ingestion to the point of analytics (people don't want to share their secrets?
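Ognen's replication step (tar on the master, copy to each worker, unpack) can be scripted. Here is a minimal Python sketch, assuming passwordless SSH as sparkuser from the master to every worker and that spark.tgz sits in the master's home directory. The worker IPs are the 10.10.0.201-216 range from the post; the function names are hypothetical.

```python
import os
import subprocess

# Worker IPs from the post: 10.10.0.201 through 10.10.0.216 (16 nodes).
WORKERS = ["10.10.0.%d" % i for i in range(201, 217)]

def replication_commands(workers, user="sparkuser", tarball="spark.tgz"):
    """Build scp/ssh commands that copy the Spark tarball to each worker and unpack it."""
    local_path = os.path.expanduser("~/" + tarball)
    cmds = []
    for host in workers:
        target = "%s@%s" % (user, host)
        # "host:" with no path copies into the remote user's home directory.
        cmds.append(["scp", local_path, target + ":"])
        # Remote ssh commands run in the home directory, so this recreates ~/spark.
        cmds.append(["ssh", target, "tar -xzf %s" % tarball])
    return cmds

def replicate(workers, dry_run=True, **kwargs):
    """Print (dry run) or execute the replication commands, stopping on the first failure."""
    for cmd in replication_commands(workers, **kwargs):
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.check_call(cmd)
```

With dry_run=True (the default), replicate(WORKERS) only prints the commands, which is a cheap way to check them before running replicate(WORKERS, dry_run=False).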
or am I just in the dark and incapable of using Google properly?). My requirement was that this run within a VPC for added security and simplicity; the Amazon security groups get really old quickly. An added bonus is that you can use a VPN as the entry point into the whole system, and your cluster instantly becomes local to you in terms of IPs etc. I use OpenVPN since I like neither Cisco nor Juniper (the only two options Amazon provides for their VPN gateways). Ognen On 3/3/14, 1:00 PM, Bin Wang wrote: Hi there, I have a CDH cluster set up, and I tried using the Spark parcel that comes with Cloudera Manager, but it turned out it doesn't even have the run-example shell command in the bin folder. Then I removed it from the cluster, cloned incubator-spark onto the name node of my cluster, and built it from source there successfully with all default settings. I ran a few examples and everything seems to work fine in local mode. Now I am thinking about scaling it out to my cluster, which is what
Spark Streaming Maven Build
Hi there, I tried the Kafka WordCount example; it works perfectly and the code is pretty straightforward to understand. Can anyone show me how to start my own Maven project from the KafkaWordCount example with minimum effort?
1. What should the pom file look like (including jar-plugin? assembly-plugin? etc.)?
2. mvn install, mvn clean install, or mvn install compile assembly:single?
3. Once you have a jar file, how do you execute it instead of using bin/run-example?
For those who might ask what I have done: here is a derivative of the KafkaWordCount example that I wrote to do an IP count, where the input data from Kafka is actually a JSON string: https://github.com/biwa7636/binwangREPO I have had such bad luck getting it to work. So if anyone has copied the WordCount example code, built it with Maven, and got it working, I would really appreciate it if you could share your pom and the Maven commands! Best regards, and let me know if you need further info. Bin