good http sync client to be used with spark
Hi,

In our application pipeline we need to push data from Spark Streaming to an HTTP server. I would like an HTTP client that meets the following requirements:

1. synchronous calls
2. HTTP connection pool support
3. lightweight and easy to use

Spray and Akka HTTP are mostly suited for async calls; correct me if I am wrong. Could you please let me know which client suits the above?
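One option that meets all three requirements is Apache HttpClient with a pooling connection manager. A minimal sketch, assuming HttpClient 4.x is on the classpath; the endpoint URL, the HttpSink object, and `dstream` (a DStream[String]) are illustrative placeholders, not from the original post:

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.http.util.EntityUtils

// One pooled, synchronous client per executor JVM. Keeping it in an object
// with a lazy val means it is created on the executor and never serialized
// with the task closure.
object HttpSink {
  lazy val client: CloseableHttpClient = {
    val cm = new PoolingHttpClientConnectionManager()
    cm.setMaxTotal(20)           // total connections in the pool
    cm.setDefaultMaxPerRoute(20) // connections per target host
    HttpClients.custom().setConnectionManager(cm).build()
  }
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { record =>
      val post = new HttpPost("http://example.com/ingest") // placeholder endpoint
      post.setEntity(new StringEntity(record))
      val response = HttpSink.client.execute(post) // blocking call
      try {
        // Fully consume the entity so the connection returns to the pool.
        EntityUtils.consume(response.getEntity)
      } finally {
        response.close()
      }
    }
  }
}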
Re: Spark Launch programmatically - Basics!
We are using the code below for integration tests. You need to wait for the process to reach a final state.

.startApplication(new Listener {
  override def infoChanged(handle: SparkAppHandle): Unit = {
    println("*** info changed * ", handle.getAppId, handle.getState)
  }
  override def stateChanged(handle: SparkAppHandle): Unit = {
    println("*** state changed *", handle.getAppId, handle.getState)
  }
})

// The initial state is UNKNOWN; the brief sleep and the check below avoid
// looping on it.
Thread.sleep(1)

def waitTillComplete(handler: SparkAppHandle): Unit = {
  while (!handler.getState.isFinal && handler.getState != SparkAppHandle.State.UNKNOWN) {
    println("State :%s".format(handler.getState()))
    Thread.sleep(5000)
  }
}

On Thu, May 18, 2017 at 2:17 AM, Nipun Arora wrote:
> Hi,
>
> I am trying to get a simple Spark application to run programmatically. I
> looked at the following code from
> http://spark.apache.org/docs/2.1.0/api/java/index.html?org/apache/spark/launcher/package-summary.html:
>
> public class MyLauncher {
>   public static void main(String[] args) throws Exception {
>     SparkAppHandle handle = new SparkLauncher()
>       .setAppResource("/my/app.jar")
>       .setMainClass("my.spark.app.Main")
>       .setMaster("local")
>       .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
>       .startApplication();
>     // Use handle API to monitor / control application.
>   }
> }
>
> I don't get any errors running this for my application, but I am running
> Spark in local mode and the launcher class exits immediately after
> executing this function. Are we supposed to wait for the process state, etc.?
>
> Is there a more detailed example of how to monitor input streams etc.?
> Any GitHub link or blog post would help.
>
> Thanks
> Nipun
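Putting the two snippets together, a minimal end-to-end sketch (the jar path and main class are the placeholders from Nipun's example):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherMain {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/my/app.jar")     // placeholder jar path
      .setMainClass("my.spark.app.Main") // placeholder main class
      .setMaster("local")
      .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
      .startApplication()

    Thread.sleep(1) // give the handle a moment to leave the initial UNKNOWN state

    // Block until the application reaches a final state (FINISHED, FAILED, KILLED).
    while (!handle.getState.isFinal && handle.getState != SparkAppHandle.State.UNKNOWN) {
      println("State: %s".format(handle.getState))
      Thread.sleep(5000)
    }
    println("Final state: %s".format(handle.getState))
  }
}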
Restart if driver gets insufficient resources
Hi All,

We are running Spark on Kubernetes. There is a scenario in which the Spark driver (pod) was not able to communicate properly with the master, and it got stuck reporting insufficient resources. On restarting the Spark driver (pod) manually, it ran properly.

Is there a way to just kill the driver if it gets insufficient resources (or is not able to start up)?

PS: This is required because Kubernetes supervises the driver instance, and we would like to kill the driver immediately so that it gets restarted.

Thanks
Vimal
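Spark has no built-in flag for this, as far as I know, but one workaround is a watchdog in the driver itself: exit with a non-zero code if no executor registers within a grace period, so the supervisor (Kubernetes here) restarts the pod. A hedged sketch; the two-minute timeout is an arbitrary assumption:

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

val executorSeen = new AtomicBoolean(false)

// Flip the flag as soon as the cluster grants us an executor.
sc.addSparkListener(new SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
    executorSeen.set(true)
})

// Watchdog: if no executor shows up within the grace period, kill the
// driver so Kubernetes restarts it.
val watchdog = new Thread(new Runnable {
  def run(): Unit = {
    Thread.sleep(2 * 60 * 1000L) // grace period: 2 minutes (assumption)
    if (!executorSeen.get()) {
      System.err.println("No executors registered in time; exiting so the pod is restarted")
      System.exit(1)
    }
  }
})
watchdog.setDaemon(true)
watchdog.start()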
spark logging best practices
Hi,

http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala

What is the best way to capture Spark logs without getting a "Task not serializable" error? The above link has various workarounds.

Also, is there a way to set the log level dynamically while the application is running (from WARN to DEBUG, without restarting the app)?

Thanks
Vimal
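For what it's worth, a common pattern (one of the workarounds in the linked thread) is to keep the logger in a singleton object so it is instantiated per JVM on each executor rather than serialized with the closure. A sketch, assuming log4j 1.x as bundled with Spark of that era; the "MyApp" logger name is a placeholder:

import org.apache.log4j.{Level, LogManager, Logger}

// Created lazily on whichever JVM first touches it (driver or executor),
// so nothing is serialized into the task closure.
object Log extends Serializable {
  @transient lazy val log: Logger = LogManager.getLogger("MyApp")
}

rdd.foreachPartition { records =>
  Log.log.info("processing a partition") // safe inside a task
  records.foreach { r => /* ... */ }
}

// Changing the level at runtime: on the driver, sc.setLogLevel("DEBUG")
// works from Spark 1.4 on. Executors have their own log4j instances, so a
// level change has to run on their JVMs too; a best-effort hack is an
// empty job wide enough to touch every executor:
sc.parallelize(0 until 100, 100).foreachPartition { _ =>
  LogManager.getRootLogger.setLevel(Level.DEBUG)
}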
Re: Spark master shuts down when one of zookeeper dies
Hi Ted,

Thanks for the pointers. I now have a three-node ZooKeeper setup. Now only the master dies when a ZooKeeper instance goes down: a new master is elected leader and the cluster stays up. But the master that went down never comes back up. Is this expected?

Is there a way to get an alert when a master is down? How do I make sure that at least one backup master is always up?

Thanks
Vimal

On Tue, Jun 28, 2016 at 7:24 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Please see some blogs w.r.t. the number of nodes in the quorum:
>
> http://stackoverflow.com/questions/13022244/zookeeper-reliability-three-versus-five-nodes
>
> http://www.ibm.com/developerworks/library/bd-zookeeper/
> (the paragraph starting with 'A quorum is represented by a strict
> majority of nodes')
>
> FYI
>
> On Tue, Jun 28, 2016 at 5:52 AM, vimal dinakaran <vimal3...@gmail.com> wrote:
>
>> I am using ZooKeeper to provide HA for a Spark cluster. We have a
>> two-node ZooKeeper cluster.
>>
>> When one of the ZooKeeper nodes dies, the entire Spark cluster goes down.
>>
>> Is this expected behaviour?
>> Am I missing something in the config?
>>
>> Spark version - 1.6.1
>> Zookeeper version - 3.4.6
>>
>> // spark-env.sh
>> SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
>> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
>>
>> Below is the log from the Spark master:
>>
>> ZooKeeperLeaderElectionAgent: We have lost leadership
>> 16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
>> shutting down.
>>
>> Thanks
>> Vimal
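For reference, with a three-node quorum the spark-env.sh entry would look like the sketch below (hostnames are placeholders; spark.deploy.zookeeper.dir is optional and defaults to /spark). As for keeping a backup master alive: Spark itself does not restart a dead master process, so it needs an external supervisor (systemd, monit, or similar) to bring it back.

// spark-env.sh (on every master node)
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"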
Re: Restart App and consume from checkpoint using direct kafka API
I have implemented the above approach with a Cassandra DB. Thank you all.

On Thu, Mar 31, 2016 at 8:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
> Long story short, no. Don't rely on checkpoints if you can't handle
> reprocessing some of your data.
>
> On Thu, Mar 31, 2016 at 3:02 AM, Imre Nagi <imre.nagi2...@gmail.com> wrote:
> > I don't know how to read the data from the checkpoint. But AFAIK, and
> > based on my experience, I think the best thing you can do is store the
> > offset in a particular storage such as a database every time you consume
> > a message, then read the offset from the database every time you want to
> > start reading messages.
> >
> > nb: This approach is also explained by Cody in his blog post.
> >
> > Thanks
> >
> > On Thu, Mar 31, 2016 at 2:14 PM, vimal dinakaran <vimal3...@gmail.com> wrote:
> >> Hi,
> >> In the blog
> >> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
> >> it is mentioned that checkpointing works as long as the app jar is
> >> unchanged.
> >>
> >> If I want to upgrade the jar with the latest code and consume from Kafka
> >> where it was stopped, how do I do that?
> >> Is there a way to read the binary object of the checkpoint during init
> >> and use that to start from the offset?
> >>
> >> Thanks
> >> Vimal
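For anyone landing here later, a hedged sketch of that approach against the 0.8 direct API used in this thread. The loadOffsetsFromDb/saveOffsetsToDb helpers and the broker address are hypothetical placeholders for your own Cassandra reads and writes:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder

def loadOffsetsFromDb(): Map[TopicAndPartition, Long] = ??? // read last committed offsets
def saveOffsetsToDb(ranges: Array[OffsetRange]): Unit = ??? // commit alongside results

// Start exactly where the previous run (old or new jar) left off.
val fromOffsets = loadOffsetsFromDb()
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd and write the results ...
  saveOffsetsToDb(ranges) // persist offsets only after the results are stored
}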
Spark master shuts down when one of zookeeper dies
I am using ZooKeeper to provide HA for a Spark cluster. We have a two-node ZooKeeper cluster.

When one of the ZooKeeper nodes dies, the entire Spark cluster goes down.

Is this expected behaviour?
Am I missing something in the config?

Spark version - 1.6.1
Zookeeper version - 3.4.6

// spark-env.sh
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"

Below is the log from the Spark master:

ZooKeeperLeaderElectionAgent: We have lost leadership
16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master shutting down.

Thanks
Vimal
spark streaming application - deployment best practices
Hi All,

I am using spark-submit in cluster deploy mode to run my application in production. But this requires the jars to be present at the same path on all the nodes, and the config file that is passed as an argument to be at the same path as well. I am running Spark in standalone mode and I don't have a Hadoop or S3 environment. This mode of deployment is inconvenient.

I could do spark-submit from one node in client mode, but that doesn't provide high availability.

What is the best way to deploy Spark Streaming applications in production?

Thanks
Vimal
Restart App and consume from checkpoint using direct kafka API
Hi,

In the blog https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md it is mentioned that checkpointing works as long as the app jar is unchanged.

If I want to upgrade the jar with the latest code and consume from Kafka where it was stopped, how do I do that?
Is there a way to read the binary object of the checkpoint during init and use that to start from the offset?

Thanks
Vimal
Re: spark streaming web ui not showing the events - direct kafka api
No, I am using DSE 4.8, which has Spark 1.4. Is this a known issue?

On Wed, Jan 27, 2016 at 11:52 PM, Cody Koeninger <c...@koeninger.org> wrote:
> Have you tried Spark 1.5?
>
> On Wed, Jan 27, 2016 at 11:14 AM, vimal dinakaran <vimal3...@gmail.com> wrote:
>
>> Hi,
>> I am using Spark 1.4 with the direct Kafka API. In my streaming UI, I am
>> able to see the events listed in the UI only if I add stream.print()
>> statements; otherwise the event rate and input events remain at 0 even
>> though the events get processed.
>>
>> Without print statements, I have the action saveToCassandra on the
>> dstream.
>>
>> Any reason why this is not working?
>>
>> Thanks
>> Vimal
spark streaming web ui not showing the events - direct kafka api
Hi,

I am using Spark 1.4 with the direct Kafka API. In my streaming UI, I am able to see the events listed in the UI only if I add stream.print() statements; otherwise the event rate and input events remain at 0 even though the events get processed.

Without print statements, I have the action saveToCassandra on the dstream.

Any reason why this is not working?

Thanks
Vimal
Re: Cluster mode dependent jars not working
The dependent jars also need to be added to --driver-class-path. But this is not mentioned in the Spark documentation.

On Tue, Dec 15, 2015 at 9:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Please use --conf spark.executor.extraClassPath=XXX to specify dependent
> jars.
>
> On Tue, Dec 15, 2015 at 3:57 AM, vimal dinakaran <vimal3...@gmail.com> wrote:
>
>> I am running Spark using cluster deploy mode. Below is the command:
>>
>> JARS=$JARS_HOME/amqp-client-3.5.3.jar,$JARS_HOME/nscala-time_2.10-2.0.0.jar,\
>> $JARS_HOME/kafka_2.10-0.8.2.1.jar,$JARS_HOME/kafka-clients-0.8.2.1.jar,\
>> $JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar,\
>> $JARS_HOME/zkclient-0.3.jar,$JARS_HOME/protobuf-java-2.4.0a.jar
>>
>> dse spark-submit -v --conf
>> "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
>> --executor-memory 512M \
>> --total-executor-cores 3 \
>> --deploy-mode "cluster" \
>> --master spark://$MASTER:7077 \
>> --jars=$JARS \
>> --supervise \
>> --class "com.testclass" $APP_JAR input.json \
>> --files "/home/test/input.json"
>>
>> The above command works fine in client mode. But when I use it in
>> cluster mode I get a class-not-found exception:
>>
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> In client mode the dependent jars get copied to the
>> /var/lib/spark/work directory, whereas in cluster mode they do not.
>> I am using NFS and I have mounted the same directory on all the Spark
>> nodes under the same name. Still I get the error.
>>
>> From the verbose logs of dse spark-submit, I see the classpath elements
>> are missing here:
>>
>> spark.hadoop.cassandra.input.native.ssl.trust.store.password -> cassandra
>> spark.cassandra.connection.ssl.trustStore.password -> cassandra
>> spark.ssl.keyStorePassword -> cassandra
>> spark.cassandra.auth.username -> cassandra
>> spark.hadoop.fs.har.impl -> org.apache.hadoop.fs.HarFileSystem
>> Classpath elements:
>>
>> WARN 2015-12-15 17:08:48 org.apache.spark.util.Utils: Your hostname,
>> demeter-dev-node2 resolves to a loopback address: 127.0.1.1; using
>> 10.29.23.170 instead (on interface eth0)
>> WARN 2015-12-15 17:08:48 org.apache.spark.util.Utils: Set SPARK_LOCAL_IP
>> if you need to bind to another address
>>
>> How is it able to pick up the application jar, which is under the same
>> directory, but not the dependent jars?
>> Please help me in getting this solved.
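Concretely, the working submit command would look something like the sketch below (same placeholder paths as the original command). Two things to note: the driver classpath is colon-separated, unlike the comma-separated --jars list, and --files is moved before the application jar, because spark-submit treats everything after the primary jar as application arguments:

dse spark-submit -v \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --executor-memory 512M \
  --total-executor-cores 3 \
  --deploy-mode cluster \
  --master spark://$MASTER:7077 \
  --jars $JARS \
  --driver-class-path $(echo $JARS | tr ',' ':') \
  --supervise \
  --files "/home/test/input.json" \
  --class "com.testclass" $APP_JAR input.json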
Cluster mode dependent jars not working
I am running Spark using cluster deploy mode. Below is the command:

JARS=$JARS_HOME/amqp-client-3.5.3.jar,$JARS_HOME/nscala-time_2.10-2.0.0.jar,\
$JARS_HOME/kafka_2.10-0.8.2.1.jar,$JARS_HOME/kafka-clients-0.8.2.1.jar,\
$JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar,\
$JARS_HOME/zkclient-0.3.jar,$JARS_HOME/protobuf-java-2.4.0a.jar

dse spark-submit -v --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--executor-memory 512M \
--total-executor-cores 3 \
--deploy-mode "cluster" \
--master spark://$MASTER:7077 \
--jars=$JARS \
--supervise \
--class "com.testclass" $APP_JAR input.json \
--files "/home/test/input.json"

The above command works fine in client mode. But when I use it in cluster mode I get a class-not-found exception:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$

In client mode the dependent jars get copied to the /var/lib/spark/work directory, whereas in cluster mode they do not. I am using NFS and I have mounted the same directory on all the Spark nodes under the same name. Still I get the error.

From the verbose logs of dse spark-submit, I see the classpath elements are missing here:

spark.hadoop.cassandra.input.native.ssl.trust.store.password -> cassandra
spark.cassandra.connection.ssl.trustStore.password -> cassandra
spark.ssl.keyStorePassword -> cassandra
spark.cassandra.auth.username -> cassandra
spark.hadoop.fs.har.impl -> org.apache.hadoop.fs.HarFileSystem
Classpath elements:

WARN 2015-12-15 17:08:48 org.apache.spark.util.Utils: Your hostname, demeter-dev-node2 resolves to a loopback address: 127.0.1.1; using 10.29.23.170 instead (on interface eth0)
WARN 2015-12-15 17:08:48 org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address

How is it able to pick up the application jar, which is under the same directory, but not the dependent jars? Please help me in getting this solved.
Re: Checkpoint not working after driver restart
I am pasting the code here. Please let me know if any part of the sequence is wrong.

def createContext(checkpointDirectory: String, config: Config): StreamingContext = {
  println("Creating new context")
  val conf = new SparkConf(true)
    .setAppName(appName)
    .set("spark.streaming.unpersist", "true")
  val ssc = new StreamingContext(conf, Seconds(config.getInt(batchIntervalParam)))
  ssc.checkpoint(checkpointDirectory)
  val isValid = validate(ssc, config)
  if (isValid) {
    val result = runJob(ssc, config)
    println("result is " + result)
  } else {
    println(isValid.toString)
  }
  ssc
}

def main(args: Array[String]): Unit = {
  if (args.length < 1) {
    println("Must specify the path to config file")
    println("Usage: progname <path-to-config-file>")
    return
  }
  val url = args(0)
  logger.info("Starting " + appName)
  println("Got the path as %s".format(url))
  val source = scala.io.Source.fromFile(url)
  val lines = try source.mkString finally source.close()
  val config = ConfigFactory.parseString(lines)
  val directoryPath = config.getString(checkPointParam)
  // Recreate the context from the checkpoint if one exists,
  // otherwise build a fresh one.
  val ssc = StreamingContext.getOrCreate(directoryPath, () => {
    createContext(directoryPath, config)
  })
  ssc.start()
  ssc.awaitTermination()
}

def getRabbitMQStream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  val rabbitMQHost = config.getString(rabbitmqHostParam)
  val rabbitMQPort = config.getInt(rabbitmqPortParam)
  val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
  println("changing the memory level")
  val receiverStream: ReceiverInputDStream[String] =
    RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost, rabbitMQPort,
      rabbitMQQueue, StorageLevel.MEMORY_AND_DISK_SER)
  receiverStream.start()
  receiverStream
}

def getBaseDstream(config: Config, ssc: StreamingContext): ReceiverInputDStream[String] = {
  val baseDstream = config.getString(receiverTypeParam) match {
    case "rabbitmq" => getRabbitMQStream(config, ssc)
  }
  baseDstream
}

def runJob(ssc: StreamingContext, config: Config): Any = {
  val keyspace = config.getString(keyspaceParam)
  val clientStatsTable = config.getString(clientStatsTableParam)
  val hourlyStatsTable = config.getString(hourlyStatsTableParam)
  val batchInterval = config.getInt(batchIntervalParam)
  val windowInterval = config.getInt(windowIntervalParam)
  val hourlyInterval = config.getInt(hourlyParam)
  val limit = config.getInt(limitParam)

  val lines = getBaseDstream(config, ssc)
  val statsRDD = lines.filter(_.contains("client_stats")).map(_.split(",")(1))

  val parserFunc = getProtobufParserFunction()
  val clientUsageRDD: DStream[((String, String), Double)] = statsRDD.flatMap(x => parserFunc(x))
  val formatterFunc = getJsonFormatterFunc()

  // Per-key sums over a 1-minute window, regrouped by the second key
  // component, keeping the top `limit` entries.
  val oneMinuteWindowResult = clientUsageRDD
    .reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(windowInterval), Seconds(batchInterval))
    .map(x => (x._1._2, ArrayBuffer((x._1._1, x._2))))
    .reduceByKey((x, y) => x ++ y)
    .mapValues(x => x.toList.sortBy(x => -x._2).take(limit))

  println("Client Usage from rabbitmq ")
  oneMinuteWindowResult.map(x => (x._1, DateTime.now, formatterFunc(x._2)))
    .saveToCassandra(keyspace, clientStatsTable)
  oneMinuteWindowResult.print()

  // The same aggregation over the hourly window.
  val hourlyResult = clientUsageRDD
    .reduceByKeyAndWindow((x: Double, y: Double) => x + y, Seconds(hourlyInterval), Seconds(batchInterval))
    .map(x => (x._1._2, ArrayBuffer((x._1._1, x._2))))
    .reduceByKey((x, y) => x ++ y)
    .mapValues(x => x.toList.sortBy(x => -x._2).take(limit))

  hourlyResult.map(x => (x._1, DateTime.now, formatterFunc(x._2)))
    .saveToCassandra(keyspace, hourlyStatsTable)
  hourlyResult.map(x => (x, "hourly")).print()
}

On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran <vimal3...@gmail.com> wrote:
> I have a simple Spark Streaming application which reads data from RabbitMQ
> and does some aggregation on window intervals of 1 min and 1 hour, for a
> batch interval of 30s.
>
> I have a three-node setup. And to enable checkpointing, I have mounted the
> same directory using sshfs on all worker nodes for creating checkpoints.
>
> When I run the Spark Streaming app for the first time it works fine.
> I could see the results being printed
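One thing worth checking here (an educated guess, not a confirmed diagnosis): with receiver-based sources such as this RabbitMQ receiver, blocks that were received but not yet processed are lost when the driver restarts, and recovery from the checkpoint then fails inside BlockRDD much like the trace below, unless the receiver write-ahead log is enabled (available from Spark 1.2). A minimal sketch of turning it on in createContext:

// Hedged sketch: enable the receiver write-ahead log so received blocks
// survive a driver restart (requires a reliable checkpoint directory).
val conf = new SparkConf(true)
  .setAppName(appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")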
Checkpoint not working after driver restart
I have a simple Spark Streaming application which reads data from RabbitMQ and does some aggregation on window intervals of 1 min and 1 hour, for a batch interval of 30s.

I have a three-node setup. And to enable checkpointing, I have mounted the same directory using sshfs on all worker nodes for creating checkpoints.

When I run the Spark Streaming app for the first time it works fine. I could see the results being printed on the console and some checkpoints happening in the network directory. But when I run the job for the second time, it fails with the following exception:

at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

And the exception is repeated.

I am not pumping huge data into RabbitMQ. When I run the job for the first time I am dumping only < 100 events. And when I run it for the second time, I have stopped the messages being sent to RabbitMQ from the producer process.

I have tried setting "spark.streaming.unpersist","true".

My setup has 3 nodes, each having one core allocated to Spark, and the executor memory per node is 512MB.

Please help me in solving this issue.