Unsubscribe

2023-12-29 Thread Vinti Maheshwari



Re: Spark and KafkaUtils

2016-03-15 Thread Vinti Maheshwari
Hi Cody,

I wanted to share my updated build.sbt, which now works with Kafka without giving
any errors; it may help other users if they face a similar issue.

name := "NetworkStreaming"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0", // kafka
  "org.apache.spark" %% "spark-mllib" % "1.6.0",
  "org.codehaus.groovy" % "groovy-all" % "1.8.6",
  "org.apache.hbase" % "hbase-server" % "1.1.2",
  "org.apache.spark" %% "spark-sql" % "1.6.0",
  "org.apache.hbase" % "hbase-common" % "1.1.2" excludeAll(
    ExclusionRule(organization = "javax.servlet", name = "javax.servlet-api"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "jetty"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "servlet-api-2.5")),
  "org.apache.hbase" % "hbase-client" % "1.1.2" excludeAll(
    ExclusionRule(organization = "javax.servlet", name = "javax.servlet-api"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "jetty"),
    ExclusionRule(organization = "org.mortbay.jetty", name = "servlet-api-2.5"))
)


assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}
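
For completeness: the merge strategy above is for sbt-assembly, so the plugin also has to be
enabled in project/assembly.sbt. A minimal sketch (the plugin version shown is only an
assumption; use whichever release matches your sbt):

// project/assembly.sbt: enables the `sbt assembly` task used with the build above
// (version is illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")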

Thanks & Regards,

Vinti



On Wed, Feb 24, 2016 at 1:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Looks like conflicting versions of the same dependency.
> If you look at the mergeStrategy section of the build file I posted, you
> can add additional lines for whatever dependencies are causing issues, e.g.
>
>   case PathList("org", "jboss", "netty", _*) => MergeStrategy.first
>
> On Wed, Feb 24, 2016 at 2:55 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Thanks much Cody, I added assembly.sbt and modified build.sbt with ivy
>> bug related content.
>>
>> It's giving lots of errors related to ivy:
>>
>> *[error]
>> /Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class*
>>
>> Here is complete error log:
>> https://gist.github.com/Vibhuti/07c24d2893fa6e520d4c
>>
>>
>> Regards,
>> ~Vinti
>>
>> On Wed, Feb 24, 2016 at 12:16 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Ok, that build file I linked earlier has a minimal example of use.  just
>>> running 'sbt assembly' given a similar build file should build a jar with
>>> all the dependencies.
>>>
>>> On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>>> I am not using sbt assembly currently. I need to check how to use sbt
>>>> assembly.
>>>>
>>>> Regards,
>>>> ~Vinti
>>>>
>>>> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Are you using sbt assembly?  That's what will include all of the
>>>>> non-provided dependencies in a single jar along with your code.  Otherwise
>>>>> you'd have to specify each separate jar in your spark-submit line, which 
>>>>> is
>>>>> a pain.
>>>>>
>>>>> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari <
>>>>> vinti.u...@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> I tried with the build file you provided, but it's not working for
>>>>>> me, getting same error:
>>>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>>>>
>>>>>> I am not getting this error while building  (sbt package). I am
>>>>>> getting this error when i am running my spark-streaming program.
>>>>>> Do i need 

Re: [MARKETING] Spark Streaming stateful transformation mapWithState function getting error scala.MatchError: [Ljava.lang.Object]

2016-03-14 Thread Vinti Maheshwari
Hi Iain,


Thanks for your reply. I changed my trackStateFunc and it's working now.

For reference, here is my working code with mapWithState:


def trackStateFunc(batchTime: Time, key: String, value: Option[Array[Long]],
                   state: State[Array[Long]]): Option[(String, Array[Long])] = {
  // Check if state exists
  if (state.exists) {
    val newState: Array[Long] = Array(state.get, value.get).transpose.map(_.sum)
    state.update(newState)      // Set the new state
    Some((key, newState))
  } else {
    val initialState = value.get
    state.update(initialState)  // Set the initial state
    Some((key, initialState))
  }
}

// StateSpec[KeyType, ValueType, StateType, MappedType]
val stateSpec: StateSpec[String, Array[Long], Array[Long], (String, Array[Long])] =
  StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], (String, Array[Long])] =
  parsedStream.mapWithState(stateSpec)
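
As a quick illustration of the merge step above (the numbers are made up, and it assumes the
stored state array and the incoming value array have the same length):

// Element-wise sum via transpose, illustration only
val stored   = Array(1L, 2L, 3L)
val incoming = Array(10L, 20L, 30L)
val merged   = Array(stored, incoming).transpose.map(_.sum)  // Array(11L, 22L, 33L)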


Thanks & Regards,

Vinti


On Mon, Mar 14, 2016 at 7:06 AM, Iain Cundy <iain.cu...@amdocs.com> wrote:

> Hi Vinti
>
>
>
> I don’t program in scala, but I think you’ve changed the meaning of the
> current variable – look again at what is state and what is new data.
>
>
>
> Assuming it works like the Java API, to use this function to maintain
> State you must call State.update, while you can return anything, not just
> the state.
>
>
>
> Cheers
>
> Iain
>
>
>
> *From:* Vinti Maheshwari [mailto:vinti.u...@gmail.com]
> *Sent:* 12 March 2016 22:10
> *To:* user
> *Subject:* [MARKETING] Spark Streaming stateful transformation
> mapWithState function getting error scala.MatchError: [Ljava.lang.Object]
>
>
>
> Hi All,
>
> I wanted to replace my updateStateByKey function with mapWithState
> function (Spark 1.6) to improve performance of my program.
>
> I was following these two documents:
> https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html
>
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html
>
> but i am getting error *scala.MatchError: [Ljava.lang.Object]*
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 
> (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: 
> [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
>
> at 
> HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
>
> at 
> HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
>
> at scala.Option.flatMap(Option.scala:170)
>
> at 
> HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)
>
> Reference code:
>
> def trackStateFunc(key:String, value:Option[Array[Long]], 
> current:State[Array[Long]]) = {
>
>
>
> //either we can use this
>
> // current.update(value)
>
>
>
> value.map(_ :+ current).orElse(Some(current)).flatMap{
>
>   case x:Array[Long] => Try(x.map(BDV(_)).reduce(_ + 
> _).toArray).toOption
>
>   case None => ???
>
> }
>
>   }
>
>
>
>   val statespec:StateSpec[String, Array[Long], Array[Long], 
> Option[Array[Long]]] = StateSpec.function(trackStateFunc _)
>
>
>
>   val state: MapWithStateDStream[String, Array[Long], Array[Long], 
> Option[Array[Long]]] = parsedStream.mapWithState(statespec)
>
> My previous working code which was using updateStateByKey function:
>
> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>
> (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>
>  prev.map(_ +: current).orElse(Some(current))
>
>   .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>
>   })
>
> Anyone has idea what can be the issue?
>
> Thanks & Regards,
>
> Vinti
> This message and the information contained herein is proprietary and
> confidential and subject to the Amdocs policy statement, you may review at
> http://www.amdocs.com/email_disclaimer.asp
>


Spark Streaming stateful transformation mapWithState function getting error scala.MatchError: [Ljava.lang.Object]

2016-03-12 Thread Vinti Maheshwari
Hi All,

I wanted to replace my updateStateByKey function with the mapWithState function
(Spark 1.6) to improve the performance of my program.

I was following these two documents:
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html

but I am getting the error *scala.MatchError: [Ljava.lang.Object]*

org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task
0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net):
scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class
[Ljava.lang.Object;)
at 
HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
at 
HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
at scala.Option.flatMap(Option.scala:170)
at 
HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

def trackStateFunc(key: String, value: Option[Array[Long]],
                   current: State[Array[Long]]) = {

  // either we can use this
  // current.update(value)

  value.map(_ :+ current).orElse(Some(current)).flatMap {
    case x: Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
    case None => ???
  }
}

val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] =
  StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] =
  parsedStream.mapWithState(statespec)
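
(A note on the likely cause, which is my reading rather than anything confirmed beyond the
fix in the follow-up: `current` here is the State[Array[Long]] wrapper itself, so
value.map(_ :+ current) appends the wrapper to an Array[Long] and the array widens to
Array[Object]; that array then reaches `case x: Array[Long]`, neither case matches, and the
MatchError on [Ljava.lang.Object is thrown. A sketch of the same function with the state
unwrapped first, mirroring the working version posted in the follow-up:)

// Sketch only: unwrap the State before combining, and remember to call update()
def trackStateFunc(key: String, value: Option[Array[Long]],
                   current: State[Array[Long]]): Option[(String, Array[Long])] = {
  val merged: Option[Array[Long]] = (value, current.getOption()) match {
    case (Some(v), Some(s)) => Some(Array(v, s).transpose.map(_.sum)) // element-wise sum
    case (Some(v), None)    => Some(v)
    case (None, s)          => s
  }
  merged.foreach(current.update)  // persist the new state
  merged.map(m => (key, m))
}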

My previous working code, which used the updateStateByKey function:

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })

Does anyone have an idea what the issue could be?

Thanks & Regards,
Vinti


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
I have 2 machines in my cluster, each with 128 GB RAM and 8 cores.

Regards,
~Vinti

On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Thanks Supreeth and Shahbaz. I will try adding
> spark.streaming.kafka.maxRatePerPartition.
>
> Hi Shahbaz,
>
> Please see comments, inline:
>
>
>- Which version of Spark you are using. ==> *1.5.2*
>- How big is the Kafka Cluster ==> *2 brokers*
>- What is the Message Size and type.==>
> *String, 9,550 bytes (around) *
>- How big is the spark cluster (How many executors ,How many cores Per
>Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
>- What does your Spark Job looks like ==>
>
>
>val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
>  ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)
>
>
>  val parsedStream = inputStream
>.map(line => {
>  val splitLines = line.split(",")
>  (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
>})
>
>  val state: DStream[(String, Array[Long])] = 
> parsedStream.updateStateByKey(
>(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>  prev.map(_ +: current).orElse(Some(current))
>.flatMap(as => Try(as.map(BDV(_)).reduce(_ + 
> _).toArray).toOption)
>})
>  state.checkpoint(Duration(25000))
>  state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
>  ssc
>}
>
>
> spark.streaming.backpressure.enabled set it to true and try?
>  ==>
>
>
> *yes, i had enabled it.*
> Regards,
> ~Vinti
>
> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:
>
>> Hello,
>>
>>- Which version of Spark you are using.
>>- How big is the Kafka Cluster
>>- What is the Message Size and type.
>>- How big is the spark cluster (How many executors ,How many cores
>>Per Executor)
>>- What does your Spark Job looks like .
>>
>> spark.streaming.backpressure.enabled set it to true and try?
>>
>>
>> Regards,
>> Shahbaz
>> +91-9986850670
>>
>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth@gmail.com> wrote:
>>
>>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>>> control the number of messages read from Kafka per partition on the spark
>>> streaming consumer.
>>>
>>> -S
>>>
>>>
>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to figure out why my kafka+spark job is running slow. I
>>> found that spark is consuming all the messages out of kafka into a single
>>> batch itself and not sending any messages to the other batches.
>>>
>>> 2016/03/05 21:57:05
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243825000>
>>> 0 events - - queued 2016/03/05 21:57:00
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=145724382>
>>> 0 events - - queued 2016/03/05 21:56:55
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243815000>
>>> 0 events - - queued 2016/03/05 21:56:50
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=145724381>
>>> 0 events - - queued 2016/03/05 21:56:45
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243805000>
>>> 0 events - - queued 2016/03/05 21:56:40
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=145724380>
>>> 4039573 events 6 ms - processing
>>>
>>> Does anyone know how this behavior can be changed so that the number of
>>> messages are load balanced across all the batches?
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
Thanks Supreeth and Shahbaz. I will try adding
spark.streaming.kafka.maxRatePerPartition.
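
(For reference, a minimal sketch of where these settings would go, assuming the standard
Spark 1.5.x property names; the rate value is only illustrative:)

import org.apache.spark.SparkConf

// Cap the per-partition ingest rate of the direct Kafka stream and enable backpressure
val conf = new SparkConf()
  .setAppName("HBaseStream")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // max records/sec per partition
  .set("spark.streaming.backpressure.enabled", "true")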

Hi Shahbaz,

Please see comments, inline:


   - Which version of Spark you are using. ==> *1.5.2*
   - How big is the Kafka Cluster ==> *2 brokers*
   - What is the Message Size and type.==>
*String, 9,550 bytes (around) *
   - How big is the spark cluster (How many executors ,How many cores Per
   Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
   - What does your Spark Job looks like ==>


val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
val inputStream = messages.map(_._2)

val parsedStream = inputStream
  .map(line => {
    val splitLines = line.split(",")
    (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
  })

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
state.checkpoint(Duration(25000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
ssc


spark.streaming.backpressure.enabled set it to true and try? ==>
*Yes, I had already enabled it.*
Regards,
~Vinti

On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:

> Hello,
>
>- Which version of Spark you are using.
>- How big is the Kafka Cluster
>- What is the Message Size and type.
>- How big is the spark cluster (How many executors ,How many cores Per
>Executor)
>- What does your Spark Job looks like .
>
> spark.streaming.backpressure.enabled set it to true and try?
>
>
> Regards,
> Shahbaz
> +91-9986850670
>
> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth@gmail.com> wrote:
>
>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>> control the number of messages read from Kafka per partition on the spark
>> streaming consumer.
>>
>> -S
>>
>>
>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>> Hello,
>>
>> I am trying to figure out why my kafka+spark job is running slow. I found
>> that spark is consuming all the messages out of kafka into a single batch
>> itself and not sending any messages to the other batches.
>>
>> 2016/03/05 21:57:05
>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243825000>
>> 0 events - - queued 2016/03/05 21:57:00
>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=145724382>
>> 0 events - - queued 2016/03/05 21:56:55
>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243815000>
>> 0 events - - queued 2016/03/05 21:56:50
>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=145724381>
>> 0 events - - queued 2016/03/05 21:56:45
>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243805000>
>> 0 events - - queued 2016/03/05 21:56:40
>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=145724380>
>> 4039573 events 6 ms - processing
>>
>> Does anyone know how this behavior can be changed so that the number of
>> messages are load balanced across all the batches?
>>
>> Thanks,
>> Vinti
>>
>>
>


Spark + Kafka all messages being used in 1 batch

2016-03-05 Thread Vinti Maheshwari
Hello,

I am trying to figure out why my Kafka + Spark job is running slowly. I found
that Spark is consuming all the messages from Kafka into a single batch
and not distributing any messages to the other batches.

2016/03/05 21:57:05   0 events        -     -   queued
2016/03/05 21:57:00   0 events        -     -   queued
2016/03/05 21:56:55   0 events        -     -   queued
2016/03/05 21:56:50   0 events        -     -   queued
2016/03/05 21:56:45   0 events        -     -   queued
2016/03/05 21:56:40   4039573 events  6 ms  -   processing

Does anyone know how this behavior can be changed so that the messages are
load balanced across all the batches?

Thanks,
Vinti


Re: spark streaming

2016-03-02 Thread Vinti Maheshwari
Thanks Shixiong. Sure, please find the details below:

Spark version: 1.5.2
I am doing data aggregation using checkpointing; I am not sure if this is
causing the issue.
Also, I am using a Perl Kafka producer to push data to Kafka, and my Spark
program then reads it from Kafka. I am not sure if I need to use the
createStream function instead of createDirectStream?

My program:

def main(args: Array[String]): Unit = {

  val checkpointDirectory = "hdfs://:8020/user/spark/" + args(2)

  // Function to create and set up a new StreamingContext
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(1))

    val brokers = args(0)
    val topics = args(1)
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // parse the lines of data into coverage objects
    val inputStream = messages.map(_._2)
    ssc.checkpoint(checkpointDirectory)
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })

    state.checkpoint(Duration(1))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc
  }

  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  context.start()
  context.awaitTermination()
}

Thanks & Regards,

Vinti




On Wed, Mar 2, 2016 at 10:28 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey,
>
> KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
> store blocks to BlockManager. However, the error is not related
> to StorageLevel. It may be a bug. Could you provide more info about it?
> E.g., Spark version, your codes, logs.
>
> On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
>> program as currently i am getting
>> MetadataFetchFailedException*. *I am not sure where i should pass
>> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream
>> doesn't allow to pass that parameter.
>>
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>> StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>>
>> Full Error:
>>
>> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0*
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> at
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>> at
>> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>> at
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.comput

Re: Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Vinti Maheshwari
Increasing SPARK_EXECUTOR_INSTANCES to 4 worked.
SPARK_EXECUTOR_INSTANCES="4" #Number of workers to start (Default: 2)
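
(For reference, the same thing can be requested without editing spark-env.sh; a sketch,
assuming YARN and the standard property name: either `spark-submit --num-executors 4 ...`
or programmatically:)

import org.apache.spark.SparkConf

// Ask for 4 executors from the application itself (equivalent to SPARK_EXECUTOR_INSTANCES=4)
val conf = new SparkConf().setAppName("HBaseStream").set("spark.executor.instances", "4")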

Regards,
Vinti




On Wed, Mar 2, 2016 at 4:28 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Thanks much Saisai. Got it.
> So i think increasing worker executor memory might work. Trying that.
>
> Regards,
> ~Vinti
>
> On Wed, Mar 2, 2016 at 4:21 AM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> You don't have to specify the storage level for direct Kafka API, since
>> it doesn't require to store the input data ahead of time. Only
>> receiver-based approach could specify the storage level.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Mar 2, 2016 at 7:08 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my
>>> spark-streaming program as currently i am getting
>>> MetadataFetchFailedException*. *I am not sure where i should pass
>>> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream
>>> doesn't allow to pass that parameter.
>>>
>>>
>>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>>> StringDecoder](
>>>   ssc, kafkaParams, topicsSet)
>>>
>>>
>>> Full Error:
>>>
>>> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
>>> output location for shuffle 0*
>>> at
>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>>> at
>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>>> at
>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at
>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> at
>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>> at
>>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>>> at
>>> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>>> at
>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> )
>>>
>>> Thanks,
>>> ~Vinti
>>>
>>>
>>
>


Re: Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Vinti Maheshwari
Thanks much Saisai. Got it.
So I think increasing the worker executor memory might work. Trying that.
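
(For anyone searching later: as Saisai explains below, only the receiver-based API takes a
storage level. A sketch with placeholder ZooKeeper and consumer-group values, assuming the
Spark 1.x spark-streaming-kafka API:)

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based stream: this overload accepts an explicit StorageLevel
val receiverStream = KafkaUtils.createStream(
  ssc,
  "zk-host:2181",        // ZooKeeper quorum (placeholder)
  "my-consumer-group",   // consumer group id (placeholder)
  Map("topic1" -> 1),    // topic -> number of receiver threads
  StorageLevel.MEMORY_AND_DISK_SER)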

Regards,
~Vinti

On Wed, Mar 2, 2016 at 4:21 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> You don't have to specify the storage level for direct Kafka API, since it
> doesn't require to store the input data ahead of time. Only receiver-based
> approach could specify the storage level.
>
> Thanks
> Saisai
>
> On Wed, Mar 2, 2016 at 7:08 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
>> program as currently i am getting
>> MetadataFetchFailedException*. *I am not sure where i should pass
>> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream
>> doesn't allow to pass that parameter.
>>
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>> StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>>
>> Full Error:
>>
>> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0*
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> at
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>> at
>> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>> at
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> )
>>
>> Thanks,
>> ~Vinti
>>
>>
>


Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Vinti Maheshwari
Hi All,

I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
program, as I am currently getting a *MetadataFetchFailedException*. I am not
sure where I should pass StorageLevel.MEMORY_AND_DISK, as it seems like
createDirectStream doesn't allow passing that parameter.


val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)


Full Error:

*org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0*
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at
org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

)

Thanks,
~Vinti


spark streaming

2016-03-02 Thread Vinti Maheshwari
Hi All,

I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
program, as I am currently getting a *MetadataFetchFailedException*. I am not
sure where I should pass StorageLevel.MEMORY_AND_DISK, as it seems like
createDirectStream doesn't allow passing that parameter.


val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)


Full Error:

*org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0*
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at
org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

)

Thanks,
~Vinti


Re: perl Kafka::Producer, “Kafka::Exception::Producer”, “code”, -1000, “message”, "Invalid argument

2016-02-29 Thread Vinti Maheshwari
Hi Cody,

Sorry, I realized afterwards that I should not have asked here. My actual program is
Spark Streaming, and I use Kafka for input streaming.

Thanks,
Vinti

On Mon, Feb 29, 2016 at 1:46 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Does this issue involve Spark at all?  Otherwise you may have better luck
> on a perl or kafka related list.
>
> On Mon, Feb 29, 2016 at 3:26 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I wrote kafka producer using kafka perl api, But i am getting error when
>> i am passing variable for sending message while if i am hard coding the
>> message data it's not giving any error.
>>
>> Perl program, where i added kafka producer code:
>>
>> try {
>> $kafka_connection = Kafka::Connection->new( host => 
>> $hadoop_server, port => '6667' );
>> $producer = Kafka::Producer->new( Connection => 
>> $kafka_connection );
>> my $topic = 'test1';
>> my $partition = 0;
>> my $message = $hadoop_str;
>> my $response = $producer->send(
>> $topic, # topic
>> $partition,  # partition
>> 
>> #"56b4b2b23c24c3608376d1ea,/obj/i386/ui/lib/access/daemon_map.So.gcda,1,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
>>  # message
>> $hadoop_str
>> #"t1,f9,1,1,1"
>> );
>> } catch {
>> my $error = $_;
>> if ( blessed( $error ) && $error->isa( 
>> 'Kafka::Exception' ) ) {
>> warn 'Error: (', $error->code, ') ',  
>> $error->message, "\n";
>> exit;
>> } else {
>> die $error;
>> }
>> };#CCLib::run_system_cmd( $cmd );
>> }
>>
>> Error Log: -bash-3.2$ ./stream_binary_hadoop.pl print (...) interpreted
>> as function at ./stream_binary_hadoop.pl line 429. Invalid argument:
>> message =
>> 56b4b2b23c24c3608376d1ea,/obj/i386/junos/usr.sbin/lmpd/lmpd_repl_msg_idr.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
>> at
>> /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Exception/Class/Base.pm
>> line 85. Exception::Class::Base::throw("Kafka::Exception::Producer",
>> "code", -1000, "message", "Invalid argument: message =
>> 56b4b2b23c24c3608376d1ea,/obj/i38"...) called at
>> /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Kafka/Producer.pm
>> line 374 Kafka::Producer::_error(Kafka::Producer=HASH(0x36955f8), -1000,
>> "message = 56b4b2b23c24c3608376d1ea,/obj/i386/junos/usr.sbin/l"...) called
>> at
>> /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Kafka/Producer.pm
>> line 331 Kafka::Producer::send(Kafka::Producer=HASH(0x36955f8), "test1", 0,
>> "56b4b2b23c24c3608376d1ea,/obj/i386/junos/usr.sbin/lmpd/lmpd_r"...) called
>> at ./stream_binary_hadoop.pl line 175 main::try {...} () called at
>> /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Try/Tiny.pm
>> line 81 eval {...} called at
>> /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Try/Tiny.pm
>> line 72 Try::Tiny::try(CODE(0x3692888), Try::Tiny::Catch=REF(0x3692c78))
>> called at ./stream_binary_hadoop.pl line 190
>> main::stream(HASH(0x3692708)) called at ./stream_binary_hadoop.pl line
>> 354 main::file_split(HASH(0x36927b0)) called at ./stream_binary_hadoop.pl
>> line 413
>>
>> at ./stream_binary_hadoop.pl line 188. main::catch {...} (" Invalid
>> argument: message = 56b4b2b23c24c3608376d1e"...) called at
>> /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Try/Tiny.pm
>> line 104 Try::Tiny::try(CODE(0x3692888), Try::Tiny::Catch=REF(0x3692c78))
>> called at ./stream_binary_hadoop.pl line 190
>> main::stream(HASH(0x3692708)) called at ./stream_binary_hadoop.pl line
>> 354 main::file_split(HASH(0x36927b0)) called at ./stream_binary_hadoop.pl
>> line 413
>>
>>
>>
>> Thank & Regards,
>>
>> ~Vinti
>>
>
>


Re: Spark streaming not remembering previous state

2016-02-27 Thread Vinti Maheshwari
Thanks much Amit, Sebastian. It worked.
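
(For reference, the fix was to create the context with StreamingContext.getOrCreate, as Amit
suggested, so it is rebuilt from the checkpoint directory on restart. A minimal sketch; the
checkpoint path is illustrative:)

// Recreate the StreamingContext from checkpoints if they exist, otherwise build a new one
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("HBaseStream"), Seconds(5))
  ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints_dir") // illustrative path
  // ... set up the DStream graph here ...
  ssc
}
val context = StreamingContext.getOrCreate(
  "hdfs://namenode:8020/user/spark/checkpoints_dir", functionToCreateContext _)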

Regards,
~Vinti

On Sat, Feb 27, 2016 at 12:44 PM, Amit Assudani <aassud...@impetus.com>
wrote:

> Your context is not being created using checkpoints, use get or create,
>
> From: Vinti Maheshwari <vinti.u...@gmail.com>
> Date: Saturday, February 27, 2016 at 3:28 PM
> To: user <user@spark.apache.org>
> Subject: Spark streaming not remembering previous state
>
> Hi All,
>
> I wrote spark streaming program with stateful transformation.
> It seems like my spark streaming application is doing computation
> correctly with check pointing.
> But i terminate my program and i start it again, it's not reading the
> previous checkpointing data and staring from the beginning. Is it the
> expected behaviour?
>
> Do i need to change anything in my program so that it will remember the
> previous data and start computation from there?
>
> Thanks in advance.
>
> For reference my program:
>
>
>   def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("HBaseStream")
> val sc = new SparkContext(conf)
> val ssc = new StreamingContext(sc, Seconds(5))
> val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", )
> 
> ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
> inputStream.print(1)
> val parsedStream = inputStream
>   .map(line => {
> val splitLines = line.split(",")
> (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
>   })
> import breeze.linalg.{DenseVector => BDV}
> import scala.util.Try
>
> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
> prev.map(_ +: current).orElse(Some(current))
>   .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>   })
> state.checkpoint(Duration(1))
> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
> // Start the computation
> ssc.start()
> // Wait for the computation to terminate
> ssc.awaitTermination()
>
>   }
> }
>
>
> Regards,
>
> ~Vinti
>
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Error msg is:

*[error] deduplicate: different file contents found in the following:*
[error]
/Users/vintim/.ivy2/cache/org.jruby/jruby-complete/jars/jruby-complete-1.6.5.jar:org/joda/time/tz/data/Europe/Bucharest
[error]
/Users/vintim/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.6.jar:org/joda/time/tz/data/Europe/Bucharest

I tried adding the block below, suggested on Stack Overflow, but still no luck.

http://stackoverflow.com/questions/20393283/deduplication-error-with-sbt-assembly-plugin?rq=1

excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter { x => x.data.getName.matches("sbt.*") || x.data.getName.matches(".*macros.*") }
}
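
(What eventually worked for me is the assemblyMergeStrategy posted in the 2016-03-15
follow-up at the top of this thread; for this particular joda-time tz-data clash, a targeted
rule could look like the sketch below, following the usual sbt-assembly override pattern:)

assemblyMergeStrategy in assembly := {
  // keep the first copy of the duplicated joda-time tz data
  case PathList("org", "joda", "time", "tz", "data", _*) => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}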

Thanks,
~Vinti

On Wed, Feb 24, 2016 at 12:55 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Thanks much Cody, I added assembly.sbt and modified build.sbt with ivy bug
> related content.
>
> It's giving lots of errors related to ivy:
>
> *[error]
> /Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class*
>
> Here is complete error log:
> https://gist.github.com/Vibhuti/07c24d2893fa6e520d4c
>
>
> Regards,
> ~Vinti
>
> On Wed, Feb 24, 2016 at 12:16 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Ok, that build file I linked earlier has a minimal example of use.  just
>> running 'sbt assembly' given a similar build file should build a jar with
>> all the dependencies.
>>
>> On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> I am not using sbt assembly currently. I need to check how to use sbt
>>> assembly.
>>>
>>> Regards,
>>> ~Vinti
>>>
>>> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Are you using sbt assembly?  That's what will include all of the
>>>> non-provided dependencies in a single jar along with your code.  Otherwise
>>>> you'd have to specify each separate jar in your spark-submit line, which is
>>>> a pain.
>>>>
>>>> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari <
>>>> vinti.u...@gmail.com> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> I tried with the build file you provided, but it's not working for me,
>>>>> getting same error:
>>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>>>
>>>>> I am not getting this error while building  (sbt package). I am
>>>>> getting this error when i am running my spark-streaming program.
>>>>> Do i need to specify kafka jar path manually with spark-submit --jars
>>>>> flag?
>>>>>
>>>>> My build.sbt:
>>>>>
>>>>> name := "NetworkStreaming"
>>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>>>>
>>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>>>>
>>>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>>>>
>>>>> libraryDependencies ++= Seq(
>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
>>>>> )
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>> ~Vinti
>>>>>
>>>>> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> spark streaming is provided, kafka is not.
>>>>>>
>>>>>> This build file
>>>>>>
>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>>>>>
>>>>>> includes some hacks for ivy issues that may no longer be strictly
>>>>>> necessary, but try that build and see if it works for you.
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <
>>>>>> vinti.u...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have tried multiple different settings in build.sbt but seems like
>>>>>>> nothing is working.
>>>>>>> Can anyone suggest the right syntax/way to include kafka with spark?
>>>>>>>
>>>>>>> Error
>>>>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>>>>>
>>>>>>> build.sbt
>>>>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>>>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>>>>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
>>>>>>> "1.0.0"
>>>>>>> libraryDependencies ++= Seq(
>>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>>>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>>>>>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>>>>>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" %
>>>>>>> "provided"
>>>>>>> )
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Vinti
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Thanks much Cody. I added assembly.sbt and modified build.sbt with the
ivy-bug-related content.

It's giving lots of errors related to ivy:

*[error]
/Users/vintim/.ivy2/cache/javax.activation/activation/jars/activation-1.1.jar:javax/activation/ActivationDataFlavor.class*

Here is complete error log:
https://gist.github.com/Vibhuti/07c24d2893fa6e520d4c


Regards,
~Vinti

On Wed, Feb 24, 2016 at 12:16 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Ok, that build file I linked earlier has a minimal example of use.  just
> running 'sbt assembly' given a similar build file should build a jar with
> all the dependencies.
>
> On Wed, Feb 24, 2016 at 1:50 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> I am not using sbt assembly currently. I need to check how to use sbt
>> assembly.
>>
>> Regards,
>> ~Vinti
>>
>> On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Are you using sbt assembly?  That's what will include all of the
>>> non-provided dependencies in a single jar along with your code.  Otherwise
>>> you'd have to specify each separate jar in your spark-submit line, which is
>>> a pain.
>>>
>>> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari <vinti.u...@gmail.com
>>> > wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> I tried with the build file you provided, but it's not working for me,
>>>> getting same error:
>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>>
>>>> I am not getting this error while building  (sbt package). I am getting
>>>> this error when i am running my spark-streaming program.
>>>> Do i need to specify kafka jar path manually with spark-submit --jars
>>>> flag?
>>>>
>>>> My build.sbt:
>>>>
>>>> name := "NetworkStreaming"
>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>>>
>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>>>
>>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>>>
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
>>>> )
>>>>
>>>>
>>>>
>>>> Regards,
>>>> ~Vinti
>>>>
>>>> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> spark streaming is provided, kafka is not.
>>>>>
>>>>> This build file
>>>>>
>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>>>>
>>>>> includes some hacks for ivy issues that may no longer be strictly
>>>>> necessary, but try that build and see if it works for you.
>>>>>
>>>>>
>>>>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <
>>>>> vinti.u...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have tried multiple different settings in build.sbt but seems like
>>>>>> nothing is working.
>>>>>> Can anyone suggest the right syntax/way to include kafka with spark?
>>>>>>
>>>>>> Error
>>>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>>>>
>>>>>> build.sbt
>>>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>>>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
>>>>>> "1.0.0"
>>>>>> libraryDependencies ++= Seq(
>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>>>>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>>>>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
>>>>>> )
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Vinti
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
I am not using sbt assembly currently. I need to check how to use sbt
assembly.

Regards,
~Vinti

On Wed, Feb 24, 2016 at 11:10 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Are you using sbt assembly?  That's what will include all of the
> non-provided dependencies in a single jar along with your code.  Otherwise
> you'd have to specify each separate jar in your spark-submit line, which is
> a pain.
>
> On Wed, Feb 24, 2016 at 12:49 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hi Cody,
>>
>> I tried with the build file you provided, but it's not working for me,
>> getting same error:
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> I am not getting this error while building  (sbt package). I am getting
>> this error when i am running my spark-streaming program.
>> Do i need to specify kafka jar path manually with spark-submit --jars
>> flag?
>>
>> My build.sbt:
>>
>> name := "NetworkStreaming"
>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>
>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
>> )
>>
>>
>>
>> Regards,
>> ~Vinti
>>
>> On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> spark streaming is provided, kafka is not.
>>>
>>> This build file
>>>
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>>>
>>> includes some hacks for ivy issues that may no longer be strictly
>>> necessary, but try that build and see if it works for you.
>>>
>>>
>>> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <vinti.u...@gmail.com
>>> > wrote:
>>>
>>>> Hello,
>>>>
>>>> I have tried multiple different settings in build.sbt but seems like
>>>> nothing is working.
>>>> Can anyone suggest the right syntax/way to include kafka with spark?
>>>>
>>>> Error
>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>>>
>>>> build.sbt
>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>>>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
>>>> )
>>>>
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>>
>>>
>>
>


Re: Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Hi Cody,

I tried the build file you provided, but it's not working for me; I am getting
the same error:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

I am not getting this error while building (sbt package); I am getting it when
I run my spark-streaming program.
Do I need to specify the Kafka jar path manually with the spark-submit --jars flag?

My build.sbt:

name := "NetworkStreaming"
libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2"
)



Regards,
~Vinti

On Wed, Feb 24, 2016 at 9:33 AM, Cody Koeninger <c...@koeninger.org> wrote:

> spark streaming is provided, kafka is not.
>
> This build file
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/build.sbt
>
> includes some hacks for ivy issues that may no longer be strictly
> necessary, but try that build and see if it works for you.
>
>
> On Wed, Feb 24, 2016 at 11:14 AM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have tried multiple different settings in build.sbt but seems like
>> nothing is working.
>> Can anyone suggest the right syntax/way to include kafka with spark?
>>
>> Error
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> build.sbt
>> libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
>> libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
>>   "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
>> )
>>
>>
>> Thanks,
>> Vinti
>>
>>
>


Spark and KafkaUtils

2016-02-24 Thread Vinti Maheshwari
Hello,

I have tried multiple different settings in build.sbt, but it seems like
nothing is working.
Can anyone suggest the right syntax/way to include Kafka with Spark?

Error
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

build.sbt
libraryDependencies += "org.apache.hbase" % "hbase" % "0.92.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.2"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2" % "provided"
)


Thanks,
Vinti


Network Spark Streaming from multiple remote hosts

2016-02-23 Thread Vinti Maheshwari
Hi All,

I wrote a Spark Streaming program in Scala. In my program, I passed
'remote-host' and 'remote-port' to socketTextStream.

On the remote machine, I have a Perl script that calls the system command:

echo 'data_str' | nc  <>

This way my Spark program is able to get data, but it seems a little confusing,
as I have multiple remote machines that need to send data to the Spark machine.
I wanted to know the right way of doing this. In fact, how will I deal with data
coming from multiple hosts? (One possible approach is sketched after the code below.)

For reference, my current program:

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(2))

val inputStream = ssc.socketTextStream(, )
---
---

ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()

  }}
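
(One possible approach, sketched below with placeholder hosts and ports and assuming each
remote machine serves its data on a TCP port, e.g. via nc -lk: open one socketTextStream per
remote machine and union them into a single DStream.)

// One receiver per remote host, merged with union (hosts/ports are placeholders)
val hosts = Seq(("host-a", 9999), ("host-b", 9999), ("host-c", 9999))
val perHostStreams = hosts.map { case (host, port) => ssc.socketTextStream(host, port) }
val inputStream = ssc.union(perHostStreams)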

Thanks in advance.

Regards,
~Vinti


Re: Spark Streaming not reading input coming from the other ip

2016-02-22 Thread Vinti Maheshwari
Thanks Shixiong, I am not getting any error and telnet is also working fine.

$ telnet  
Trying 192.168.186.97...
Connected to ttsv-


On Mon, Feb 22, 2016 at 1:10 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> What's the error info reported by Streaming? And could you use "telnet" to
> test if the network is normal?
>
> On Mon, Feb 22, 2016 at 6:59 AM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> For reference, my program:
>>
>> def main(args: Array[String]): Unit = {
>> val conf = new SparkConf().setAppName("HBaseStream")
>> val sc = new SparkContext(conf)
>> // create a StreamingContext, the main entry point for all streaming 
>> functionality
>> val ssc = new StreamingContext(sc, Seconds(2))
>> val inputStream = ssc.socketTextStream(, )
>> ...
>> }
>>
>> Data coming form different host to this socket, But somehow spark is not
>> reading it, while if i copy and paste same data, it works.
>>
>>  [root@ ~]# nc -lk 
>> 56b4b2b23c24c3608376d1f0,/obj/i386/junos/lib/librtsock/rtslib_gencfg.So.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
>> 56b4b2b23c24c3608376d1f0,/obj/i386/junos/lib/librtsock/rtslib_idl.So.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
>>
>> On Mon, Feb 22, 2016 at 6:38 AM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I am in spark Streaming context, and i am reading input from the the
>>> socket using nc -lk . When i am running it and manually giving input
>>> it's working. But, if input is coming from different ip to this socket then
>>> spark is not reading that input, though it's showing all the input coming
>>> from different source under nc -lk . But somehow spark is not reading
>>> it.
>>>
>>> I am not sure what can be issue. Does anyone has idea about it? Thanks
>>> in advance.
>>>
>>>
>>> Thanks
>>>
>>> ~Vinti
>>>
>>
>>
>


Re: Spark Streaming not reading input coming from the other ip

2016-02-22 Thread Vinti Maheshwari
For reference, my program:

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
val inputStream = ssc.socketTextStream(<host>, <port>)
...
}

Data is coming from a different host to this socket, but somehow Spark is not
reading it, while if I copy and paste the same data, it works.

 [root@<host> ~]# nc -lk <port>
56b4b2b23c24c3608376d1f0,/obj/i386/junos/lib/librtsock/rtslib_gencfg.So.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
56b4b2b23c24c3608376d1f0,/obj/i386/junos/lib/librtsock/rtslib_idl.So.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0

On Mon, Feb 22, 2016 at 6:38 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Hi
>
> I am in spark Streaming context, and i am reading input from the the
> socket using nc -lk . When i am running it and manually giving input
> it's working. But, if input is coming from different ip to this socket then
> spark is not reading that input, though it's showing all the input coming
> from different source under nc -lk . But somehow spark is not reading
> it.
>
> I am not sure what can be issue. Does anyone has idea about it? Thanks in
> advance.
>
>
> Thanks
>
> ~Vinti
>


Spark Streaming not reading input coming from the other ip

2016-02-22 Thread Vinti Maheshwari
Hi

I am in a Spark Streaming context, and I am reading input from the socket using
nc -lk <port>. When I run it and give input manually, it works. But if the input comes
to this socket from a different IP, Spark does not read it, even though all the input
coming from the other source shows up under nc -lk <port>. Somehow Spark is just not
reading it.

I am not sure what the issue can be. Does anyone have an idea about it? Thanks in
advance.


Thanks

~Vinti


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
>
> Yeah, i tried with reduceByKey, was able to do it. Thanks Ayan and Jatin.
> For reference, final solution:
>
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("HBaseStream")
> val sc = new SparkContext(conf)
> // create a StreamingContext, the main entry point for all streaming 
> functionality
> val ssc = new StreamingContext(sc, Seconds(2))
> val inputStream = ssc.socketTextStream("hostname", )
> val parsedDstream = inputStream
>   .map(line => {
> val splitLines = line.split(",")
> (splitLines(1), splitLines.slice(2, 
> splitLines.length).map(_.trim.toInt))
>   })
>   .reduceByKey((first, second) => {
> val listOfArrays = ArrayBuffer(first, second)
> listOfArrays.toList.transpose.map(_.sum).toArray
>   })
>   .foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
> }
>
>
> Regards,
> Vinti
>
> On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> I believe the best way would be to use reduceByKey operation.
>>
>> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> You will need to do a collect and update a global map if you want to.
>>>
>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>> r2._3))
>>>  .foreachRDD(rdd => {
>>>rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>>>  })
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com
>>> > wrote:
>>>
>>>> Nevermind, seems like an executor level mutable map is not recommended
>>>> as stated in
>>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>>
>>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com
>>>> > wrote:
>>>>
>>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>>> suggested:
>>>>>
>>>>> def parseCoverageLine(str: String) = {
>>>>>   val arr = str.split(",")
>>>>>   ...
>>>>>   ...
>>>>>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>>> }
>>>>>
>>>>> Then in the grouping, can i use a global hash map per executor /
>>>>> partition to aggregate the results?
>>>>>
>>>>> val globalMap:[String: List[Int]] = Map()
>>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>> // if exists in global map, append result else add new key
>>>>>
>>>>> // globalMap
>>>>> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>>> })
>>>>>
>>>>> Thanks,
>>>>> Vinti
>>>>>
>>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com
>>>>> > wrote:
>>>>>
>>>>>> Hello Vinti,
>>>>>>
>>>>>> One way to get this done is you split your input line into key and
>>>>>> value tuple and then you can simply use groupByKey and handle the values
>>>>>> the way you want. For example:
>>>>>>
>>>>>> Assuming you have already split the values into a 5 tuple:
>>>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3
>>>>>> + r2._3))
>>>>>>
>>>>>> I hope that helps.
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Jatin Kumar | Rocket Scientist
>>>>>> +91-7696741743 m
>>>>>>
>>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>>> vinti.u...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have input lines like below
>>>>>>>
>>>>>>> *Input*
>>>>>>> t1, file1, 1, 1, 1
>>>>>>> t1, file1, 1, 2, 3
>>>>>>> t1, file2, 2, 2, 2, 2
>>>>>>> t2, file1, 5, 5, 5
>>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>>
>>>>>>> and i want to achieve the output like below rows which is a vertical
>>>>>>> addition of the corresponding numbers.
>>>>>>>
>>>>>>> *Output*
>>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>>
>>>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>>>> to figure out the way to group by file name.
>>>>>>>
>>>>>>> It seems like i will need to use something like below, i am not sure
>>>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>>>
>>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>>
>>>>>>> I know how to do the vertical sum of array of given numbers, but i
>>>>>>> am not sure how to feed that function to the group by.
>>>>>>>
>>>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>>>   counts.toList.transpose.map(_.sum)
>>>>>>>   }
>>>>>>>
>>>>>>> ~Thanks,
>>>>>>> Vinti
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Yeah, I tried with reduceByKey and was able to do it. Thanks Ayan and Jatin.
For reference, the final solution:

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
val inputStream = ssc.socketTextStream("hostname", <port>)
val parsedDstream = inputStream
  .map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2,
splitLines.length).map(_.trim.toInt))
  })
  .reduceByKey((first, second) => {
val listOfArrays = ArrayBuffer(first, second)
listOfArrays.toList.transpose.map(_.sum).toArray
  })
  .foreachRDD(rdd => rdd.foreach(Blaher.blah))

}
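
Side note: for the two-argument case inside reduceByKey, the element-wise sum can also
be written with zip, which avoids the intermediate ArrayBuffer (an equivalent sketch,
assuming equal-length count arrays):

// element-wise sum of two equal-length count arrays
def addCounts(first: Array[Int], second: Array[Int]): Array[Int] =
  first.zip(second).map { case (a, b) => a + b }

// then the stream step becomes: .reduceByKey(addCounts)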


Regards,
Vinti

On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <guha.a...@gmail.com> wrote:

> I believe the best way would be to use reduceByKey operation.
>
> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> You will need to do a collect and update a global map if you want to.
>>
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>> r2._3))
>>  .foreachRDD(rdd => {
>>rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>>  })
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Nevermind, seems like an executor level mutable map is not recommended
>>> as stated in
>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>
>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>> suggested:
>>>>
>>>> def parseCoverageLine(str: String) = {
>>>>   val arr = str.split(",")
>>>>   ...
>>>>   ...
>>>>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>> }
>>>>
>>>> Then in the grouping, can i use a global hash map per executor /
>>>> partition to aggregate the results?
>>>>
>>>> val globalMap:[String: List[Int]] = Map()
>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>> // if exists in global map, append result else add new key
>>>>
>>>> // globalMap
>>>> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>> })
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>>>> wrote:
>>>>
>>>>> Hello Vinti,
>>>>>
>>>>> One way to get this done is you split your input line into key and
>>>>> value tuple and then you can simply use groupByKey and handle the values
>>>>> the way you want. For example:
>>>>>
>>>>> Assuming you have already split the values into a 5 tuple:
>>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3
>>>>> + r2._3))
>>>>>
>>>>> I hope that helps.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Jatin Kumar | Rocket Scientist
>>>>> +91-7696741743 m
>>>>>
>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>> vinti.u...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have input lines like below
>>>>>>
>>>>>> *Input*
>>>>>> t1, file1, 1, 1, 1
>>>>>> t1, file1, 1, 2, 3
>>>>>> t1, file2, 2, 2, 2, 2
>>>>>> t2, file1, 5, 5, 5
>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>
>>>>>> and i want to achieve the output like below rows which is a vertical
>>>>>> addition of the corresponding numbers.
>>>>>>
>>>>>> *Output*
>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>
>>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>>> to figure out the way to group by file name.
>>>>>>
>>>>>> It seems like i will need to use something like below, i am not sure
>>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>>
>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>
>>>>>> I know how to do the vertical sum of array of given numbers, but i am
>>>>>> not sure how to feed that function to the group by.
>>>>>>
>>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>>   counts.toList.transpose.map(_.sum)
>>>>>>   }
>>>>>>
>>>>>> ~Thanks,
>>>>>> Vinti
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Never mind; it seems an executor-level mutable map is not recommended, as stated in
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/

On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Thanks for your reply Jatin. I changed my parsing logic to what you
> suggested:
>
> def parseCoverageLine(str: String) = {
>   val arr = str.split(",")
>   ...
>   ...
>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
> }
>
> Then in the grouping, can i use a global hash map per executor / partition
> to aggregate the results?
>
> val globalMap:[String: List[Int]] = Map()
> val coverageDStream = inputStream.map(parseCoverageLine)
> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
> // if exists in global map, append result else add new key
>
> // globalMap
> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
> })
>
> Thanks,
> Vinti
>
> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Hello Vinti,
>>
>> One way to get this done is you split your input line into key and value
>> tuple and then you can simply use groupByKey and handle the values the way
>> you want. For example:
>>
>> Assuming you have already split the values into a 5 tuple:
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>> r2._3))
>>
>> I hope that helps.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have input lines like below
>>>
>>> *Input*
>>> t1, file1, 1, 1, 1
>>> t1, file1, 1, 2, 3
>>> t1, file2, 2, 2, 2, 2
>>> t2, file1, 5, 5, 5
>>> t2, file2, 1, 1, 2, 2
>>>
>>> and i want to achieve the output like below rows which is a vertical
>>> addition of the corresponding numbers.
>>>
>>> *Output*
>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>
>>> I am in a spark streaming context and i am having a hard time trying to
>>> figure out the way to group by file name.
>>>
>>> It seems like i will need to use something like below, i am not sure how
>>> to get to the correct syntax. Any inputs will be helpful.
>>>
>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>
>>> I know how to do the vertical sum of array of given numbers, but i am
>>> not sure how to feed that function to the group by.
>>>
>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>   counts.toList.transpose.map(_.sum)
>>>   }
>>>
>>> ~Thanks,
>>> Vinti
>>>
>>
>>
>


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Thanks for your reply Jatin. I changed my parsing logic to what you
suggested:

def parseCoverageLine(str: String) = {
  val arr = str.split(",")
  ...
  ...
  (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
}

Then, in the grouping, can I use a global hash map per executor / partition
to aggregate the results?

val globalMap = scala.collection.mutable.Map[String, List[Int]]()
val coverageDStream = inputStream.map(parseCoverageLine)
coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
// if exists in global map, append result else add new key

// globalMap
// { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
})
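
Another option instead of a global mutable map, if the running totals need to survive
across batches, would be updateStateByKey (just a sketch; `parsedCounts` is a made-up
name for a DStream[(String, List[Int])] of (fileName, counts) pairs, and the checkpoint
path is an assumption):

// stateful operations require a checkpoint directory
ssc.checkpoint("/tmp/spark-checkpoint")

val runningTotals = parsedCounts.updateStateByKey[List[Int]] {
  (batchValues: Seq[List[Int]], state: Option[List[Int]]) =>
    // previous running total plus this batch's lists (assumes equal-length lists)
    val all = state.toList ++ batchValues
    if (all.isEmpty) None else Some(all.transpose.map(_.sum))
}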

Thanks,
Vinti

On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
wrote:

> Hello Vinti,
>
> One way to get this done is you split your input line into key and value
> tuple and then you can simply use groupByKey and handle the values the way
> you want. For example:
>
> Assuming you have already split the values into a 5 tuple:
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>
> I hope that helps.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have input lines like below
>>
>> *Input*
>> t1, file1, 1, 1, 1
>> t1, file1, 1, 2, 3
>> t1, file2, 2, 2, 2, 2
>> t2, file1, 5, 5, 5
>> t2, file2, 1, 1, 2, 2
>>
>> and i want to achieve the output like below rows which is a vertical
>> addition of the corresponding numbers.
>>
>> *Output*
>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>
>> I am in a spark streaming context and i am having a hard time trying to
>> figure out the way to group by file name.
>>
>> It seems like i will need to use something like below, i am not sure how
>> to get to the correct syntax. Any inputs will be helpful.
>>
>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>
>> I know how to do the vertical sum of array of given numbers, but i am not
>> sure how to feed that function to the group by.
>>
>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>   counts.toList.transpose.map(_.sum)
>>   }
>>
>> ~Thanks,
>> Vinti
>>
>
>


Stream group by

2016-02-21 Thread Vinti Maheshwari
Hello,

I have input lines like below

*Input*
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2

and I want to produce output like the rows below, which is a vertical (element-wise)
addition of the corresponding numbers.

*Output*
“file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
“file2” : [ 2+1, 2+1, 2+2, 2+2 ]

I am in a Spark Streaming context, and I am having a hard time figuring out how to
group by file name.

It seems like I will need to use something like the line below, but I am not sure how
to get the syntax right. Any inputs will be helpful.

myDStream.foreachRDD(rdd => rdd.groupBy())

I know how to do the vertical sum of an array of given numbers, but I am not sure how
to feed that function into the group-by.

  def compute_counters(counts : ArrayBuffer[List[Int]]) = {
  counts.toList.transpose.map(_.sum)
  }
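
For instance, with the three file1 rows above, the helper works out like this (a quick
sketch):

import scala.collection.mutable.ArrayBuffer

val counts = ArrayBuffer(List(1, 1, 1), List(1, 2, 3), List(5, 5, 5))
compute_counters(counts)   // List(7, 8, 9), i.e. [1+1+5, 1+2+5, 1+3+5]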

~Thanks,
Vinti


Re: Need help in spark-Scala program

2016-02-01 Thread Vinti Maheshwari
Hi,

Sorry, please ignore my message; it was sent by mistake. I am still
drafting it.

Regards,
Vinti

On Mon, Feb 1, 2016 at 2:25 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Hi All,
>
> I recently started learning Spark. I need to use spark-streaming.
>
> 1) Input, need to read from MongoDB
>
> db.event_gcovs.find({executions:"56791a746e928d7b176d03c0", valid:1,
> infofile:{$exists:1}, geo:"sunnyvale"}, {infofile:1}).count()
>
> > Number of Info files: 24441
>
> /* 0 */
>
> {
>
> "_id" : ObjectId("568eaeda71404e5c563ccb86"),
>
> "infofile" :
> "/volume/testtech/datastore/code-coverage/p//infos/svl/6/56791a746e928d7b176d03c0/
> 69958.pcp_napt44_20368.pl.30090.exhibit.R0-re0.15.1I20151218_1934_jammyc.pfe.i386.TC011.fail.FAIL.gcov.info
> "
> }
>
> One info file can have 1000 of  these blocks( Each block starts from "SF"
> delimeter, and ends with the end_of_record.
>
>
>


Spark Streaming application designing question

2016-02-01 Thread Vinti Maheshwari
Hi,

I am new to Spark. I wanted to set up Spark Streaming to retrieve key-value pairs
from files of the below format:

file: info1

Note: Each info file will have around 1000 of these records, and our system is
continuously generating info files, so through Spark Streaming I wanted to aggregate
the results.

Can we give this kind of file as input to the Spark cluster? I am interested in the
"SF" and "DA" delimiters only: "SF" corresponds to the source file, and "DA" corresponds
to the (line number, count).

Since this input data is not in a line format, is it a good idea to use these files as
Spark input directly, or do I need an intermediary stage that cleans these files and
generates new files with each record on its own line instead of in blocks?
Or can we achieve this in Spark itself?

What should be the right approach?



*What I wanted to achieve:*
I wanted to get line-level information, i.e., the line (as a key) and the info files
(as values).
My system continuously generates info files, so through Spark Streaming I wanted to
aggregate the results.

The final output I wanted is like below:
line178 -> (info1, info2, info7.)
line 2908 -> (info3, info90)

Do let me know if my explanation is not clear.
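
A rough sketch of what the per-block parsing could look like, assuming the blocks follow
lcov-style syntax (lines like SF:<path>, DA:<line>,<count>, and end_of_record), which
seems to match the description above; the names below are made up:

// Turn one info-file block into (source-line -> info-file) pairs,
// keeping only lines with a non-zero hit count (an assumption).
def parseBlock(block: String, infoFileName: String): Seq[(String, String)] = {
  val lines = block.split("\n").map(_.trim)
  val sourceFile = lines.find(_.startsWith("SF:")).map(_.stripPrefix("SF:")).getOrElse("unknown")
  lines.filter(_.startsWith("DA:")).flatMap { da =>
    val parts = da.stripPrefix("DA:").split(",")        // expected: lineNumber,count[,checksum]
    if (parts.length >= 2 && parts(1).toInt > 0)
      Some((s"$sourceFile:${parts(0)}", infoFileName))
    else None
  }.toSeq
}
// grouping these pairs by key (e.g. with groupByKey/reduceByKey) then gives
// line -> (info1, info2, ...) as in the desired output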


Thanks & Regards,
Vinti


Need help in spark-Scala program

2016-02-01 Thread Vinti Maheshwari
Hi All,

I recently started learning Spark. I need to use spark-streaming.

1) Input: need to read from MongoDB

db.event_gcovs.find({executions:"56791a746e928d7b176d03c0", valid:1,
infofile:{$exists:1}, geo:"sunnyvale"}, {infofile:1}).count()

> Number of Info files: 24441

/* 0 */

{

"_id" : ObjectId("568eaeda71404e5c563ccb86"),

"infofile" :
"/volume/testtech/datastore/code-coverage/p//infos/svl/6/56791a746e928d7b176d03c0/
69958.pcp_napt44_20368.pl.30090.exhibit.R0-re0.15.1I20151218_1934_jammyc.pfe.i386.TC011.fail.FAIL.gcov.info
"
}

One info file can have 1000 of these blocks (each block starts with the "SF"
delimiter and ends with end_of_record).