good http sync client to be used with spark

2017-05-31 Thread vimal dinakaran
Hi,
 In our application pipeline we need to push data from Spark Streaming to an
HTTP server.

I would like an HTTP client that meets the requirements below:

1. Synchronous calls
2. HTTP connection pool support
3. Lightweight and easy to use

Spray and Akka HTTP seem mostly suited to async calls; correct me if I am wrong.

Could you please suggest a client that fits the above?
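For reference, one option that fits these requirements is Apache HttpClient 4.x
with a pooled connection manager, initialised once per executor JVM and called
synchronously from foreachPartition. The sketch below assumes HttpClient is on
the classpath; HttpSink, pushToHttp, the pool sizes and the endpoint handling
are illustrative, not from the original thread.

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.http.util.EntityUtils
import org.apache.spark.streaming.dstream.DStream

object HttpSink {
  // Built lazily, so each executor JVM gets its own pooled, blocking client.
  private lazy val client: CloseableHttpClient = {
    val cm = new PoolingHttpClientConnectionManager()
    cm.setMaxTotal(20)             // total connections in the pool (illustrative)
    cm.setDefaultMaxPerRoute(20)   // connections per target host (illustrative)
    HttpClients.custom().setConnectionManager(cm).build()
  }

  // Synchronous POST; returns the HTTP status code.
  def post(url: String, body: String): Int = {
    val req = new HttpPost(url)
    req.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON))
    val resp = client.execute(req)   // blocking call
    try resp.getStatusLine.getStatusCode
    finally {
      EntityUtils.consumeQuietly(resp.getEntity)  // return the connection to the pool
      resp.close()
    }
  }
}

// Usage from a streaming job: HttpSink is an object, so it is initialised on
// each executor rather than serialized with the closure.
def pushToHttp(stream: DStream[String], url: String): Unit =
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      records.foreach(r => HttpSink.post(url, r))
    }
  }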


Re: Spark Launch programmatically - Basics!

2017-05-23 Thread vimal dinakaran
We are using the code below for integration tests. You need to wait for the
process state.
// Attach a listener when starting the application (continuation of the
// SparkLauncher builder chain):
.startApplication(new SparkAppHandle.Listener {
  override def infoChanged(handle: SparkAppHandle): Unit =
    println(s"*** info changed *** ${handle.getAppId} ${handle.getState}")

  override def stateChanged(handle: SparkAppHandle): Unit =
    println(s"*** state changed *** ${handle.getAppId} ${handle.getState}")
})

// The initial state is UNKNOWN; sleep briefly so the wait loop below does not
// exit on it immediately.
Thread.sleep(1)

def waitTillComplete(handler: SparkAppHandle): Unit = {
  while (!handler.getState.isFinal && handler.getState != SparkAppHandle.State.UNKNOWN) {
    println("State: %s".format(handler.getState))
    Thread.sleep(5000)
  }
}
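For anyone looking for a fuller example, here is a self-contained sketch that
blocks until the launched application reaches a final state, using a latch in
the listener instead of polling; the paths and class names are placeholders.

import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LaunchAndWait {
  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)
    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")    // placeholder
      .setMainClass("my.spark.app.Main")     // placeholder
      .setMaster("local[*]")
      .startApplication(new SparkAppHandle.Listener {
        override def infoChanged(h: SparkAppHandle): Unit = ()
        override def stateChanged(h: SparkAppHandle): Unit =
          if (h.getState.isFinal) done.countDown()  // FINISHED, FAILED or KILLED
      })
    done.await()                             // block the launcher until the app is done
    println(s"Final state: ${handle.getState}")
  }
}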

On Thu, May 18, 2017 at 2:17 AM, Nipun Arora 
wrote:

> Hi,
>
> I am trying to get a simple spark application to run programmatically. I
> looked at http://spark.apache.org/docs/2.1.0/api/java/index.
> html?org/apache/spark/launcher/package-summary.html, at the following
> code.
>
>public class MyLauncher {
>  public static void main(String[] args) throws Exception {
>SparkAppHandle handle = new SparkLauncher()
>  .setAppResource("/my/app.jar")
>  .setMainClass("my.spark.app.Main")
>  .setMaster("local")
>  .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
>  .startApplication();
>// Use handle API to monitor / control application.
>  }
>}
>
>
> I don't have any errors in running this for my application, but I am
> running spark in local mode and the launcher class immediately exits after
> executing this function. Are we supposed to wait for the process state, etc.?
>
> Is there a more detailed example of how to monitor input streams, etc.? Any
> GitHub link or blog post would help.
>
> Thanks
> Nipun
>


Restart if driver gets insufficient resources

2017-03-02 Thread vimal dinakaran
Hi All,
 We are running Spark on Kubernetes. There is a scenario in which the Spark
driver (pod) is unable to communicate properly with the master and gets stuck
reporting insufficient resources.

On restarting the driver pod manually, it runs properly.

Is there a way to make the driver kill itself if it gets insufficient resources
(or is otherwise unable to start up)?

PS: This is needed because Kubernetes supervises the driver instance, and we
would like the driver to exit immediately so that it gets restarted.
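This is not something Spark provides out of the box as far as I know, but one
possible shape of such a kill switch is a small driver-side watchdog that exits
with a non-zero code if no executors register within a timeout, letting
Kubernetes restart the pod. A hedged sketch (the timeout is illustrative):

import org.apache.spark.SparkContext

def exitIfNoExecutors(sc: SparkContext, timeoutMs: Long = 120000L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  // getExecutorMemoryStatus includes the driver's own block manager, so a size
  // greater than 1 means at least one executor has registered.
  while (sc.getExecutorMemoryStatus.size <= 1) {
    if (System.currentTimeMillis() > deadline) {
      System.err.println("No executors registered in time; exiting so the pod is restarted")
      sys.exit(1)
    }
    Thread.sleep(5000)
  }
}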

Thanks
Vimal


spark logging best practices

2016-07-08 Thread vimal dinakaran
Hi,

http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala

What is the best way to log from Spark code without hitting a
task-not-serializable error? The link above lists various workarounds.

Also, is there a way to set the log level dynamically while the application is
running (say from WARN to DEBUG, without restarting the app)?
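For reference, a minimal sketch of the common workaround from the linked
thread: keep the logger out of the task closure by marking it @transient lazy,
so it is re-created in each executor JVM instead of being serialized. The trait
and job names are illustrative.

import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD

trait LazyLogging extends Serializable {
  // Not serialized with the closure; re-initialised lazily on each JVM.
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
}

object MyJob extends LazyLogging {
  def process(rdd: RDD[String]): Unit =
    rdd.foreach { line =>
      log.debug(s"processing $line")
    }
}

On the second question, SparkContext.setLogLevel("DEBUG") (available in Spark
1.4 and later, if I recall correctly) changes the level at runtime, though only
in the driver JVM.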

Thanks
Vimal


Re: Spark master shuts down when one of zookeeper dies

2016-06-30 Thread vimal dinakaran
Hi Ted,
 Thanks for the pointers. I had a three-node ZooKeeper setup. Now, when a
ZooKeeper instance goes down, only the master dies; a new master is elected as
leader and the cluster stays up.
But the master that went down never comes back up.

Is this expected? Is there a way to get an alert when a master is down?
How can we make sure that at least one backup master is always up?
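On the alerting question, the standalone master's web UI exposes a JSON status
page that can be polled. Below is a hedged sketch of a simple check; the
hostnames and port are placeholders, and the string matching is deliberately
crude.

import scala.io.Source
import scala.util.Try

// Polls http://<master>:8080/json and reports the master's role, or DOWN if
// the UI is unreachable.
def masterStatus(host: String, port: Int = 8080): String =
  Try(Source.fromURL(s"http://$host:$port/json").mkString)
    .map { body =>
      if (body.contains("STANDBY")) "STANDBY"
      else if (body.contains("ALIVE")) "ALIVE"
      else "UNKNOWN"
    }
    .getOrElse("DOWN")

Seq("master1", "master2").foreach(h => println(s"$h -> ${masterStatus(h)}"))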

Thanks
Vimal




On Tue, Jun 28, 2016 at 7:24 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Please see some blog posts w.r.t. the number of nodes in the quorum:
>
>
> http://stackoverflow.com/questions/13022244/zookeeper-reliability-three-versus-five-nodes
>
> http://www.ibm.com/developerworks/library/bd-zookeeper/
>   the paragraph starting with 'A quorum is represented by a strict
> majority of nodes'
>
> FYI
>
> On Tue, Jun 28, 2016 at 5:52 AM, vimal dinakaran <vimal3...@gmail.com>
> wrote:
>
>> I am using ZooKeeper to provide HA for the Spark cluster. We have a
>> two-node ZooKeeper ensemble.
>>
>> When one of the ZooKeeper nodes dies, the entire Spark cluster goes down.
>>
>> Is this expected behaviour?
>> Am I missing something in the config?
>>
>> Spark version - 1.6.1.
>> Zookeeper version - 3.4.6
>> // spark-env.sh
>> SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
>> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
>>
>> Below is the log from spark master:
>> ZooKeeperLeaderElectionAgent: We have lost leadership
>> 16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
>> shutting down.
>>
>> Thanks
>> Vimal
>>
>>
>>
>>
>


Re: Restart App and consume from checkpoint using direct kafka API

2016-06-28 Thread vimal dinakaran
I have implemented the above approach with a Cassandra DB.

Thank you all.
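For readers of the archive, a minimal sketch of this pattern with the 0.8
direct API follows; loadOffsets and saveOffsets are hypothetical helpers backed
by whatever store you use (Cassandra in this thread), and error handling is
omitted.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical storage helpers, e.g. a table keyed by (topic, partition).
def loadOffsets(): Map[TopicAndPartition, Long] = ???
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

def createStream(ssc: StreamingContext,
                 kafkaParams: Map[String, String]): InputDStream[(String, String)] = {
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, (String, String)](
    ssc, kafkaParams, loadOffsets(),
    (m: MessageAndMetadata[String, String]) => (m.key, m.message))

  stream.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process the batch ...
    saveOffsets(ranges)   // persist offsets only after the batch succeeds
  }
  stream
}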

On Thu, Mar 31, 2016 at 8:26 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Long story short, no. Don't rely on checkpoints if you can't handle
> reprocessing some of your data.
>
> On Thu, Mar 31, 2016 at 3:02 AM, Imre Nagi <imre.nagi2...@gmail.com>
> wrote:
> > I don't know how to read the data from the checkpoint. But AFAIK, and based
> > on my experience, the best thing you can do is store the offsets in some
> > storage such as a database every time you consume a message, and then read
> > the offsets from the database whenever you want to start consuming again.
> >
> > nb: This approach is also explained by Cody in his blog post.
> >
> > Thanks
> >
> > On Thu, Mar 31, 2016 at 2:14 PM, vimal dinakaran <vimal3...@gmail.com>
> > wrote:
> >>
> >> Hi,
> >>  In the blog
> >> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
> >>
> >> It is mentioned that enabling checkpoint works as long as the app jar is
> >> unchanged.
> >>
> >> If I want to upgrade the jar with the latest code and resume consuming
> >> from Kafka where it stopped, how do I do that?
> >> Is there a way to read the checkpoint's binary object during init and use
> >> that to start from the saved offsets?
> >>
> >> Thanks
> >> Vimal
> >
> >
>


Spark master shuts down when one of zookeeper dies

2016-06-28 Thread vimal dinakaran
I am using ZooKeeper to provide HA for the Spark cluster. We have a two-node
ZooKeeper ensemble.

When one of the ZooKeeper nodes dies, the entire Spark cluster goes down.

Is this expected behaviour?
Am I missing something in the config?

Spark version - 1.6.1.
Zookeeper version - 3.4.6
// spark-env.sh
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"

Below is the log from spark master:
ZooKeeperLeaderElectionAgent: We have lost leadership
16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
shutting down.

Thanks
Vimal


spark streaming application - deployment best practices

2016-06-15 Thread vimal dinakaran
Hi All,
I am using spark-submit in cluster deploy mode to run my application in
production.

But this requires the jars to be present at the same path on all the nodes, and
likewise the config file that is passed as an argument. I am running Spark in
standalone mode and do not have a Hadoop or S3 environment, so this mode of
deployment is inconvenient. I could run spark-submit from one node in client
mode, but that does not provide high availability.

What is the best way to deploy Spark Streaming applications in production?
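One common way to reduce the pain, sketched below under assumptions: build a
single assembly (fat) jar with the sbt-assembly plugin and mark Spark itself as
"provided", so the only artifacts that have to be kept in sync across nodes are
one jar and the config file. The version numbers and merge strategy here are
illustrative.

// build.sbt fragment (assumes sbt-assembly is added in project/plugins.sbt)
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // avoid signature-file clashes
  case _                             => MergeStrategy.first
}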


Thanks

Vimal


Restart App and consume from checkpoint using direct kafka API

2016-03-31 Thread vimal dinakaran
Hi,
 In the blog
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

It is mentioned that enabling checkpoint works as long as the app jar is
unchanged.

If I want to upgrade the jar with the latest code and resume consuming from
Kafka where it stopped, how do I do that?
Is there a way to read the checkpoint's binary object during init and use that
to start from the saved offsets?

Thanks
Vimal


Re: spark streaming web ui not showing the events - direct kafka api

2016-02-03 Thread vimal dinakaran
No, I am using DSE 4.8, which has Spark 1.4. Is this a known issue?

On Wed, Jan 27, 2016 at 11:52 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Have you tried spark 1.5?
>
> On Wed, Jan 27, 2016 at 11:14 AM, vimal dinakaran <vimal3...@gmail.com>
> wrote:
>
>> Hi,
>>  I am using Spark 1.4 with the direct Kafka API. In the streaming UI, I can
>> see the events listed only if I add stream.print() statements; otherwise the
>> event rate and input events remain at 0 even though the events get processed.
>>
>> Without the print statements, the only action I have on the DStream is
>> saveToCassandra.
>>
>> Any reason why this is not working?
>>
>> Thanks
>> Vimal
>>
>
>


spark streaming web ui not showing the events - direct kafka api

2016-01-27 Thread vimal dinakaran
Hi,
 I am using Spark 1.4 with the direct Kafka API. In the streaming UI, I can see
the events listed only if I add stream.print() statements; otherwise the event
rate and input events remain at 0 even though the events get processed.

Without the print statements, the only action I have on the DStream is
saveToCassandra.

Any reason why this is not working?

Thanks
Vimal


Re: Cluster mode dependent jars not working

2015-12-17 Thread vimal dinakaran
--driver-class-path needs to be set with the required jars as well. But this is
not mentioned clearly in the Spark documentation.
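For example, relative to the command quoted below, the extra options would look
something like this (the jar paths are illustrative); note that unlike --jars,
the classpath options take colon-separated entries:

dse spark-submit -v \
 --deploy-mode "cluster" \
 --master spark://$MASTER:7077 \
 --jars=$JARS \
 --driver-class-path "$JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar:$JARS_HOME/kafka_2.10-0.8.2.1.jar" \
 --conf "spark.executor.extraClassPath=$JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar:$JARS_HOME/kafka_2.10-0.8.2.1.jar" \
 --class "com.testclass" $APP_JAR input.json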

On Tue, Dec 15, 2015 at 9:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Please use --conf spark.executor.extraClassPath=XXX to specify dependent
> jars.
>
> On Tue, Dec 15, 2015 at 3:57 AM, vimal dinakaran <vimal3...@gmail.com>
> wrote:
>
>> I am running Spark using cluster mode for deployment. Below is the
>> command:
>>
>>
>> JARS=$JARS_HOME/amqp-client-3.5.3.jar,$JARS_HOME/nscala-time_2.10-2.0.0.jar,\
>> $JARS_HOME/kafka_2.10-0.8.2.1.jar,$JARS_HOME/kafka-clients-0.8.2.1.jar,\
>> $JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar,\
>> $JARS_HOME/zkclient-0.3.jar,$JARS_HOME/protobuf-java-2.4.0a.jar
>>
>> dse spark-submit -v --conf
>> "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
>>  --executor-memory 512M \
>>  --total-executor-cores 3 \
>>  --deploy-mode "cluster" \
>>  --master spark://$MASTER:7077 \
>>  --jars=$JARS \
>>  --supervise \
>>  --class "com.testclass" $APP_JAR  input.json \
>>  --files "/home/test/input.json"
>>
>> The above command works fine in client mode. But when I use it in
>> cluster mode, I get a class-not-found exception:
>>
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> In client mode the dependent jars get copied to the /var/lib/spark/work
>> directory, whereas in cluster mode they do not. I am using NFS and have
>> mounted the same directory on all the Spark nodes under the same name.
>> Still I get the error.
>>
>> From the verbose logs of dse spark-submit, I see the classpath elements
>> are missing here:
>>
>> --
>> spark.hadoop.cassandra.input.native.ssl.trust.store.password -> cassandra
>> spark.cassandra.connection.ssl.trustStore.password -> cassandra
>> spark.ssl.keyStorePassword -> cassandra
>> spark.cassandra.auth.username -> cassandra
>> spark.hadoop.fs.har.impl -> org.apache.hadoop.fs.HarFileSystem
>> Classpath elements:
>>
>>
>> WARN  2015-12-15 17:08:48 org.apache.spark.util.Utils: Your hostname,
>> demeter-dev-node2 resolves to a loopback address: 127.0.1.1; using
>> 10.29.23.170 instead (on interface eth0)
>> WARN  2015-12-15 17:08:48 org.apache.spark.util.Utils: Set SPARK_LOCAL_IP
>> if you need to bind to another addres
>>
>> How is it able to pick up the application jar, which is under the same
>> directory, but not the dependent jars?
>> Please help me get this resolved.
>>
>>
>


Cluster mode dependent jars not working

2015-12-15 Thread vimal dinakaran
I am running Spark using cluster mode for deployment. Below is the command:

JARS=$JARS_HOME/amqp-client-3.5.3.jar,$JARS_HOME/nscala-time_2.10-2.0.0.jar,\
$JARS_HOME/kafka_2.10-0.8.2.1.jar,$JARS_HOME/kafka-clients-0.8.2.1.jar,\
$JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar,\
$JARS_HOME/zkclient-0.3.jar,$JARS_HOME/protobuf-java-2.4.0a.jar

dse spark-submit -v --conf
"spark.serializer=org.apache.spark.serializer.KryoSerializer" \
 --executor-memory 512M \
 --total-executor-cores 3 \
 --deploy-mode "cluster" \
 --master spark://$MASTER:7077 \
 --jars=$JARS \
 --supervise \
 --class "com.testclass" $APP_JAR  input.json \
 --files "/home/test/input.json"

The above command works fine in client mode. But when I use it in cluster
mode, I get a class-not-found exception:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

In client mode the dependent jars get copied to the /var/lib/spark/work
directory, whereas in cluster mode they do not. I am using NFS and have mounted
the same directory on all the Spark nodes under the same name. Still I get the
error.

From the verbose logs of dse spark-submit, I see the classpath elements are
missing here:
--
spark.hadoop.cassandra.input.native.ssl.trust.store.password -> cassandra
spark.cassandra.connection.ssl.trustStore.password -> cassandra
spark.ssl.keyStorePassword -> cassandra
spark.cassandra.auth.username -> cassandra
spark.hadoop.fs.har.impl -> org.apache.hadoop.fs.HarFileSystem
Classpath elements:


WARN  2015-12-15 17:08:48 org.apache.spark.util.Utils: Your hostname,
demeter-dev-node2 resolves to a loopback address: 127.0.1.1; using
10.29.23.170 instead (on interface eth0)
WARN  2015-12-15 17:08:48 org.apache.spark.util.Utils: Set SPARK_LOCAL_IP
if you need to bind to another addres

How is it able to pick up the application jar, which is under the same
directory, but not the dependent jars?
Please help me get this resolved.


Re: Checkpoint not working after driver restart

2015-11-07 Thread vimal dinakaran
I am pasting the code here. Please let me know if any of the sequencing is
wrong.


def createContext(checkpointDirectory: String, config: Config):
StreamingContext = {
println("Creating new context")

val conf = new
SparkConf(true).setAppName(appName).set("spark.streaming.unpersist","true")

val ssc = new StreamingContext(conf,
Seconds(config.getInt(batchIntervalParam)))
ssc.checkpoint(checkpointDirectory)
val isValid = validate(ssc, config)

if (isValid) {
  val result = runJob(ssc, config)
  println("result is " + result)
} else {
  println(isValid.toString)
}

ssc
 }

  def main(args: Array[String]): Unit = {

if (args.length < 1) {
  println("Must specify the path to config file ")
  println("Usage progname  ")
  return
}
val url = args(0)
logger.info("Starting " + appName)
println("Got the path as %s".format(url))
val source = scala.io.Source.fromFile(url)
val lines = try source.mkString finally source.close()
val config = ConfigFactory.parseString(lines)
val directoryPath = config.getString(checkPointParam)

val ssc = StreamingContext.getOrCreate(directoryPath, () => {
  createContext(directoryPath,config)
})

ssc.start()
ssc.awaitTermination()
  }


  def getRabbitMQStream(config: Config, ssc: StreamingContext):
ReceiverInputDStream[String] = {
val rabbitMQHost = config.getString(rabbitmqHostParam)
val rabbitMQPort = config.getInt(rabbitmqPortParam)
val rabbitMQQueue = config.getString(rabbitmqQueueNameParam)
println("changing the memory lvel")
val receiverStream: ReceiverInputDStream[String] = {
  RabbitMQUtils.createStreamFromAQueue(ssc, rabbitMQHost,
rabbitMQPort, rabbitMQQueue,StorageLevel.MEMORY_AND_DISK_SER)
}
receiverStream.start()
receiverStream
  }

  def getBaseDstream(config: Config, ssc: StreamingContext):
ReceiverInputDStream[String] = {
val baseDstream = config.getString(receiverTypeParam) match {
  case "rabbitmq" => getRabbitMQStream(config, ssc)
}
baseDstream
  }

  def runJob(ssc: StreamingContext, config: Config): Any = {

val keyspace = config.getString(keyspaceParam)
val clientStatsTable = config.getString(clientStatsTableParam)
val hourlyStatsTable = config.getString(hourlyStatsTableParam)
val batchInterval = config.getInt(batchIntervalParam)
val windowInterval = config.getInt(windowIntervalParam)
val hourlyInterval = config.getInt(hourlyParam)
val limit = config.getInt(limitParam)

val lines = getBaseDstream(config, ssc)
val statsRDD =
lines.filter(_.contains("client_stats")).map(_.split(",")(1))

val parserFunc = getProtobufParserFunction()
val clientUsageRDD: DStream[((String, String), Double)] =
statsRDD.flatMap(x => parserFunc(x))
val formatterFunc = getJsonFormatterFunc()
    val oneMinuteWindowResult = clientUsageRDD
      .reduceByKeyAndWindow((x: Double, y: Double) => x + y,
        Seconds(windowInterval), Seconds(batchInterval))
      .map(x => (x._1._2, ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => x ++ y)
      .mapValues(x => x.toList.sortBy(x => -x._2).take(limit))

println("Client Usage from rabbitmq ")
oneMinuteWindowResult.map(x => (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, clientStatsTable)
oneMinuteWindowResult.print()

    val HourlyResult = clientUsageRDD
      .reduceByKeyAndWindow((x: Double, y: Double) => x + y,
        Seconds(hourlyInterval), Seconds(batchInterval))
      .map(x => (x._1._2, ArrayBuffer((x._1._1, x._2))))
      .reduceByKey((x, y) => x ++ y)
      .mapValues(x => x.toList.sortBy(x => -x._2).take(limit))

HourlyResult.map(x => (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
HourlyResult.map(x => (x, "hourly")).print()

  }
}


On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran <vimal3...@gmail.com>
wrote:

> I have a simple Spark Streaming application which reads data from RabbitMQ
> and does some aggregation over window intervals of 1 minute and 1 hour, with
> a batch interval of 30s.
>
> I have a three-node setup. To enable checkpointing, I have mounted the same
> directory on all worker nodes using sshfs.
>
> When I run the streaming app for the first time, it works fine.
>  I could see the results being printed 

Checkpoint not working after driver restart

2015-11-03 Thread vimal dinakaran
I have a simple Spark Streaming application which reads data from RabbitMQ and
does some aggregation over window intervals of 1 minute and 1 hour, with a
batch interval of 30s.

 I have a three-node setup. To enable checkpointing, I have mounted the same
directory on all worker nodes using sshfs.

 When I run the streaming app for the first time, it works fine.
 I can see the results being printed on the console and checkpoints being
written to the network directory.

 But when I run the job a second time, it fails with the following
exception:

at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


And the exception is repeated.


I am not pumping huge amounts of data into RabbitMQ. On the first run I push
only < 100 events, and before the second run I stop the producer process from
sending messages to RabbitMQ.

I have tried setting "spark.streaming.unpersist" to "true".
My setup has 3 nodes, each with one core allocated to Spark, and the executor
memory per node is 512 MB.
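A hedged aside for readers, not a confirmed fix for this particular error: for
receiver-based inputs such as the RabbitMQ receiver here, metadata
checkpointing alone does not preserve received blocks across a driver restart;
Spark's receiver write-ahead log is the documented option for that, e.g.:

import org.apache.spark.SparkConf

val conf = new SparkConf(true)
  .setAppName(appName)
  .set("spark.streaming.unpersist", "true")
  // Write received blocks to the checkpoint directory so they can be
  // recovered after a driver restart.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")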

Please help me in solving this issue.