Re: Invalidating/Remove complete mapWithState state

2017-04-17 Thread Matthias Niehoff
That would be the hard way, but if possible I want to clear the cache
without stopping the application, maybe triggered by a message in the
stream.
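A rough sketch of one direction (untested, illustrative names; it assumes a "reset epoch" can be distributed to the executors, e.g. bumped by a control message and exposed through a broadcast variable): store that epoch next to the aggregated value and let the mapWithState function treat state from before the latest reset as if it did not exist. Keys that are never touched again would still only disappear via StateSpec.timeout.

import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Sketch only: Tracked carries the epoch of the reset that the state belongs to.
case class Tracked(sum: Long, epoch: Long)

// currentEpoch is an assumed provider of the latest reset epoch (however it is
// distributed to the executors).
def mappingFunc(currentEpoch: () => Long)
               (key: String, value: Option[Long], state: State[Tracked]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    None // the key is being evicted by the state timeout, nothing to update
  } else {
    // State written before the latest reset is treated as if it did not exist.
    val previous =
      if (state.exists() && state.get().epoch >= currentEpoch()) state.get().sum else 0L
    val updated = previous + value.getOrElse(0L)
    state.update(Tracked(updated, currentEpoch())) // overwrites stale state as well
    Some(key -> updated)
  }
}

// usage sketch:
// stream.mapWithState(
//   StateSpec.function(mappingFunc(() => resetEpoch.value) _).timeout(Minutes(30)))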

On 17 April 2017 at 19:41, ayan guha <guha.a...@gmail.com> wrote:

> It sounds like you want to stop the stream process, wipe out the check
> point and restart?
>
> On Mon, 17 Apr 2017 at 10:13 pm, Matthias Niehoff <
> matthias.nieh...@codecentric.de> wrote:
>
>> Hi everybody,
>>
>> is there a way to completely invalidate or remove the state used by
>> mapWithState, not only for a given key using State#remove()?
>>
>> Deleting the state key by key is not an option, as a) not all possible
>> keys are known (this could be worked around, of course) and b) the number of
>> keys is too big and therefore it takes too long.
>>
>> I tried to unpersist the RDD retrieved by stateSnapshot (
>> stateSnapshots().transform(_.unpersist()) ) , but this did not work as
>> expected.
>>
>> Thank you,
>>
>> Matthias
>>
> --
> Best Regards,
> Ayan Guha
>





Invalidating/Remove complete mapWithState state

2017-04-17 Thread Matthias Niehoff
Hi everybody,

is there a way to completely invalidate or remove the state used by
mapWithState, not only for a given key using State#remove()?

Deleting the state key by key is not an option, as a) not all possible keys
are known (this could be worked around, of course) and b) the number of keys is
too big and therefore it takes too long.

I tried to unpersist the RDD retrieved by stateSnapshot (
stateSnapshots().transform(_.unpersist()) ) , but this did not work as
expected.

Thank you,

Matthias


Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-17 Thread Matthias Niehoff
heartbeat.interval.ms: default
group.max.session.timeout.ms: default
session.timeout.ms: 60000 (as set in the params below)

The default values are those of Kafka 0.10.

complete Kafka params:

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> kafkaBrokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> "false",
  "key.deserializer" -> classOf[StringDeserializer].getName,
  "value.deserializer" -> classOf[BytesDeserializer].getName,
  "session.timeout.ms" -> s"${1 * 60 * 1000}",
  "request.timeout.ms" -> s"${2 * 60 * 1000}",
  "max.poll.records" -> "1000"
)


As pointed out, when using different groups for each DirectStream
everything is fine.
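For reference, a sketch of what that looks like with the kafka-0-10 integration (ssc, kafkaParams and the topic/group names below are illustrative placeholders):

import org.apache.kafka.common.utils.Bytes
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Give every DirectStream its own group.id so the consumers cached on the
// executors do not collide.
def paramsFor(groupId: String): Map[String, String] =
  kafkaParams + ("group.id" -> groupId)

val viewStream = KafkaUtils.createDirectStream[String, Bytes](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Bytes](Set("adserving.log.view"), paramsFor("job-views")))

val clickStream = KafkaUtils.createDirectStream[String, Bytes](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Bytes](Set("adserving.log.click"), paramsFor("job-clicks")))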

2016-10-15 2:42 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> For you or anyone else having issues with consumer rebalance, what are
> your settings for
>
> heartbeat.interval.ms
> session.timeout.ms
> group.max.session.timeout.ms
>
> relative to your batch time?
>
> On Tue, Oct 11, 2016 at 10:19 AM, static-max <flasha...@googlemail.com>
> wrote:
> > Hi,
> >
> > I run into the same exception
> > (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> be
> > completed since the group has already rebalanced ...), but I only use one
> > stream.
> > I get the exceptions when trying to manually commit the offset to Kafka:
> >
> > OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
> > CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
> > cco.commitAsync(offsets);
> >
> > I tried setting "max.poll.records" to 1000 but this did not help.
> >
> > Any idea what could be wrong?
> >
> > 2016-10-11 15:36 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> The new underlying kafka consumer prefetches data and is generally heavier
> >> weight, so it is cached on executors. Group id is part of the cache key. I
> >> assumed kafka users would use different group ids for consumers they wanted
> >> to be distinct, since otherwise it would cause problems even with the normal
> >> kafka consumer, but that appears to be a poor assumption.
> >>
> >> I'll figure out a way to make this more obvious.
> >>
> >>
> >> On Oct 11, 2016 8:19 AM, "Matthias Niehoff"
> >> <matthias.nieh...@codecentric.de> wrote:
> >>
> >> good point, I changed the group id to be unique for the separate streams
> >> and now it works. Thanks!
> >>
> >> Although changing this is ok for us, I am interested in the why :-) With
> >> the old connector this was not a problem, nor is it AFAIK with the pure
> >> kafka consumer api.
> >>
> >> 2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>>
> >>> Just out of curiosity, have you tried using separate group ids for the
> >>> separate streams?
> >>>
> >>>
> >>> On Oct 11, 2016 4:46 AM, "Matthias Niehoff"
> >>> <matthias.nieh...@codecentric.de> wrote:
> >>>>
> >>>> I stripped down the job to just consume the stream and print it, without
> >>>> avro deserialization. When I only consume one topic, everything is fine. As
> >>>> soon as I add a second stream it gets stuck after about 5 minutes. So it
> >>>> basically boils down to:
> >>>>
> >>>>
> >>>>   val kafkaParams = Map[String, String](
> >>>> "bootstrap.servers" -> kafkaBrokers,
> >>>> "group.id" -> group,
> >>>> "key.deserializer" -> classOf[StringDeserializer].getName,
> >>>> "value.deserializer" -> classOf[BytesDeserializer].getName,
> >>>> "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>> "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>> "auto.offset.reset" -> "latest",
> >>>> "enable.auto.commit" -> "false"
> >>>>   )
> >>>>
> >>>>   def main(args: Array[String]) {
> >>>>
> >>>> def createStreamingContext(): StreamingContext = {
> >>>>
> >>>>   val sparkConf = new SparkConf(true)
> >>>> .setAppName("Kafka Consumer Test")
> >>>>

Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-11 Thread Matthias Niehoff
good point, I changed the group id to be unique for the separate streams
and now it works. Thanks!

Although changing this is ok for us, I am interested in the why :-) With
the old connector this was not a problem, nor is it AFAIK with the pure
Kafka consumer API.

2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Just out of curiosity, have you tried using separate group ids for the
> separate streams?
>
> On Oct 11, 2016 4:46 AM, "Matthias Niehoff" <matthias.niehoff@codecentric.
> de> wrote:
>
>> I stripped down the job to just consume the stream and print it, without
>> avro deserialization. When I only consume one topic, everything is fine. As
>> soon as I add a second stream it gets stuck after about 5 minutes. So it
>> basically boils down to:
>>
>>
>>   val kafkaParams = Map[String, String](
>> "bootstrap.servers" -> kafkaBrokers,
>> "group.id" -> group,
>> "key.deserializer" -> classOf[StringDeserializer].getName,
>> "value.deserializer" -> classOf[BytesDeserializer].getName,
>> "session.timeout.ms" -> s"${1 * 60 * 1000}",
>> "request.timeout.ms" -> s"${2 * 60 * 1000}",
>> "auto.offset.reset" -> "latest",
>> "enable.auto.commit" -> "false"
>>   )
>>
>>   def main(args: Array[String]) {
>>
>> def createStreamingContext(): StreamingContext = {
>>
>>   val sparkConf = new SparkConf(true)
>> .setAppName("Kafka Consumer Test")
>>   sparkConf.setMaster("local[*]")
>>
>>   val ssc = new StreamingContext(sparkConf, 
>> Seconds(streaming_interval_seconds))
>>
>>   // AD REQUESTS
>>   // ===
>>   val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>>   serializedAdRequestStream.map(record => record.value().get()).print(10)
>>
>>   // VIEWS
>>   // ==
>>   val serializedViewStream = createStream(ssc, topic_view)
>>   serializedViewStream.map(record => record.value().get()).print(10)
>>
>> //  // CLICKS
>> //  // ==
>> //  val serializedClickStream = createStream(ssc, topic_click)
>> //  serializedClickStream.map(record => record.value().get()).print(10)
>>
>>   ssc
>> }
>>
>> val streamingContext = createStreamingContext
>> streamingContext.start()
>> streamingContext.awaitTermination()
>>   }
>>
>>
>> And in the logs you see:
>>
>>
>> 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
>> 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
>> 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
>> 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
>> 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
>>
>>
>> 2016-10-11 9:28 GMT+02:00 Matthias Niehoff <matthias.niehoff@codecentric.
>> de>:
>>
>>> This Job will fail after about 5 minutes:
>>>
>>>
>>> object SparkJobMinimal {
>>>
>>>   //read Avro schemas
>>>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
>>>   val avroSchemaAdRequest = 
>>> scala.io.Source.fromInputStream(stream).getLines.mkString
>>>   stream.close
>>>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
>>>   val avroSchemaEvent = 
>>> scala.io.Source.fromInputStream(stream).getLines.mkString
>>>   stream.close
>>>
>>>
>>>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
>>>
>>>   val topic_adrequest = "adserving.log.ad_request"
>>>   val topic_view = "adserving.log.view"
>>>   val topic_click = "adserving.log.click"
>>>   val group = UUID.randomUUID().toString
>>>   val streaming_interval_seconds = 2
>>>
>>>   val kafkaParams = Map[String, String](
>>> "bootstrap.servers" -> kafkaBrokers,
>>> "group.id" -> group,
>>> "key.deserializer" -> classOf[StringDeserializer].getName,
>>> "value.deserializer" -> classOf[Byte

Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-11 Thread Matthias Niehoff
I re-ran the job with DEBUG Log Level for org.apache.spark, kafka.consumer
and org.apache.kafka. Please find the output here:
http://pastebin.com/VgtRUQcB

Most of the delay is introduced by "16/10/11 13:20:12 DEBUG RecurringTimer:
Callback for JobGenerator called at time x", which repeats multiple times
until about one minute has passed. I think this class is responsible for the
endless loop scheduling the microbatches, but I do not know exactly what
it does and why it has a problem with multiple Kafka Direct Streams.

2016-10-11 11:46 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de
>:

> I stripped down the job to just consume the stream and print it, without
> avro deserialization. When I only consume one topic, everything is fine. As
> soon as I add a second stream it gets stuck after about 5 minutes. So it
> basically boils down to:
>
>
>   val kafkaParams = Map[String, String](
> "bootstrap.servers" -> kafkaBrokers,
> "group.id" -> group,
> "key.deserializer" -> classOf[StringDeserializer].getName,
> "value.deserializer" -> classOf[BytesDeserializer].getName,
> "session.timeout.ms" -> s"${1 * 60 * 1000}",
> "request.timeout.ms" -> s"${2 * 60 * 1000}",
> "auto.offset.reset" -> "latest",
> "enable.auto.commit" -> "false"
>   )
>
>   def main(args: Array[String]) {
>
> def createStreamingContext(): StreamingContext = {
>
>   val sparkConf = new SparkConf(true)
> .setAppName("Kafka Consumer Test")
>   sparkConf.setMaster("local[*]")
>
>   val ssc = new StreamingContext(sparkConf, 
> Seconds(streaming_interval_seconds))
>
>   // AD REQUESTS
>   // ===
>   val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>   serializedAdRequestStream.map(record => record.value().get()).print(10)
>
>   // VIEWS
>   // ==
>   val serializedViewStream = createStream(ssc, topic_view)
>   serializedViewStream.map(record => record.value().get()).print(10)
>
> //  // CLICKS
> //  // ==
> //  val serializedClickStream = createStream(ssc, topic_click)
> //  serializedClickStream.map(record => record.value().get()).print(10)
>
>   ssc
> }
>
> val streamingContext = createStreamingContext
> streamingContext.start()
> streamingContext.awaitTermination()
>   }
>
>
> And in the logs you see:
>
>
> 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
> 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
> 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
> 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
> 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
>
>
> 2016-10-11 9:28 GMT+02:00 Matthias Niehoff <matthias.niehoff@codecentric.
> de>:
>
>> This Job will fail after about 5 minutes:
>>
>>
>> object SparkJobMinimal {
>>
>>   //read Avro schemas
>>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
>>   val avroSchemaAdRequest = 
>> scala.io.Source.fromInputStream(stream).getLines.mkString
>>   stream.close
>>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
>>   val avroSchemaEvent = 
>> scala.io.Source.fromInputStream(stream).getLines.mkString
>>   stream.close
>>
>>
>>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
>>
>>   val topic_adrequest = "adserving.log.ad_request"
>>   val topic_view = "adserving.log.view"
>>   val topic_click = "adserving.log.click"
>>   val group = UUID.randomUUID().toString
>>   val streaming_interval_seconds = 2
>>
>>   val kafkaParams = Map[String, String](
>> "bootstrap.servers" -> kafkaBrokers,
>> "group.id" -> group,
>> "key.deserializer" -> classOf[StringDeserializer].getName,
>> "value.deserializer" -> classOf[BytesDeserializer].getName,
>> "session.timeout.ms" -> s"${1 * 60 * 1000}",
>> "request.timeout.ms" -> s"${2 * 60 * 1000}",
>> "auto.offset.reset" -> "latest",
>> "enable.auto.commit" -> "false"
>&

Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-11 Thread Matthias Niehoff
I stripped down the job to just consume the stream and print it, without
avro deserialization. When I only consume one topic, everything is fine. As
soon as I add a second stream it gets stuck after about 5 minutes. So it
basically boils down to:


  val kafkaParams = Map[String, String](
"bootstrap.servers" -> kafkaBrokers,
"group.id" -> group,
"key.deserializer" -> classOf[StringDeserializer].getName,
"value.deserializer" -> classOf[BytesDeserializer].getName,
"session.timeout.ms" -> s"${1 * 60 * 1000}",
"request.timeout.ms" -> s"${2 * 60 * 1000}",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> "false"
  )

  def main(args: Array[String]) {

def createStreamingContext(): StreamingContext = {

  val sparkConf = new SparkConf(true)
.setAppName("Kafka Consumer Test")
  sparkConf.setMaster("local[*]")

  val ssc = new StreamingContext(sparkConf,
Seconds(streaming_interval_seconds))

  // AD REQUESTS
  // ===
  val serializedAdRequestStream = createStream(ssc, topic_adrequest)
  serializedAdRequestStream.map(record => record.value().get()).print(10)

  // VIEWS
  // ==
  val serializedViewStream = createStream(ssc, topic_view)
  serializedViewStream.map(record => record.value().get()).print(10)

//  // CLICKS
//  // ==
//  val serializedClickStream = createStream(ssc, topic_click)
//  serializedClickStream.map(record => record.value().get()).print(10)

  ssc
}

val streamingContext = createStreamingContext
streamingContext.start()
streamingContext.awaitTermination()
  }


And in the logs you see:


16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms


2016-10-11 9:28 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>
:

> This Job will fail after about 5 minutes:
>
>
> object SparkJobMinimal {
>
>   //read Avro schemas
>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
>   val avroSchemaAdRequest = 
> scala.io.Source.fromInputStream(stream).getLines.mkString
>   stream.close
>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
>   val avroSchemaEvent = 
> scala.io.Source.fromInputStream(stream).getLines.mkString
>   stream.close
>
>
>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
>
>   val topic_adrequest = "adserving.log.ad_request"
>   val topic_view = "adserving.log.view"
>   val topic_click = "adserving.log.click"
>   val group = UUID.randomUUID().toString
>   val streaming_interval_seconds = 2
>
>   val kafkaParams = Map[String, String](
> "bootstrap.servers" -> kafkaBrokers,
> "group.id" -> group,
> "key.deserializer" -> classOf[StringDeserializer].getName,
> "value.deserializer" -> classOf[BytesDeserializer].getName,
> "session.timeout.ms" -> s"${1 * 60 * 1000}",
> "request.timeout.ms" -> s"${2 * 60 * 1000}",
> "auto.offset.reset" -> "latest",
> "enable.auto.commit" -> "false"
>   )
>
>   def main(args: Array[String]) {
>
> def createStreamingContext(): StreamingContext = {
>
>   val sparkConf = new SparkConf(true)
> .setAppName("Kafka Consumer Test")
>   sparkConf.setMaster("local[*]")
>
>
>   val ssc = new StreamingContext(sparkConf, 
> Seconds(streaming_interval_seconds))
>
>   // AD REQUESTS
>   // ===
>   val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>
>   val adRequestStream = deserializeStream(serializedAdRequestStream, 
> avroSchemaAdRequest, record => AdRequestLog(record)).cache()
>   adRequestStream.print(10)
>
>   // VIEWS
>   // ==
>
>   val serializedViewStream = createStream(ssc, topic_view)
>   val viewStream = deserializeStream(serializedViewStream, 
> avroSchemaEvent, record => Event(record, EventType.View)).cache()
>   viewStream.print(10)
>
>
>   // CLICKS
>   // ==
>   val serializedClickStream

Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-11 Thread Matthias Niehoff
This Job will fail after about 5 minutes:


object SparkJobMinimal {

  //read Avro schemas
  var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
  val avroSchemaAdRequest =
scala.io.Source.fromInputStream(stream).getLines.mkString
  stream.close
  stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
  val avroSchemaEvent =
scala.io.Source.fromInputStream(stream).getLines.mkString
  stream.close


  val kafkaBrokers = "broker-0.kafka.mesos:9092"

  val topic_adrequest = "adserving.log.ad_request"
  val topic_view = "adserving.log.view"
  val topic_click = "adserving.log.click"
  val group = UUID.randomUUID().toString
  val streaming_interval_seconds = 2

  val kafkaParams = Map[String, String](
"bootstrap.servers" -> kafkaBrokers,
"group.id" -> group,
"key.deserializer" -> classOf[StringDeserializer].getName,
"value.deserializer" -> classOf[BytesDeserializer].getName,
"session.timeout.ms" -> s"${1 * 60 * 1000}",
"request.timeout.ms" -> s"${2 * 60 * 1000}",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> "false"
  )

  def main(args: Array[String]) {

def createStreamingContext(): StreamingContext = {

  val sparkConf = new SparkConf(true)
.setAppName("Kafka Consumer Test")
  sparkConf.setMaster("local[*]")


  val ssc = new StreamingContext(sparkConf,
Seconds(streaming_interval_seconds))

  // AD REQUESTS
  // ===
  val serializedAdRequestStream = createStream(ssc, topic_adrequest)

  val adRequestStream =
deserializeStream(serializedAdRequestStream, avroSchemaAdRequest,
record => AdRequestLog(record)).cache()
  adRequestStream.print(10)

  // VIEWS
  // ==

  val serializedViewStream = createStream(ssc, topic_view)
  val viewStream = deserializeStream(serializedViewStream,
avroSchemaEvent, record => Event(record, EventType.View)).cache()
  viewStream.print(10)


  // CLICKS
  // ==
  val serializedClickStream = createStream(ssc, topic_click)
  val clickEventStream = deserializeStream(serializedClickStream,
avroSchemaEvent, record => Event(record, EventType.Click)).cache()
  clickEventStream.print(10)

  ssc
}

val streamingContext = createStreamingContext
streamingContext.start()
streamingContext.awaitTermination()
  }

  def createStream(ssc: StreamingContext, topic: String):
InputDStream[ConsumerRecord[String, Bytes]] = {
KafkaUtils.createDirectStream[String, Bytes](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
  }

  def deserializeStream[EventType:
ClassTag](serializedAdRequestStream:
InputDStream[ConsumerRecord[String, Bytes]], avroSchema: String,
recordMapper: GenericRecord => EventType): DStream[EventType] = {
serializedAdRequestStream.mapPartitions {
  iteratorOfMessages =>
val schema: Schema = new Schema.Parser().parse(avroSchema)
val recordInjection = GenericAvroCodecs.toBinary(schema)
    iteratorOfMessages.map(message => {
  recordInjection.invert(message.value().get())

}).filter(_.isSuccess).map(_.get.asInstanceOf[GenericRecord]).map(recordMapper)
}
  }
}


2016-10-10 17:42 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de
>:

> Yes, without committing the data the consumer rebalances.
> The job consumes 3 streams and processes them. When consuming only one stream
> it runs fine. But when consuming three streams, even without joining them and
> just deserializing the payload and triggering an output action, it fails.
>
> I will prepare a code sample.
>
> 2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>
>> OK, so at this point, even without involving commitAsync, you're
>> seeing consumer rebalances after a particular batch takes longer than
>> the session timeout?
>>
>> Do you have a minimal code example you can share?
>>
>> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
>> <matthias.nieh...@codecentric.de> wrote:
>> > Hi,
> >> > Sorry for the late reply; a public holiday in Germany.
> >> >
> >> > Yes, it's using a unique group id which no other job or consumer group is
> >> > using. I have increased the session.timeout.ms to 1 minute and set
> >> > max.poll.records to 1000. The processing takes ~1 second.
>> >
>> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >>
>> >> Well, I'd start at the first thing suggested by the error, namely that
>> >> the group has rebalanced.
>> >>
>> >> Is that s

Re: Problems with new experimental Kafka Consumer for 0.10

2016-10-10 Thread Matthias Niehoff
Yes, without committing the data the consumer rebalances.
The job consumes 3 streams and processes them. When consuming only one stream
it runs fine. But when consuming three streams, even without joining them and
just deserializing the payload and triggering an output action, it fails.

I will prepare a code sample.

2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> OK, so at this point, even without involving commitAsync, you're
> seeing consumer rebalances after a particular batch takes longer than
> the session timeout?
>
> Do you have a minimal code example you can share?
>
> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
> <matthias.nieh...@codecentric.de> wrote:
> > Hi,
> > Sorry for the late reply; a public holiday in Germany.
> >
> > Yes, it's using a unique group id which no other job or consumer group is
> > using. I have increased the session.timeout.ms to 1 minute and set
> > max.poll.records to 1000. The processing takes ~1 second.
> >
> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> Well, I'd start at the first thing suggested by the error, namely that
> >> the group has rebalanced.
> >>
> >> Is that stream using a unique group id?
> >>
> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
> >> <matthias.nieh...@codecentric.de> wrote:
> >> > Hi,
> >> >
> >> > the stacktrace:
> >> >
> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> >> > completed since the group has already rebalanced and assigned the partitions
> >> > to another member. This means that the time between subsequent calls to
> >> > poll() was longer than the configured session.timeout.ms, which typically
> >> > implies that the poll loop is spending too much time message processing. You
> >> > can address this either by increasing the session timeout or by reducing the
> >> > maximum size of batches returned in poll() with max.poll.records.
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> >> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute

Re: Problems with new experimental Kafka Consumer for 0.10

2016-09-28 Thread Matthias Niehoff
)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:192)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

But it seems like the commit is not the actual problem. The job also falls
behind if I do not commit the offsets. The delay would be ok if the
processing time were bigger than the batch interval, but that's not the case
in any of the microbatches. IMHO, for some reason one of the microbatches
falls behind by more than session.timeout.ms. Then the consumer group
rebalances, which takes about 1 minute (see timestamps below). Now a cycle of
slow batches begins, each triggering a consumer rebalance. Could this be what
happens?


16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time
1475050414000 ms (execution: 0.360 s)  --> the job for 08:13:34
16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group
spark_aggregation_job-kafka010 with generation 6
16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned
partitions [sapxm.adserving.log.ad_request-0,
sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1,
sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3,
sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5,
sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7,
sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned
partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4,
sapxm.adserving.log.view-1, sapxm.adserving.log.view-2,
sapxm.adserving.log.view-0, sapxm.adserving.log.view-9,
sapxm.adserving.log.view-7, sapxm.adserving.log.view-8,
sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group
spark_aggregation_job-kafka010
16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group
spark_aggregation_job-kafka010

2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> What's the actual stacktrace / exception you're getting related to
> commit failure?
>
> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
> <matthias.nieh...@codecentric.de> wrote:
> > Hi everybody,
> >
> > I am using the new Kafka consumer for Spark Streaming in my job. When
> > running with the old consumer it runs fine.
> >
> > The job consumes 3 topics, saves the data to Cassandra, cogroups the topics,
> > calls mapWithState and stores the results in Cassandra. After that I
> > manually commit the Kafka offsets using the commitAsync method of the
> > KafkaDStream.
> >
> > With the new consumer I experience the following problem:
> >
> > After a certain amount of time (about 4-5 minutes, might be more or less)
> > there are exceptions that the offset commit failed. The processing takes
> > less than the batch interval. I also adjusted the session.timeout and
> > request.timeout as well as the max.poll.records setting which did not
> help.
> >
> > After the first offset commit failed the time it takes from kafka until
> the
> > microbatch is started increases, the processing time is constantly below
> the
> > batch interval. Moreover further offset commits also fail and as result
> the
> > delay time builds up.
> >
> > Has anybody made this experience as well?
> >
> > Thank you
> >
> > Relevant Kafka Parameters:
> >
> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
> > "auto.offset.reset" -> "largest",
> > "enable.auto.commit" -> "false",
> > "max.poll.records" -> "1000"
> >
> >
> >

Problems with new experimental Kafka Consumer for 0.10

2016-09-27 Thread Matthias Niehoff
Hi everybody,

I am using the new Kafka consumer for Spark Streaming in my job. When
running with the old consumer it runs fine.

The job consumes 3 topics, saves the data to Cassandra, cogroups the topics,
calls mapWithState and stores the results in Cassandra. After that I
manually commit the Kafka offsets using the commitAsync method of the
KafkaDStream.

With the new consumer I experience the following problem:

After a certain amount of time (about 4-5 minutes, might be more or less)
there are exceptions saying that the offset commit failed, even though the
processing takes less than the batch interval. I also adjusted the
session.timeout.ms and request.timeout.ms as well as the max.poll.records
setting, which did not help.

After the first offset commit fails, the time from Kafka until the
microbatch is started increases, while the processing time stays constantly
below the batch interval. Moreover, further offset commits also fail and as a
result the delay builds up.

Has anybody made this experience as well?

Thank you

Relevant Kafka Parameters:

"session.timeout.ms" -> s"${1 * 60 * 1000}",
"request.timeout.ms" -> s"${2 * 60 * 1000}",
"auto.offset.reset" -> "largest",
"enable.auto.commit" -> "false",
"max.poll.records" -> "1000"





Re: How spark decides whether to do BroadcastHashJoin or SortMergeJoin

2016-07-22 Thread Matthias Niehoff
Hi,

there is a property you can set. Quoting the docs
(http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options):

spark.sql.autoBroadcastJoinThreshold (default 10485760, i.e. 10 MB): Configures the
maximum size in bytes for a table that will be broadcast to all worker
nodes when performing a join. By setting this value to -1, broadcasting can
be disabled.
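A sketch of disabling the automatic broadcast so that Spark falls back to a sort-merge join (with Spark 2.0 this can be set on the SparkSession; on 1.x, sqlContext.setConf takes the same key). The broadcast() hint in the comment works the other way round and is only shown for contrast; dfBig/dfSmall are placeholders.

import org.apache.spark.sql.SparkSession

// -1 disables broadcast joins entirely; any positive value is a size threshold in bytes.
val spark = SparkSession.builder()
  .appName("join-config-sketch")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .getOrCreate()

// The setting can also be changed at runtime:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Conversely, a single join can be nudged towards a broadcast join with a hint:
// val joined = dfBig.join(org.apache.spark.sql.functions.broadcast(dfSmall), "key")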

2016-07-20 10:07 GMT+02:00 raaggarw <raagg...@adobe.com>:

> Hi,
>
> How spark decides/optimizes internally as to when it needs to a
> BroadcastHashJoin vs SortMergeJoin? Is there anyway we can guide from
> outside or through options which Join to use?
> Because in my case when i am trying to do a join, spark makes that join as
> BroadCastHashJoin internally and when join is actually being executed it
> waits for broadcast to be done (which is big data), resulting in timeout.
> I do not want to increase value of timeout i.e.
> "spark.sql.broadcastTimeout". Rather i want this to be done via
> SortMergeJoin. How can i enforce that?
>
> Thanks
> Ravi
>
>
>
>




Structured Streaming and Microbatches

2016-07-13 Thread Matthias Niehoff
Hi everybody,

as far as I understand, with the new Structured Streaming API the output
will not get processed every x seconds anymore. Instead the data will be
processed as soon as it arrives. But there might be a delay due to the
processing time of the data.

A small example:
Data comes in and the processing takes 1 second (quite long).
In this 1 second a lot of new data comes in, which will be processed after
the processing of the first data has finished.

My questions are:
Is the data for each processing run, i.e. all the data collected in that 1 second,
still processed as a microbatch (including reprocessing in case of failure
on another worker, etc.)? Or is the bulk of data processed one by one?

With regard to the processing time: is the behavior the same for high
processing times as in Spark 1.x? Meaning we get a scheduling delay, data
is stored by a receiver, ... (is there even a concept of a receiver in Spark 2?
Is a source in streaming basically a receiver?)

Hope those questions aren't too confusing :-)

Thank you!
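For context, a minimal Structured Streaming sketch (Spark 2.0-era API; source and sink are just placeholders): without a trigger the engine starts the next micro-batch as soon as the previous one finishes, while a processing-time trigger aims for fixed intervals and simply starts late if a batch cannot keep up.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

val spark = SparkSession.builder().appName("trigger-sketch").getOrCreate()

// Placeholder source: lines from a local socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// With ProcessingTime the engine tries to start one micro-batch every 10 seconds.
val query = lines.writeStream
  .format("console")
  .trigger(ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()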


Substract two DStreams

2016-06-15 Thread Matthias Niehoff
Hi,

I want to subtract 2 DStreams (based on the same input stream) to get all
elements that exist in the original stream but not in the modified stream
(the modified stream is changed using joinWithCassandraTable, which does an
inner join and because of this might remove entries).

Subtract is only possible on RDDs. So I could use a foreachRDD right at the
beginning of the stream processing and work on RDDs. I think it's quite ugly
to use the output op at the beginning and then implement a lot of
transformations in the foreachRDD. Can you think of different ways to do an
efficient diff between two DStreams?

Thank you
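One way to avoid the foreachRDD-at-the-top approach (a sketch with assumed key/value types): transformWith combines two DStreams per micro-batch, so the RDD-level subtract can stay inside the normal transformation chain. The price is a shuffle per batch.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Keep only the elements of `original` whose keys were dropped by the inner join
// that produced `modified`; both streams are assumed to be keyed the same way.
def diffStreams[V: ClassTag](original: DStream[(String, V)],
                             modified: DStream[(String, V)]): DStream[(String, V)] =
  original.transformWith(modified,
    (origRdd: RDD[(String, V)], modRdd: RDD[(String, V)]) => origRdd.subtractByKey(modRdd))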



Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

2016-05-25 Thread Matthias Niehoff
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>
>
> This is the code that rises the exception in the spark streaming process:
>
> try{
> messages.foreachRDD( rdd =>{
>   val count = rdd.count()
>   if (count > 0){
> //someMessages should be AmazonRating...
> val someMessages = rdd.take(count.toInt)
> println("<-->")
> println("someMessages is " + someMessages)
> someMessages.foreach(println)
> println("<-->")
> println("<---POSSIBLE SOLUTION--->")
> messages
> .map { case (_, jsonRating) =>
>   val jsValue = Json.parse(jsonRating)
>   AmazonRating.amazonRatingFormat.reads(jsValue) match {
> case JsSuccess(rating, _) => rating
> case JsError(_) => AmazonRating.empty
>   }
>  }
> .filter(_ != AmazonRating.empty)
> // I think that this line provokes the runtime exception...
> .foreachRDD(_.foreachPartition(it =>
> recommender.predictWithALS(it.toSeq)))
>
> println("<---POSSIBLE SOLUTION--->")
>
>   }
>   }
> )
> }catch{
>   case e: IllegalArgumentException => {println("illegal arg.
> exception")};
>   case e: IllegalStateException=> {println("illegal state
> exception")};
>   case e: ClassCastException   => {println("ClassCastException")};
>   case e: Exception=> {println(" Generic Exception")};
> }finally{
>
>   println("Finished taking data from kafka topic...")
> }
>
> Recommender object:
>
> def predictWithALS(ratings: Seq[AmazonRating]) = {
> // train model
> val myRatings = ratings.map(toSparkRating)
> val myRatingRDD = sc.parallelize(myRatings)
>
> val startAls = DateTime.now
> val model = ALS.train((sparkRatings ++
> myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>
> val myProducts = myRatings.map(_.product).toSet
> val candidates = sc.parallelize((0 until
> productDict.size).filterNot(myProducts.contains))
>
> // get ratings of all products not in my history ordered by rating
> (higher first) and only keep the first NumRecommendations
> val myUserId = userDict.getIndex(MyUsername)
> val recommendations = model.predict(candidates.map((myUserId,
> _))).collect
> val endAls = DateTime.now
> val result =
> recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
> val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>
> println(s"ALS Time: $alsTime seconds")
> result
>   }
> }
>
> And this is the kafka producer that push the json data within the topic:
>
> object AmazonProducerExample {
>   def main(args: Array[String]): Unit = {
>
> val productId = args(0).toString
> val userId = args(1).toString
> val rating = args(2).toDouble
> val topicName = "amazonRatingsTopic"
>
> val producer = Producer[String](topicName)
>
> //0981531679 is Scala Puzzlers...
> //AmazonProductAndRating
> AmazonPageParser.parse(productId,userId,rating).onSuccess { case
> amazonRating =>
>   //Is this the correct way? the best performance? possibly not, what
> about using avro or parquet?
>   producer.send(Json.toJson(amazonRating).toString)
>   //producer.send(amazonRating)
>   println("amazon product with rating sent to kafka cluster..." +
> amazonRating.toString)
>   System.exit(0)
> }
>
>   }
> }
>
>
> I have written a stack overflow post
> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>,
> with more details, please help, i am stuck with this issue and i don't know
> how to continue.
>
> Regards
>
> Alonso Isidoro Roman
>

Sliding Average over Window in Spark Streaming

2016-05-06 Thread Matthias Niehoff
Hi,

If I want to have a sliding average over the last 10 minutes for some keys, I can
do something like
groupBy(window(…), "my-key").avg("some-values") in Spark 2.0.

I am trying to implement this sliding average using Spark 1.6.x:
I tried reduceByKeyAndWindow but did not find a solution. IMO I have to keep
all the values in the window to compute the average. One way would be to add
every new value to a list in the reduce function and then do the avg
computation in a separate map, but this seems kind of ugly.

Do you have an idea how to solve this?

Thanks!
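For completeness, a sketch of one way to do it on 1.6 without keeping every value: window a running (sum, count) per key and divide at the end (assumed types; window and slide durations are illustrative, and the inverse-reduce variant requires checkpointing to be enabled).

import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

def slidingAverage(values: DStream[(String, Double)]): DStream[(String, Double)] = {
  val sumAndCount = values
    .mapValues(v => (v, 1L))
    .reduceByKeyAndWindow(
      (a, b) => (a._1 + b._1, a._2 + b._2), // values entering the window
      (a, b) => (a._1 - b._1, a._2 - b._2), // values leaving the window (inverse reduce)
      Minutes(10), Seconds(30))
  sumAndCount
    .filter { case (_, (_, count)) => count > 0 } // drop keys that slid out completely
    .mapValues { case (sum, count) => sum / count }
}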



Re: Please assist: Spark 1.5.2 / cannot find StateSpec / State

2016-04-13 Thread Matthias Niehoff
StateSpec and the mapWithState method are only available in Spark 1.6.x.
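For example, a build.sbt sketch (1.6.1 is just an illustrative 1.6.x version):

libraryDependencies += "org.apache.spark" %% "spark-core"      % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"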

2016-04-13 11:34 GMT+02:00 Marco Mistroni <mmistr...@gmail.com>:

> hi all
>  i am trying to replicate the Streaming Wordcount example described here
>
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
>
> in my build,sbt i have the following dependencies
>
> .
> libraryDependencies += "org.apache.spark" %% "spark-core"   % "1.5.2" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming"   % "1.5.2"
> % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-mllib"   % "1.5.2"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-flume"   %
> "1.3.0"  % "provided"
> ...
> But compilations fail mentioning that class StateSpec and State are not
> found
>
> Could pls someone point me to the right packages to refer if i want to use
> StateSpec?
>
> kind regards
>  marco
>
>
>




Do not wrap result of a UDAF in an Struct

2016-03-29 Thread Matthias Niehoff
Hi,

given is a simple DF:

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- val: string (nullable = true)

I run a UDAF on this DF with groupBy($"id1", $"id2").agg(udaf($"val") as
"valsStruct").
The aggregate simply stores all vals in a Set.

The result is:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- valsStruct: struct (nullable = true)
 ||-- vals: array (nullable = true)
 |||-- element: string (containsNull = true)

But i would expect:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- vals: array (nullable = true)
 ||— element: string (containsNull = true)

What I'm doing right now is to add a new column val with valsStruct.vals
as its value and drop valsStruct afterwards, but I'm quite sure there is a
more elegant way. I tried various implementations of the evaluate method,
but none of them worked for me. Can you tell me what I am missing here?

The implementation of the UDAF:

class AggregateVals extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(
StructField("val", StringType, true)
  ))

  def bufferSchema: StructType = StructType(Array(
StructField("vals", ArrayType(StringType, true))
  ))

  def dataType: DataType = bufferSchema

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Seq[String]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val existing: Seq[String] = buffer.getSeq[String](0)
val newBuffer = existing :+ input.getAs[String](0)
buffer(0) = newBuffer
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
  }

  def evaluate(buffer: Row): Any = {
    buffer
  }
}
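A hedged note on the sketch above: the extra struct level comes from dataType being the bufferSchema (a StructType) while evaluate returns the whole buffer Row. Returning just the array, with a matching ArrayType as the result type, should flatten it (untested; only these two members would change, imports as in the class above):

  def dataType: DataType = ArrayType(StringType, true)

  def evaluate(buffer: Row): Any = buffer.getSeq[String](0)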



DataFrames UDAF with array and struct

2016-03-23 Thread Matthias Niehoff
Hello Everybody,

I want to write a UDAF for DataFrames where the buffer schema is an

ArrayType(StructType(List(StructField(String), StructField(String), StructField(String))))

When I want to access the buffer in the update() or merge() method, the
ArrayType gets returned as a List. But what is the element type of that List? Or
in other words: what is the mapping of StructType with StructFields onto Scala
collection/data types?

Thanks!
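From what I have seen (hedged, not verified against every Spark version), the struct elements inside such a buffer come back as org.apache.spark.sql.Row, so the list is effectively a Seq[Row] and the fields are read by position. A sketch of an update() under that assumption, for a buffer of ArrayType(StructType(a: String, b: String, c: String)) and a three-string input row:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val entries: Seq[Row] = buffer.getSeq[Row](0)  // each struct element is a Row
  val added = Row(input.getString(0), input.getString(1), input.getString(2))
  buffer(0) = entries :+ added                   // write the new list of Rows back
}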



Re: Spark job for Reading time series data from Cassandra

2016-03-10 Thread Matthias Niehoff
Hi,

the spark connector docs say: (
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md
)

"The number of Spark partitions(tasks) created is directly controlled by
the setting spark.cassandra.input.split.size_in_mb. This number reflects
the approximate amount of Cassandra Data in any given Spark partition. To
increase the number of Spark Partitions decrease this number from the
default (64mb) to one that will sufficiently break up your C* token range. „

So, maybe your partitions are quite big?
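If that is the case, lowering the split size is a knob to try (sketch; host and value are illustrative, not a recommendation for this particular table):

import org.apache.spark.SparkConf

// Smaller splits -> more, smaller Spark partitions when scanning the table.
val conf = new SparkConf()
  .setAppName("cassandra-read-sketch")
  .set("spark.cassandra.connection.host", "127.0.0.1")   // placeholder host
  .set("spark.cassandra.input.split.size_in_mb", "16")   // default is 64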

2016-03-10 16:46 GMT+01:00 Bryan Jeffrey <bryan.jeff...@gmail.com>:

> Prateek,
>
> I believe that one task is created per Cassandra partition.  How is your
> data partitioned?
>
> Regards,
>
> Bryan Jeffrey
>
> On Thu, Mar 10, 2016 at 10:36 AM, Prateek . <prat...@aricent.com> wrote:
>
>> Hi,
>>
>>
>>
>> I have a Spark Batch job for reading timeseries data from Cassandra which
>> has 50,000 rows.
>>
>>
>>
>>
>>
>> JavaRDD<String> cassandraRowsRDD = javaFunctions.cassandraTable("iotdata", "coordinate")
>>     .map(new Function<CassandraRow, String>() {
>>         @Override
>>         public String call(CassandraRow cassandraRow) throws Exception {
>>             return cassandraRow.toString();
>>         }
>>     });
>>
>> List<String> lm = cassandraRowsRDD.collect();
>>
>>
>>
>>
>>
>> I am testing in local mode, where I am observing that Spark is creating 770870
>> tasks (one job, one stage), which is taking many hours to complete. Can anyone
>> please suggest what the possible issues could be?
>>
>>
>>
>>
>>
>> Stage Id | Description                       | Submitted           | Duration | Tasks: Succeeded/Total
>> 0        | collect at CassandraSpark.java:94 | 2016/03/10 21:01:15 | 9 s      | 137/770870
>> (Input, Output, Shuffle Read and Shuffle Write columns were empty)
>>
>> Thank You
>>
>>
>>
>> Prateek
>> "DISCLAIMER: This message is proprietary to Aricent and is intended
>> solely for the use of the individual to whom it is addressed. It may
>> contain privileged or confidential information and should not be circulated
>> or used for any purpose other than for what it is intended. If you have
>> received this message in error, please notify the originator immediately.
>> If you are not the intended recipient, you are notified that you are
>> strictly prohibited from using, copying, altering, or disclosing the
>> contents of this message. Aricent accepts no responsibility for loss or
>> damage arising from the use of the information transmitted by this email
>> including damage from virus."
>>
>
>




Re: Streaming job delays

2016-03-10 Thread Matthias Niehoff
Hi,

dynamic allocation is, afaik, not supported for streaming applications,
which might be the reason. See also:

https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCA+AHuKkxg44WvXZGr4MVNUxioWH3o8pZZQRTaXR=m5cb-op...@mail.gmail.com%3E

If you are using Spark 1.6 there should also be a warning about using
dynamic allocation in Streaming mode.
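
For a streaming app I would keep both switched off, which matches what you
ended up doing. In spark-defaults.conf that is roughly (both are off by
default, so simply not enabling them is enough):

    spark.dynamicAllocation.enabled  false
    spark.shuffle.service.enabled    false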

2016-03-09 17:45 GMT+01:00 Juan Leaniz <juan.lea...@gmail.com>:

> Hi
>
> Batch interval is 5min. I actually managed to fix the issue by turning off
> dynamic allocation and the external shuffle service.
>
> This seems to have helped and now the scheduling delay is between 0-5ms
> and processing time is about 2.8min which is lower than my batch interval.
>
> I also noticed that enabling dynamic allocation and the external shuffle
> service had a high impact on cpu usage.
>
> Thanks
> Juan
>
> On Wed, Mar 9, 2016 at 6:00 AM, Matthias Niehoff <
> matthias.nieh...@codecentric.de> wrote:
>
>> hi,
>>
>> What’s your batch interval? If the processing time is constantly longer
>> than your batch interval, it is normal that your scheduling delay keeps
>> going up.
>>
>> 2016-03-08 23:28 GMT+01:00 jleaniz <juan.lea...@gmail.com>:
>>
>>> Hi,
>>>
>>> I have a streaming application that reads batches from Flume, does some
>>> transformations and then writes parquet files to HDFS.
>>>
>>> The problem I have right now is that the scheduling delays are really
>>> really
>>> high, and get even higher as time goes. Have seen it go up to 24 hours.
>>> The
>>> processing time for each batch is usually steady at 50s or less.
>>>
>>> The workers and master are pretty much idle most of the time. Any ideas
>>> why
>>> the scheduling time would be so high when the processing time is low?
>>>
>>> Thanks
>>>
>>> Juan
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-job-delays-tp26433.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>> vertrauliche und/oder rechtlich geschützte Informationen. Wenn Sie nicht
>> der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
>> informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und
>> evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder
>> Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser
>> E-Mail ist nicht gestattet
>>
>
>


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Re: Streaming job delays

2016-03-09 Thread Matthias Niehoff
hi,

What’s your batch interval? If the processing time is constantly longer
than your batch interval, it is normal that your scheduling delay keeps
going up.
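
(Just as an illustration: with a 1 minute batch interval and a constant
processing time of 70 seconds, every completed batch adds roughly 10
seconds of scheduling delay, so the backlog keeps growing and never catches
up.)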

2016-03-08 23:28 GMT+01:00 jleaniz <juan.lea...@gmail.com>:

> Hi,
>
> I have a streaming application that reads batches from Flume, does some
> transformations and then writes parquet files to HDFS.
>
> The problem I have right now is that the scheduling delays are really
> really
> high, and get even higher as time goes. Have seen it go up to 24 hours. The
> processing time for each batch is usually steady at 50s or less.
>
> The workers and master are pretty much idle most of the time. Any ideas why
> the scheduling time would be so high when the processing time is low?
>
> Thanks
>
> Juan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-job-delays-tp26433.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Re: Add Jars to Master/Worker classpath

2016-03-03 Thread Matthias Niehoff
Hi,

the driver and executor extraClassPath settings do not work for this because
they apply to the driver and executor JVMs, not to the master and worker
JVMs. They work fine for the driver/executor, but we want to add classes to
the master/worker. SPARK_DIST_CLASSPATH looks good, we will try this!
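
Probably something like this in conf/spark-env.sh on the master and worker
machines (the jar paths are only placeholders for the redis appender and
its dependencies):

    export SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/opt/ext-libs/log4j-redis-appender.jar:/opt/ext-libs/jedis.jar"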

Thanks!

2016-03-02 18:35 GMT+01:00 Sumedh Wale <sw...@snappydata.io>:

> On Wednesday 02 March 2016 09:39 PM, Matthias Niehoff wrote:
>
> no, not to driver and executor but to the master and worker instances of
> the spark standalone cluster
>
>
> Why exactly does adding jars to driver/executor extraClassPath not work?
>
> Classpath of master/worker is setup by AbstractCommandBuilder that
> explicitly adds the following:
>
> jars named "datanucleus-*", environment variables: _SPARK_ASSEMBLY (for
> assembly jar), SPARK_DIST_CLASSPATH, HADOOP_CONF_DIR, YARN_CONF_DIR
>
> So you can set SPARK_DIST_CLASSPATH in conf/spark-env.sh to add the
> required jars (separated by platform's File.pathSeparator).
>
>
> thanks
>
> --
> Sumedh Wale
> SnappyData (http://www.snappydata.io)
>
>
> Am 2. März 2016 um 17:05 schrieb Igor Berman <igor.ber...@gmail.com>:
>
>> spark.driver.extraClassPath
>> spark.executor.extraClassPath
>>
>> 2016-03-02 18:01 GMT+02:00 Matthias Niehoff <
>> <matthias.nieh...@codecentric.de>matthias.nieh...@codecentric.de>:
>>
>>> Hi,
>>>
>>> we want to add jars to the Master and Worker class path mainly for
>>> logging reason (we have a redis appender to send logs to redis -> logstash
>>> -> elasticsearch).
>>>
>>> While it is working with setting SPARK_CLASSPATH, this solution is
>>> afaik deprecated and should not be used. Furthermore we are also using 
>>> --driver-java-options
>>> and spark.executor.extraClassPath which leads to exceptions when
>>> running our apps in standalone cluster mode.
>>>
>>> So what is the best way to add jars to the master and worker classpath?
>>>
>>> Thank you
>>>
>>> --
>>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>> tel: +49 (0) 721.9595-681 <%2B49%20%280%29%20721.9595-681> | fax: +49
>>> (0) 721.9595-666 <%2B49%20%280%29%20721.9595-666> | mobil: +49 (0)
>>> 172.1702676 <%2B49%20%280%29%20172.1702676>
>>> <http://www.codecentric.de/>www.codecentric.de | blog.codecentric.de |
>>> www.meettheexperts.de | www.more4fi.de
>>>
>>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>>> Schütz
>>>
>>> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>>> vertrauliche und/oder rechtlich geschützte Informationen. Wenn Sie nicht
>>> der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
>>> informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und
>>> evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder
>>> Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser
>>> E-Mail ist nicht gestattet
>>>
>>
>>
>
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> <http://www.codecentric.de/>www.codecentric.de | blog.codecentric.de |
> <http://www.meettheexperts.de/>www.meettheexperts.de |
> <http://www.more4fi.de/>www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
> bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
> evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht gestattet
>
>
>


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +4

Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Matthias Niehoff
No, not to the driver and executor, but to the master and worker instances
of the Spark standalone cluster.

Am 2. März 2016 um 17:05 schrieb Igor Berman <igor.ber...@gmail.com>:

> spark.driver.extraClassPath
> spark.executor.extraClassPath
>
> 2016-03-02 18:01 GMT+02:00 Matthias Niehoff <
> matthias.nieh...@codecentric.de>:
>
>> Hi,
>>
>> we want to add jars to the Master and Worker class path mainly for
>> logging reason (we have a redis appender to send logs to redis -> logstash
>> -> elasticsearch).
>>
>> While it is working with setting SPARK_CLASSPATH, this solution is afaik
>> deprecated and should not be used. Furthermore we are also using 
>> --driver-java-options
>> and spark.executor.extraClassPath which leads to exceptions when running
>> our apps in standalone cluster mode.
>>
>> So what is the best way to add jars to the master and worker classpath?
>>
>> Thank you
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>> vertrauliche und/oder rechtlich geschützte Informationen. Wenn Sie nicht
>> der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
>> informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und
>> evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder
>> Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser
>> E-Mail ist nicht gestattet
>>
>
>


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Add Jars to Master/Worker classpath

2016-03-02 Thread Matthias Niehoff
Hi,

we want to add jars to the Master and Worker classpath, mainly for logging
reasons (we have a redis appender to send logs to redis -> logstash ->
elasticsearch).

While it works with SPARK_CLASSPATH, that solution is afaik deprecated and
should not be used. Furthermore, we are also using --driver-java-options
and spark.executor.extraClassPath, which leads to exceptions when running
our apps in standalone cluster mode.
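
For illustration, the submit currently looks roughly like this (master URL,
class name and jar paths are placeholders):

    spark-submit --master spark://master:7077 --deploy-mode cluster \
      --driver-java-options "-Dlog4j.configuration=log4j.xml" \
      --conf spark.executor.extraClassPath=/opt/ext-libs/log4j-redis-appender.jar \
      --class com.example.StreamingJob our-app.jar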

So what is the best way to add jars to the master and worker classpath?

Thank you

-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Re: Using jar bundled log4j.xml on worker nodes

2016-02-05 Thread Matthias Niehoff
mh, that seems to be the problem we are facing. But with --files you can
only pass local files, not files on the classpath, so we would need a file
outside of our jar.
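
If we pulled the log4j.xml out of the jar and shipped it explicitly, the
workaround would presumably look something like this (paths and names are
just examples):

    spark-submit \
      --files /local/path/log4j.xml \
      --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.xml" \
      --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.xml" \
      --class com.example.StreamingJob our-app.jar

but then the file has to be maintained outside of the artifact.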

2016-02-04 18:20 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

> Have you taken a look at SPARK-11105 ?
>
> Cheers
>
> On Thu, Feb 4, 2016 at 9:06 AM, Matthias Niehoff <
> matthias.nieh...@codecentric.de> wrote:
>
>> Hello everybody,
>>
>> we’ve bundle our log4j.xml into our jar (in the classpath root).
>>
>> I’ve added the log4j.xml to the spark-defaults.conf with
>>
>> spark.{driver,executor}.extraJavaOptions=-Dlog4j.configuration=log4j.xml
>>
>> There is no log4j.properties or log4j.xml in one of the conf folders on
>> any machine.
>>
>> When I start the app the driver is using our log4j.xml, but all the
>> executors use the default log4j.properties („Using Spark’s default log4j
>> profile: org/apache/spark/log4j-defaults.properties“).
>>
>> What do I have to change to make spark use the log4j.xml from our jar
>> also on our executors?
>>
>> We are using Spark 1.5.2
>>
>> Thank you!
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>> vertrauliche und/oder rechtlich geschützte Informationen. Wenn Sie nicht
>> der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben,
>> informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und
>> evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder
>> Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser
>> E-Mail ist nicht gestattet
>>
>
>


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Using jar bundled log4j.xml on worker nodes

2016-02-04 Thread Matthias Niehoff
Hello everybody,

we’ve bundled our log4j.xml into our jar (at the classpath root).

I’ve added the log4j.xml to the spark-defaults.conf with

spark.{driver,executor}.extraJavaOptions=-Dlog4j.configuration=log4j.xml

There is no log4j.properties or log4j.xml in any of the conf folders on any
machine.

When I start the app, the driver uses our log4j.xml, but all the executors
use the default log4j.properties ("Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties").

What do I have to change to make Spark use the log4j.xml from our jar on
the executors as well?

We are using Spark 1.5.2

Thank you!

-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Logs of Custom Receiver

2015-11-30 Thread Matthias Niehoff
Hi,

I've built a custom receiver and deployed an application using this
receiver to a cluster.
When I run the application locally I see the output of my logger in
stdout/stderr, but when I run it on the cluster I don't.
I only see the log statements from the constructor, but none from the
start() method or other methods called from there (all logged at the same
level).
Where do I find these log statements?
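
For context, the receiver looks roughly like this (a heavily simplified
sketch; class name, logger and messages are placeholders, logging is done
via slf4j):

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyCustomReceiver extends Receiver<String> {
        private static final Logger LOG = LoggerFactory.getLogger(MyCustomReceiver.class);

        public MyCustomReceiver() {
            super(StorageLevel.MEMORY_AND_DISK_2());
            LOG.info("receiver created");     // this statement shows up
        }

        @Override
        public void onStart() {
            LOG.info("receiver starting");    // visible locally, missing on the cluster
            new Thread(this::receive).start();
        }

        @Override
        public void onStop() {
            // nothing to do, the receive loop checks isStopped()
        }

        private void receive() {
            while (!isStopped()) {
                LOG.info("polling source");   // missing on the cluster as well
                store("dummy record");        // simplified; the real receiver reads from an external source
            }
        }
    }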

Thanks!


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet


Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-26 Thread Matthias Niehoff
Hello everybody,

I have a few (~15) Spark Streaming jobs which have load peaks as well as
long times with a low load. So I thought the new Dynamic Resource
Allocation for Standalone Clusters might be helpful (SPARK-4751).

I have a test "cluster" with 1 worker consisting of 4 executors with 2
cores each, so 8 cores in total.
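
The apps run with dynamic allocation switched on, roughly with these
settings (a sketch of the relevant entries; the external shuffle service is
enabled on the worker side as well):

    spark.dynamicAllocation.enabled  true
    spark.shuffle.service.enabled    true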

I started a simple streaming application without limiting the max cores for
this app. As expected the app occupied every core of the cluster. Then I
started a second app, also without limiting the maximum cores. As the first
app did not get any input through the stream, my naive expectation was that
the second app would get at least 2 cores (1 receiver, 1 processing), but
that's not what happened. The cores are still assigned to the first app.
When I look at the application UI of the first app every executor is still
running. That explains why no executor is used for the second app.

I end up with two questions:
- When does an executor become idle in a Spark Streaming application (so
that it could be reassigned to another app)?
- Is there another way to cope with varying load when using Spark Streaming
applications? I have already combined multiple jobs into one Spark
application using different threads, but this approach is reaching its
limit for me, because the applications get too big to manage.

Thank You!