Re: Invalidating/Remove complete mapWithState state
That would be the hard way. If possible, I want to clear the cached state without stopping the application, maybe triggered by a message in the stream.

On 17 April 2017 at 19:41, ayan guha <guha.a...@gmail.com> wrote:

> It sounds like you want to stop the stream process, wipe out the
> checkpoint and restart?

--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Hochstraße 11 | 42697 Solingen | Germany
phone: +49 (0) 1721702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de

Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
Executive board: Michael Hochgürtel . Rainer Vehns
Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or legally protected information. If you are not the intended recipient or have received this e-mail in error, please inform the sender immediately and delete this e-mail and any attached files. Unauthorized copying, use or opening of attached files, as well as unauthorized forwarding of this e-mail, is not permitted.
Invalidating/Remove complete mapWithState state
Hi everybody,

is there a way to completely invalidate or remove the state used by mapWithState, not only for a given key using State#remove()?

Deleting the state key by key is not an option, as a) not all possible keys are known (this could be worked around, of course) and b) the number of keys is too big, so deleting them one by one takes too long.

I tried to unpersist the RDD retrieved by stateSnapshots (stateSnapshots().transform(_.unpersist())), but this did not work as expected.

Thank you,

Matthias
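One direction that avoids touching every key is lazy invalidation. As far as I understand mapWithState, the mapping function only runs for keys that receive data in a batch (or that time out), so a single control message cannot reach all keys directly. The sketch below is a plain-Scala model, not Spark API: every name in it (Versioned, GenerationalState, the generation counter) is hypothetical, and how the current generation is made visible to the mapping function after a control message arrives is left open as an assumption.

```scala
// Hypothetical model: state is stored together with the generation it was written in.
final case class Versioned[S](generation: Long, value: S)

object GenerationalState {
  // Treat stored state as absent unless it was written in the current generation.
  def current[S](stored: Option[Versioned[S]], generation: Long): Option[S] =
    stored.filter(_.generation == generation).map(_.value)

  // Example update: a per-key running count that restarts after a generation bump.
  def updateCount(stored: Option[Versioned[Long]], increment: Long, generation: Long): Versioned[Long] =
    Versioned(generation, current(stored, generation).getOrElse(0L) + increment)
}
```

With this scheme a "clear everything" message would only bump the generation. Stale entries are ignored the next time their key is seen, but they still occupy memory until the key is updated or removed, for example through an idle timeout on the StateSpec.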
Re: Problems with new experimental Kafka Consumer for 0.10
heartbeat.interval.ms: default
group.max.session.timeout.ms: default
session.timeout.ms: 6

Default values as of Kafka 0.10.

Complete Kafka params:

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> kafkaBrokers,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false",
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[BytesDeserializer].getName,
      "session.timeout.ms" -> s"${1 * 60 * 1000}",
      "request.timeout.ms" -> s"${2 * 60 * 1000}",
      "max.poll.records" -> "1000"
    )

As pointed out, when using different groups for each DirectStream everything is fine.

2016-10-15 2:42 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> For you or anyone else having issues with consumer rebalance, what are
> your settings for
>
>     heartbeat.interval.ms
>     session.timeout.ms
>     group.max.session.timeout.ms
>
> relative to your batch time?
>
> On Tue, Oct 11, 2016 at 10:19 AM, static-max <flasha...@googlemail.com> wrote:
>> Hi,
>>
>> I run into the same exception
>> (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>> completed since the group has already rebalanced ...), but I only use one
>> stream.
>> I get the exceptions when trying to manually commit the offset to Kafka:
>>
>>     OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>     CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
>>     cco.commitAsync(offsets);
>>
>> I tried setting "max.poll.records" to 1000 but this did not help.
>>
>> Any idea what could be wrong?
>>
>> 2016-10-11 15:36 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>> The new underlying kafka consumer prefetches data and is generally
>>> heavier weight, so it is cached on executors. Group id is part of the
>>> cache key. I assumed kafka users would use different group ids for
>>> consumers they wanted to be distinct, since otherwise it would cause
>>> problems even with the normal kafka consumer, but that appears to be a
>>> poor assumption.
>>>
>>> I'll figure out a way to make this more obvious.
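The three settings asked about above are tied together by a few constraints. As I understand the Kafka 0.10 consumer, heartbeat.interval.ms must be lower than session.timeout.ms, request.timeout.ms must be greater than session.timeout.ms, and the broker only accepts a session timeout between its group.min.session.timeout.ms and group.max.session.timeout.ms. The helper below is a minimal sketch of such a check; the object and parameter names are made up for illustration and are not part of any Kafka API.

```scala
// Hypothetical helper: lists which timeout relationships are violated.
object ConsumerTimeoutCheck {
  def violations(
      heartbeatMs: Long,
      sessionMs: Long,
      requestMs: Long,
      brokerMinSessionMs: Long,
      brokerMaxSessionMs: Long): List[String] = {
    // Each entry pairs a condition that must hold with the message reported when it does not.
    val checks = List(
      (heartbeatMs < sessionMs) ->
        "heartbeat.interval.ms must be lower than session.timeout.ms",
      (requestMs > sessionMs) ->
        "request.timeout.ms must be greater than session.timeout.ms",
      (sessionMs >= brokerMinSessionMs && sessionMs <= brokerMaxSessionMs) ->
        "session.timeout.ms must lie within the broker's group.min/max.session.timeout.ms"
    )
    checks.collect { case (ok, message) if !ok => message }
  }
}
```

For the values in this thread one would pass 60000 for the session timeout and 120000 for the request timeout. The heartbeat interval and the broker-side limits depend on the installation, so any numbers plugged in for them are assumptions to verify against the actual broker configuration.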
Re: Problems with new experimental Kafka Consumer for 0.10
Good point, I changed the group id to be unique for the separate streams and now it works. Thanks!

Although changing this is OK for us, I am interested in the why :-) With the old connector this was not a problem, nor is it, as far as I know, with the pure Kafka consumer API.

2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Just out of curiosity, have you tried using separate group ids for the
> separate streams?
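The fix described here, one group id per stream, can be sketched as a small helper that derives the consumer settings for each topic from the shared ones. The object name and the naming scheme (base group plus topic) are assumptions for illustration, not what was actually deployed.

```scala
// Hypothetical helper: derive per-stream consumer settings with a distinct group.id.
object PerStreamGroupId {
  // Returns a copy of the shared settings whose group.id is unique for the given topic.
  def paramsFor(base: Map[String, String], baseGroup: String, topic: String): Map[String, String] =
    base + ("group.id" -> s"$baseGroup-$topic")
}
```

In a createStream(ssc, topic) function like the one used in this thread, the result of PerStreamGroupId.paramsFor(kafkaParams, group, topic) would be passed to ConsumerStrategies.Subscribe in place of the shared kafkaParams.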
Re: Problems with new experimental Kafka Consumer for 0.10
I re-ran the job with DEBUG log level for org.apache.spark, kafka.consumer and org.apache.kafka. Please find the output here:

http://pastebin.com/VgtRUQcB

Most of the delay is introduced by

    16/10/11 13:20:12 DEBUG RecurringTimer: Callback for JobGenerator called at time x

which repeats multiple times until about one minute has passed. I think this class is responsible for the endless loop that schedules the microbatches, but I do not know exactly what it does or why it has a problem with multiple Kafka direct streams.
Re: Problems with new experimental Kafka Consumer for 0.10
I stripped down the job to just consume the stream and print it, without Avro deserialization. When I only consume one topic, everything is fine. As soon as I add a second stream, it gets stuck after about 5 minutes. So it basically boils down to:

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> kafkaBrokers,
      "group.id" -> group,
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[BytesDeserializer].getName,
      "session.timeout.ms" -> s"${1 * 60 * 1000}",
      "request.timeout.ms" -> s"${2 * 60 * 1000}",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    def main(args: Array[String]) {

      def createStreamingContext(): StreamingContext = {

        val sparkConf = new SparkConf(true)
          .setAppName("Kafka Consumer Test")
        sparkConf.setMaster("local[*]")

        val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))

        // AD REQUESTS
        // ===
        val serializedAdRequestStream = createStream(ssc, topic_adrequest)
        serializedAdRequestStream.map(record => record.value().get()).print(10)

        // VIEWS
        // ==
        val serializedViewStream = createStream(ssc, topic_view)
        serializedViewStream.map(record => record.value().get()).print(10)

        // // CLICKS
        // // ==
        // val serializedClickStream = createStream(ssc, topic_click)
        // serializedClickStream.map(record => record.value().get()).print(10)

        ssc
      }

      val streamingContext = createStreamingContext()
      streamingContext.start()
      streamingContext.awaitTermination()
    }

And in the logs you see (note the one-minute gap between 14:02:26 and 14:03:26):

    16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
    16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
    16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
    16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
    16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
Re: Problems with new experimental Kafka Consumer for 0.10
This job will fail after about 5 minutes:

    import java.util.UUID

    import scala.reflect.ClassTag

    import com.twitter.bijection.avro.GenericAvroCodecs
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
    import org.apache.kafka.common.utils.Bytes
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    // AdRequestLog, Event and EventType are project classes that are not shown here.
    object SparkJobMinimal {

      // read Avro schemas
      var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
      val avroSchemaAdRequest = scala.io.Source.fromInputStream(stream).getLines.mkString
      stream.close
      stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
      val avroSchemaEvent = scala.io.Source.fromInputStream(stream).getLines.mkString
      stream.close

      val kafkaBrokers = "broker-0.kafka.mesos:9092"

      val topic_adrequest = "adserving.log.ad_request"
      val topic_view = "adserving.log.view"
      val topic_click = "adserving.log.click"
      // one random group id, shared by all three streams
      val group = UUID.randomUUID().toString
      val streaming_interval_seconds = 2

      val kafkaParams = Map[String, String](
        "bootstrap.servers" -> kafkaBrokers,
        "group.id" -> group,
        "key.deserializer" -> classOf[StringDeserializer].getName,
        "value.deserializer" -> classOf[BytesDeserializer].getName,
        "session.timeout.ms" -> s"${1 * 60 * 1000}",
        "request.timeout.ms" -> s"${2 * 60 * 1000}",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> "false"
      )

      def main(args: Array[String]) {

        def createStreamingContext(): StreamingContext = {

          val sparkConf = new SparkConf(true)
            .setAppName("Kafka Consumer Test")
          sparkConf.setMaster("local[*]")

          val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))

          // AD REQUESTS
          // ===
          val serializedAdRequestStream = createStream(ssc, topic_adrequest)
          val adRequestStream = deserializeStream(serializedAdRequestStream,
            avroSchemaAdRequest, record => AdRequestLog(record)).cache()
          adRequestStream.print(10)

          // VIEWS
          // ==
          val serializedViewStream = createStream(ssc, topic_view)
          val viewStream = deserializeStream(serializedViewStream,
            avroSchemaEvent, record => Event(record, EventType.View)).cache()
          viewStream.print(10)

          // CLICKS
          // ==
          val serializedClickStream = createStream(ssc, topic_click)
          val clickEventStream = deserializeStream(serializedClickStream,
            avroSchemaEvent, record => Event(record, EventType.Click)).cache()
          clickEventStream.print(10)

          ssc
        }

        val streamingContext = createStreamingContext()
        streamingContext.start()
        streamingContext.awaitTermination()
      }

      def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
        KafkaUtils.createDirectStream[String, Bytes](ssc, LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
      }

      def deserializeStream[EventType: ClassTag](
          serializedAdRequestStream: InputDStream[ConsumerRecord[String, Bytes]],
          avroSchema: String,
          recordMapper: GenericRecord => EventType): DStream[EventType] = {
        serializedAdRequestStream.mapPartitions { iteratorOfMessages =>
          // parse the schema once per partition, not once per record
          val schema: Schema = new Schema.Parser().parse(avroSchema)
          val recordInjection = GenericAvroCodecs.toBinary(schema)
          iteratorOfMessages.map(message => {
            recordInjection.invert(message.value().get())
          }).filter(_.isSuccess).map(_.get.asInstanceOf[GenericRecord]).map(recordMapper)
        }
      }
    }
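The deserialization step above keeps only the records whose Avro decoding succeeded (filter(_.isSuccess) followed by map(_.get)). The same idea can be written with collect, which avoids calling get on a Try. This is a generic plain-Scala sketch with made-up names (KeepSuccessful, decodeAll); in the job, the decode function would be the recordInjection.invert call.

```scala
import scala.util.{Success, Try}

// Hypothetical helper: decode every message and keep only the successful results.
object KeepSuccessful {
  def decodeAll[A, B](messages: Iterator[A])(decode: A => Try[B]): Iterator[B] =
    messages.map(decode).collect { case Success(record) => record }
}
```

Either way, records that fail to decode are dropped silently, so a counter or log statement for the Failure case may be worth adding when debugging missing data.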
Re: Problems with new experimental Kafka Consumer for 0.10
Yes, the consumer rebalances even without committing the data.

The job consumes 3 streams and processes them. When consuming only one stream it runs fine. But when consuming three streams, even without joining them, just deserializing the payload and triggering an output action, it fails.

I will prepare a code sample.

2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> OK, so at this point, even without involving commitAsync, you're
> seeing consumer rebalances after a particular batch takes longer than
> the session timeout?
>
> Do you have a minimal code example you can share?
>
> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
> <matthias.nieh...@codecentric.de> wrote:
>> Hi,
>> sorry for the late reply. A public holiday in Germany.
>>
>> Yes, it's using a unique group id which no other job or consumer group is
>> using. I have increased session.timeout.ms to 1 minute and set
>> max.poll.records to 1000. The processing takes ~1 second.
>>
>> 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>> Well, I'd start at the first thing suggested by the error, namely that
>>> the group has rebalanced.
>>>
>>> Is that stream using a unique group id?
>>>
>>> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
>>> <matthias.nieh...@codecentric.de> wrote:
>>>> Hi,
>>>>
>>>> the stacktrace:
>>>>
>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>>>>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>>>>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
>>>>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute
Re: Problems with new experimental Kafka Consumer for 0.10
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

But it seems like the commit is not the actual problem. The job also falls behind if I do not commit the offsets. The delay would be OK if the processing time was longer than the batch interval, but that's not the case in any of the microbatches. In my opinion, for some reason one of the microbatches falls behind by more than session.timeout.ms. Then the consumers regroup, which takes about 1 minute (see timestamps below). Now a cycle of slow batches begins, each triggering a consumer regroup. Would this be possible?

    16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s)   --> the job for 08:13:34
    16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
    16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
    16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
    16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010

2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> What's the actual stacktrace / exception you're getting related to
> commit failure?
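The timeline in the first JobScheduler line can be checked with a little arithmetic: the batch time (epoch milliseconds) plus the reported total delay gives the moment the batch finished. The sketch below assumes the log timestamps are in UTC, which matches the numbers; the object and method names are made up for illustration.

```scala
import java.time.Instant

// Hypothetical helper for reading JobScheduler "Total delay" log lines.
object BatchTimeline {
  // Wall-clock time at which a batch finished: batch time plus reported total delay.
  def finishedAt(batchTimeMs: Long, totalDelaySeconds: Double): Instant =
    Instant.ofEpochMilli(batchTimeMs).plusMillis(math.round(totalDelaySeconds * 1000))
}
```

Instant.ofEpochMilli(1475050414000L) is 2016-09-28T08:13:34Z, the batch named in the annotation, and BatchTimeline.finishedAt(1475050414000L, 141.580) is 2016-09-28T08:15:55.580Z, the timestamp of that log line. With an execution time of only 0.360 s, almost the whole 141.58 s was spent waiting before the batch ran, not processing it.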
Problems with new experimental Kafka Consumer for 0.10
Hi everybody,

I am using the new Kafka consumer integration for Spark Streaming in my job. With the old consumer the job runs fine.

The job consumes 3 topics, saves the data to Cassandra, cogroups the topics, calls mapWithState, and stores the results in Cassandra. After that I manually commit the Kafka offsets using the commitAsync method of the KafkaDStream.

With the new consumer I experience the following problem:

After a certain amount of time (about 4-5 minutes, sometimes more or less), exceptions report that the offset commit failed. The processing takes less than the batch interval. I also adjusted session.timeout.ms and request.timeout.ms as well as the max.poll.records setting, which did not help.

After the first failed offset commit, the time it takes from Kafka until the microbatch is started increases, while the processing time stays constantly below the batch interval. Further offset commits also fail, and as a result the delay builds up.

Has anybody experienced this as well?

Thank you

Relevant Kafka parameters:

"session.timeout.ms" -> s"${1 * 60 * 1000}",
"request.timeout.ms" -> s"${2 * 60 * 1000}",
"auto.offset.reset" -> "largest",
"enable.auto.commit" -> "false",
"max.poll.records" -> "1000"

--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche und/oder rechtlich geschützte Informationen.
Wenn Sie nicht der richtige Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie bitte sofort den Absender und löschen Sie diese E-Mail und evtl. beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist nicht gestattet
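For readers unfamiliar with the manual commit mentioned in this message, here is a minimal sketch of the pattern as it is usually written with the spark-streaming-kafka-0-10 integration. It is not the job from this thread: the broker address, topic name, group id, and batch interval are placeholders, and the processing step is reduced to a count. The offset ranges have to be read from the RDD that comes directly from the Kafka stream, before any cogroup or mapWithState step changes the RDD type.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object CommitSketch {
  def main(args: Array[String]): Unit = {
    // Master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("commit-sketch")
    val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

    // All values below are placeholders, not the settings from the thread.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("example-topic"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Only the RDD produced directly by the Kafka stream carries offset ranges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Stand-in for the real processing of the batch.
      println(s"records in batch: ${rdd.count()}")

      // Queue the commit; it is sent to Kafka asynchronously on a later batch.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```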
Re: How spark decides whether to do BroadcastHashJoin or SortMergeJoin
Hi,

there is a property you can set. Quoting the docs (http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options):

spark.sql.autoBroadcastJoinThreshold, default 10485760 (10 MB): Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled.

2016-07-20 10:07 GMT+02:00 raaggarw <raagg...@adobe.com>:
> Hi,
>
> How spark decides/optimizes internally as to when it needs to a BroadcastHashJoin vs SortMergeJoin? Is there anyway we can guide from outside or through options which Join to use?
> Because in my case when i am trying to do a join, spark makes that join as BroadCastHashJoin internally and when join is actually being executed it waits for broadcast to be done (which is big data), resulting in timeout.
> I do not want to increase value of timeout i.e. "spark.sql.broadcastTimeout". Rather i want this to be done via SortMergeJoin. How can i enforce that?
>
> Thanks
> Ravi
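As a small illustration of the property quoted in this reply, the following sketch sets the threshold to -1 at runtime. It assumes Spark 2.x with a SparkSession and a local master chosen only for the example; on Spark 1.x the same property can be set with sqlContext.setConf instead.

```scala
import org.apache.spark.sql.SparkSession

object DisableBroadcastJoin {
  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholders for this sketch.
    val spark = SparkSession.builder()
      .appName("disable-broadcast-join")
      .master("local[*]")
      .getOrCreate()

    // -1 disables automatic broadcast joins, so the planner has to pick
    // another strategy (typically a sort-merge join) for large tables.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.stop()
  }
}
```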
Structured Streaming and Microbatches
Hi everybody,

as far as I understand, with the new Structured Streaming API the output will no longer be processed every x seconds. Instead, the data will be processed as soon as it arrives. But there might be a delay due to the processing time for the data.

A small example: data comes in and the processing takes 1 second (quite long). During this 1 second a lot of new data comes in, which will be processed after the processing of the first data has finished.

My questions are:

Is the data for each processing run, i.e. all the data collected during the 1 second, still processed as a microbatch (including reprocessing on another worker in case of failure, etc.)? Or is the bulk of data processed record by record?

With regard to the processing time: is the behavior for high processing times the same as in Spark 1.x? Meaning we get a scheduling delay, data is stored by a receiver, ... (Is there even a concept of a receiver in Spark 2? Is a source in streaming basically a receiver?)

I hope those questions aren't too confusing :-)

Thank you!
Subtract two DStreams
Hi,

I want to subtract two DStreams (based on the same input stream) to get all elements that exist in the original stream but not in the modified stream (the modified stream is changed using joinWithCassandraTable, which does an inner join and because of this might remove entries).

Subtract is only possible on RDDs. So I could use a foreachRDD right at the beginning of the stream processing and work on RDDs. I think it is quite ugly to use the output operation at the beginning and then implement a lot of transformations inside the foreachRDD.

Can you think of other ways to do an efficient diff between two DStreams?

Thank you
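One possible way to avoid an early foreachRDD, sketched here as a suggestion rather than an answer from the thread: DStream.transformWith combines the RDDs of two streams batch by batch, so RDD.subtract can be applied inside it. The helper name is hypothetical, and the sketch assumes both streams have the same element type; the output of joinWithCassandraTable would first have to be mapped back to the original elements.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object DStreamDiff {
  // For every batch, keep the elements of `original` that do not appear in `modified`.
  // Both streams must share the same batch interval and element type.
  def subtract[T: ClassTag](original: DStream[T], modified: DStream[T]): DStream[T] =
    original.transformWith(modified, (a: RDD[T], b: RDD[T]) => a.subtract(b))
}
```

The result is again a DStream, so further transformations can follow it and the output operation stays at the end of the pipeline.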
Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>
> This is the code that rises the exception in the spark streaming process:
>
> try{
>   messages.foreachRDD( rdd =>{
>     val count = rdd.count()
>     if (count > 0){
>       //someMessages should be AmazonRating...
>       val someMessages = rdd.take(count.toInt)
>       println("<-->")
>       println("someMessages is " + someMessages)
>       someMessages.foreach(println)
>       println("<-->")
>       println("<---POSSIBLE SOLUTION--->")
>       messages
>         .map { case (_, jsonRating) =>
>           val jsValue = Json.parse(jsonRating)
>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>             case JsSuccess(rating, _) => rating
>             case JsError(_) => AmazonRating.empty
>           }
>         }
>         .filter(_ != AmazonRating.empty)
>         //I think that this line provokes the runtime exception...
>         .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>
>       println("<---POSSIBLE SOLUTION--->")
>     }
>   })
> }catch{
>   case e: IllegalArgumentException => {println("illegal arg. exception")};
>   case e: IllegalStateException => {println("illegal state exception")};
>   case e: ClassCastException => {println("ClassCastException")};
>   case e: Exception => {println(" Generic Exception")};
> }finally{
>   println("Finished taking data from kafka topic...")
> }
>
> Recommender object:
>
> def predictWithALS(ratings: Seq[AmazonRating]) = {
>   // train model
>   val myRatings = ratings.map(toSparkRating)
>   val myRatingRDD = sc.parallelize(myRatings)
>
>   val startAls = DateTime.now
>   val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>
>   val myProducts = myRatings.map(_.product).toSet
>   val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>
>   // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
>   val myUserId = userDict.getIndex(MyUsername)
>   val recommendations = model.predict(candidates.map((myUserId, _))).collect
>   val endAls = DateTime.now
>   val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>   val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>
>   println(s"ALS Time: $alsTime seconds")
>   result
> }
> }
>
> And this is the kafka producer that push the json data within the topic:
>
> object AmazonProducerExample {
>   def main(args: Array[String]): Unit = {
>
>     val productId = args(0).toString
>     val userId = args(1).toString
>     val rating = args(2).toDouble
>     val topicName = "amazonRatingsTopic"
>
>     val producer = Producer[String](topicName)
>
>     //0981531679 is Scala Puzzlers...
>     //AmazonProductAndRating
>     AmazonPageParser.parse(productId,userId,rating).onSuccess { case amazonRating =>
>       //Is this the correct way? the best performance? possibly not, what about using avro or parquet?
>       producer.send(Json.toJson(amazonRating).toString)
>       //producer.send(amazonRating)
>       println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
>       System.exit(0)
>     }
>
>   }
> }
>
> I have written a stack overflow post <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>, with more details, please help, i am stuck with this issue and i don't know how to continue.
>
> Regards
>
> Alonso Isidoro Roman
Sliding Average over Window in Spark Streaming
Hi,

if I want a sliding average over the last 10 minutes for some keys, I can do something like groupBy(window(…), "my-key").avg("some-values") in Spark 2.0.

I am trying to implement this sliding average using Spark 1.6.x. I tried reduceByKeyAndWindow but did not find a solution. In my opinion I have to keep all the values in the window to compute the average. One way would be to add every new value to a list in the reduce method and then do the average computation in a separate map, but this seems kind of ugly.

Do you have an idea how to solve this?

Thanks!
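A possible alternative to collecting every value in a list, offered as a sketch and not as an answer from the thread: an average can be rebuilt from a running (sum, count) pair, and such pairs can be merged by a plain reduce function. The object and function names below are hypothetical, and the logic is shown on ordinary Scala collections so it can be run without a cluster.

```scala
object SlidingAverage {
  // Running (sum, count) pair; small and mergeable, unlike a list of all values.
  type Acc = (Double, Long)

  // Turn a single value into an accumulator.
  def lift(value: Double): Acc = (value, 1L)

  // Merge two accumulators; usable as the reduce function over a window.
  def add(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // Remove an accumulator again; usable as the inverse reduce function.
  def subtract(a: Acc, b: Acc): Acc = (a._1 - b._1, a._2 - b._2)

  // Final step: derive the average, guarding against an empty window.
  def average(acc: Acc): Double = if (acc._2 == 0L) 0.0 else acc._1 / acc._2

  def main(args: Array[String]): Unit = {
    val window = Seq(2.0, 4.0, 9.0).map(lift).reduce(add)
    println(average(window)) // prints 5.0
  }
}
```

With a hypothetical pair stream pairs: DStream[(String, Double)], this could be wired up as pairs.mapValues(SlidingAverage.lift).reduceByKeyAndWindow(SlidingAverage.add _, Minutes(10), Seconds(30)).mapValues(SlidingAverage.average), where the 30 second slide interval is an assumption. The variant of reduceByKeyAndWindow that also takes the inverse function requires checkpointing to be enabled.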
Re: Please assist: Spark 1.5.2 / cannot find StateSpec / State
StateSpec and the mapWithState method are only available starting with Spark 1.6.x.

2016-04-13 11:34 GMT+02:00 Marco Mistroni <mmistr...@gmail.com>:
> hi all
> i am trying to replicate the Streaming Wordcount example described here
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
>
> in my build.sbt i have the following dependencies
>
> ...
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.2" % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.3.0" % "provided"
> ...
> But compilations fail mentioning that class StateSpec and State are not found
>
> Could pls someone point me to the right packages to refer if i want to use StateSpec?
>
> kind regards
> marco
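A sketch of what the dependency section could look like after the upgrade. The exact version 1.6.1 is an assumption (any 1.6.x release should contain the classes), and aligning spark-streaming-flume with the other modules is a suggestion, not something stated in the thread. With these dependencies, State and StateSpec can be imported from the org.apache.spark.streaming package.

```scala
// build.sbt (sketch): one Spark version shared by all modules.
val sparkVersion = "1.6.1" // assumed version; any 1.6.x release should do

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib"           % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion % "provided"
)
```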
Do not wrap result of a UDAF in an Struct
Hi,

given is a simple DF:

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- val: string (nullable = true)

I run a UDAF on this DF with groupBy($"id1", $"id2").agg(udaf($"val") as "valsStruct"). The aggregate simply stores all val values in a Set. The result is:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- valsStruct: struct (nullable = true)
 |    |-- vals: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

But I would expect:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- vals: array (nullable = true)
 |    |-- element: string (containsNull = true)

What I'm doing right now is to add a new column val with valsStruct.vals as its value and drop valsStruct afterwards, but I'm quite sure there is a more elegant way. I tried various implementations of the evaluate method, but none of those worked for me. Can you tell me what I am missing here?

The implementation of the UDAF:

class AggregateVals extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(
    StructField("val", StringType, true)
  ))

  def bufferSchema: StructType = StructType(Array(
    StructField("vals", ArrayType(StringType, true))
  ))

  def dataType: DataType = bufferSchema

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq[String]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val existing: Seq[String] = buffer.getSeq[String](0)
    val newBuffer = existing :+ input.getAs[String](0)
    buffer(0) = newBuffer
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
  }

  def evaluate(buffer: Row): Any = {
    buffer
  }
}
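One likely reading of the result, given as a sketch and not as a confirmed answer from the thread: dataType returns the whole buffer schema (a StructType) and evaluate returns the whole buffer Row, so the output column is a struct. Declaring the array type as the result type and returning only the first buffer field should yield a plain array column. The class name AggregateValsFlat is hypothetical.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class AggregateValsFlat extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(
    StructField("val", StringType, nullable = true)
  ))

  def bufferSchema: StructType = StructType(Array(
    StructField("vals", ArrayType(StringType, containsNull = true))
  ))

  // The result type is the array itself, not the buffer struct.
  def dataType: DataType = ArrayType(StringType, containsNull = true)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq.empty[String]
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getSeq[String](0) :+ input.getAs[String](0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
  }

  // Return the first buffer field instead of the whole buffer Row.
  def evaluate(buffer: Row): Any = buffer.getSeq[String](0)
}
```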
DataFrames UDAF with array and struct
Hello everybody,

I want to write a UDAF for DataFrames where the buffer schema is an

ArrayType(StructType(List(StructField(String), StructField(String), StructField(String))))

When I access the buffer in the update() or merge() method, the ArrayType is returned as a List. But what is the element type of that List? In other words: what is the mapping of a StructType with StructFields to Scala collection/data types?

Thanks!
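As a hedged pointer for this question: in Spark SQL's Scala API a StructType value is represented as an org.apache.spark.sql.Row and an ArrayType value as a scala.collection.Seq, so the buffer field in such a UDAF would be read as Seq[Row]. The sketch below uses hypothetical field names a, b, c and a hypothetical helper to show the access pattern.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

object StructBufferSketch {
  // Element type of the array: a struct with three string fields (names are made up).
  val elementType: StructType = StructType(List(
    StructField("a", StringType),
    StructField("b", StringType),
    StructField("c", StringType)
  ))

  // Buffer schema with a single array-of-struct field.
  val bufferSchema: StructType = StructType(List(
    StructField("items", ArrayType(elementType))
  ))

  // Reads the array field as Seq[Row] and extracts the first string of every struct.
  def firstFields(buffer: Row): Seq[String] =
    buffer.getSeq[Row](0).map(_.getString(0))
}
```

When writing to the buffer, the same mapping applies in the other direction: a new element is appended as Row("x", "y", "z") to the existing Seq[Row].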
Re: Spark job for Reading time series data from Cassandra
Hi,

the Spark connector docs say (https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md):

"The number of Spark partitions(tasks) created is directly controlled by the setting spark.cassandra.input.split.size_in_mb. This number reflects the approximate amount of Cassandra Data in any given Spark partition. To increase the number of Spark Partitions decrease this number from the default (64mb) to one that will sufficiently break up your C* token range."

So, maybe your partitions are quite big?

2016-03-10 16:46 GMT+01:00 Bryan Jeffrey <bryan.jeff...@gmail.com>:
> Prateek,
>
> I believe that one task is created per Cassandra partition. How is your data partitioned?
>
> Regards,
>
> Bryan Jeffrey
>
> On Thu, Mar 10, 2016 at 10:36 AM, Prateek . <prat...@aricent.com> wrote:
>> Hi,
>>
>> I have a Spark Batch job for reading timeseries data from Cassandra which has 50,000 rows.
>>
>> JavaRDD cassandraRowsRDD = javaFunctions.cassandraTable("iotdata", "coordinate")
>>     .map(new Function<CassandraRow, String>() {
>>         @Override
>>         public String call(CassandraRow cassandraRow) throws Exception {
>>             return cassandraRow.toString();
>>         }
>>     });
>>
>> List lm = cassandraRowsRDD.collect();
>>
>> I am testing in local mode where I am observing Spark is creating 770870 tasks (one job, one stage) which is taking many hours to complete. Can any please suggest, what could be possible issues.
>>
>> Stage Id: 0
>> Description: collect at CassandraSpark.java:94
>> Submitted: 2016/03/10 21:01:15
>> Duration: 9 s
>> Tasks (Succeeded/Total): 137/770870
>>
>> Thank You
>>
>> Prateek
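To make the setting from the quoted FAQ concrete, here is a minimal sketch of how it could be passed through SparkConf. The property name is taken from the FAQ text and may differ between connector versions; the connection host, the app name, and the helper itself are placeholders. Which value is appropriate depends on the data, so the split size is left as a parameter.

```scala
import org.apache.spark.SparkConf

object SplitSizeSketch {
  // Builds a configuration with the given approximate split size in MB.
  // A larger value means fewer, larger Spark partitions (and therefore fewer tasks).
  def conf(splitSizeMb: Int): SparkConf =
    new SparkConf()
      .setAppName("cassandra-read")                          // placeholder
      .set("spark.cassandra.connection.host", "127.0.0.1")   // placeholder
      .set("spark.cassandra.input.split.size_in_mb", splitSizeMb.toString)
}
```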
Re: Streaming job delays
Hi,

dynamic allocation is, as far as I know, not supported for streaming applications; that may be the reason. See also: https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCA+AHuKkxg44WvXZGr4MVNUxioWH3o8pZZQRTaXR=m5cb-op...@mail.gmail.com%3E

If you are using Spark 1.6, there should also be a warning about using dynamic allocation in streaming mode.

2016-03-09 17:45 GMT+01:00 Juan Leaniz <juan.lea...@gmail.com>:
> Hi
>
> Batch interval is 5min. I actually managed to fix the issue by turning off dynamic allocation and the external shuffle service.
>
> This seems to have helped and now the scheduling delay is between 0-5ms and processing time is about 2.8min which is lower than my batch interval.
>
> I also noticed that enabling dynamic allocation and the external shuffle service had a high impact on cpu usage.
>
> Thanks
> Juan
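The fix described in the quoted reply, turning off dynamic allocation and the external shuffle service, corresponds to two standard Spark properties. A minimal sketch, with a placeholder app name:

```scala
import org.apache.spark.SparkConf

object StreamingConfSketch {
  // Disables dynamic allocation and the external shuffle service for the job.
  val conf: SparkConf = new SparkConf()
    .setAppName("streaming-job") // placeholder
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
}
```

The same properties can also be set in spark-defaults.conf or passed with --conf on spark-submit.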
Re: Streaming job delays
Hi,

what's your batch interval? If the processing time is constantly greater than your batch interval, it is entirely normal that your scheduling delay keeps going up.

2016-03-08 23:28 GMT+01:00 jleaniz <juan.lea...@gmail.com>:
> Hi,
>
> I have a streaming application that reads batches from Flume, does some transformations and then writes parquet files to HDFS.
>
> The problem I have right now is that the scheduling delays are really really high, and get even higher as time goes. Have seen it go up to 24 hours. The processing time for each batch is usually steady at 50s or less.
>
> The workers and master are pretty much idle most of the time. Any ideas why the scheduling time would be so high when the processing time is low?
>
> Thanks
>
> Juan
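The relationship between batch interval, processing time, and scheduling delay can be made explicit with a little arithmetic. The sketch below is a simplified model, assuming a constant batch interval, a constant processing time, and batches that run strictly one after another; the names are hypothetical.

```scala
object SchedulingDelay {
  // Delay (in seconds) before batch n (0-based) can start under the simplified model:
  // each batch that takes longer than the interval pushes all later batches back
  // by the difference, so the delay grows linearly with n.
  def delayBeforeBatch(n: Int, intervalSec: Double, processingSec: Double): Double =
    math.max(0.0, n * (processingSec - intervalSec))

  def main(args: Array[String]): Unit = {
    // 5 minute interval, 5.5 minutes of processing: 30 s of extra delay per batch.
    println(delayBeforeBatch(10, 300.0, 330.0)) // prints 300.0
  }
}
```

Under this model a processing time of 50 s against a 5 minute interval produces no delay at all, which suggests that a growing delay in such a job has a different cause than slow processing.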
Re: Add Jars to Master/Worker classpath
Hi,

the driver and executor classpath settings do not help here, because they apply to the driver and executor JVMs and not to the master and worker JVMs. They work fine for the driver and the executors, but we want to add classes to the master and worker.

SPARK_DIST_CLASSPATH looks good, I will try this! Thanks!

2016-03-02 18:35 GMT+01:00 Sumedh Wale <sw...@snappydata.io>:
> On Wednesday 02 March 2016 09:39 PM, Matthias Niehoff wrote:
>> no, not to driver and executor but to the master and worker instances of
>> the spark standalone cluster
>
> Why exactly does adding jars to driver/executor extraClassPath not work?
>
> Classpath of master/worker is setup by AbstractCommandBuilder that
> explicitly adds the following:
>
> jars named "datanucleus-*", environment variables: _SPARK_ASSEMBLY (for
> assembly jar), SPARK_DIST_CLASSPATH, HADOOP_CONF_DIR, YARN_CONF_DIR
>
> So you can set SPARK_DIST_CLASSPATH in conf/spark-env.sh to add the
> required jars (separated by platform's File.pathSeparator).
>
> thanks
>
> --
> Sumedh Wale
> SnappyData (http://www.snappydata.io)

--
Matthias Niehoff
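The SPARK_DIST_CLASSPATH suggestion quoted above could look roughly like the following in conf/spark-env.sh. This is a minimal sketch, not a tested setup from this thread: the directory /opt/spark/extra-jars and both jar names are hypothetical placeholders, and ':' is the path separator on Linux (File.pathSeparator differs on Windows).

```shell
# conf/spark-env.sh on every master and worker host.
# Assumption: the jar paths below are placeholders, not paths from this thread.
EXTRA_JARS="/opt/spark/extra-jars/log-appender.jar:/opt/spark/extra-jars/redis-client.jar"

# Append to an existing value instead of overwriting it.
# ':' is File.pathSeparator on Linux and macOS.
export SPARK_DIST_CLASSPATH="${SPARK_DIST_CLASSPATH:+${SPARK_DIST_CLASSPATH}:}${EXTRA_JARS}"
```

Since the environment is read when the JVM is launched, the master and worker processes would have to be restarted before the extra jars become visible to them.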
Re: Add Jars to Master/Worker classpath
No, not to the driver and executor, but to the master and worker instances of the Spark standalone cluster.

On 2 March 2016 at 17:05, Igor Berman <igor.ber...@gmail.com> wrote:
> spark.driver.extraClassPath
> spark.executor.extraClassPath

--
Matthias Niehoff
Add Jars to Master/Worker classpath
Hi,

we want to add jars to the Master and Worker classpath, mainly for logging reasons (we have a Redis appender that sends logs to Redis -> Logstash -> Elasticsearch).

Setting SPARK_CLASSPATH works, but as far as I know this solution is deprecated and should not be used. Furthermore, we also use --driver-java-options and spark.executor.extraClassPath, which leads to exceptions when running our apps in standalone cluster mode.

So what is the best way to add jars to the master and worker classpath?

Thank you

--
Matthias Niehoff
Re: Using jar bundled log4j.xml on worker nodes
Hmm, that seems to be the problem we are facing. But with --files you can only pass local files, not files on the classpath, so we would need a file outside of our jar.

2016-02-04 18:20 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
> Have you taken a look at SPARK-11105 ?
>
> Cheers

--
Matthias Niehoff
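One way to get "a file outside of our jar" is to place log4j.xml at the same path on every node and point log4j at it with an explicit file: URL. The following is only a sketch under that assumption: /etc/spark/log4j.xml is a hypothetical path, and the script merely prints the two lines that would go into conf/spark-defaults.conf.

```shell
# Assumption: log4j.xml has been copied to this (hypothetical) path on every node.
LOG4J_FILE="/etc/spark/log4j.xml"

# log4j 1.x reads the log4j.configuration system property; the explicit
# "file:" prefix makes it load from the filesystem instead of the classpath.
LOG4J_OPT="-Dlog4j.configuration=file:${LOG4J_FILE}"

# Print the lines for conf/spark-defaults.conf.
printf 'spark.driver.extraJavaOptions   %s\n' "$LOG4J_OPT"
printf 'spark.executor.extraJavaOptions %s\n' "$LOG4J_OPT"
```

The trade-off is that the logging configuration is no longer shipped with the application jar and has to be distributed to the nodes separately.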
Using jar bundled log4j.xml on worker nodes
Hello everybody,

we have bundled our log4j.xml into our jar (in the classpath root).

I have added the log4j.xml to spark-defaults.conf with

spark.{driver,executor}.extraJavaOptions=-Dlog4j.configuration=log4j.xml

There is no log4j.properties or log4j.xml in any of the conf folders on any machine.

When I start the app, the driver uses our log4j.xml, but all the executors use the default log4j.properties ("Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties").

What do I have to change to make Spark use the log4j.xml from our jar on the executors as well?

We are using Spark 1.5.2.

Thank you!

--
Matthias Niehoff
Logs of Custom Receiver
Hi,

I've built a custom receiver and deployed an application using this receiver to a cluster. When I run the application locally, I see the output of my logger in stdout/stderr, but when I run it on the cluster I don't see it there. I only see the log statements from the constructor, but none of the following statements in the start() method or in the methods called from there. (All statements log at the same level.)

Where do I find these log statements?

Thanks!

--
Matthias Niehoff
Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)
Hello everybody,

I have a few (~15) Spark Streaming jobs which have load peaks as well as long periods of low load. So I thought the new Dynamic Resource Allocation for standalone clusters might be helpful (SPARK-4751).

I have a test "cluster" with 1 worker consisting of 4 executors with 2 cores each, so 8 cores in total.

I started a simple streaming application without limiting the max cores for this app. As expected, the app occupied every core of the cluster. Then I started a second app, also without limiting the maximum cores. As the first app did not get any input through the stream, my naive expectation was that the second app would get at least 2 cores (1 receiver, 1 processing), but that's not what happened. The cores are still assigned to the first app. When I look at the application UI of the first app, every executor is still running. That explains why no executor is available for the second app.

I end up with two questions:
- When does an executor become idle in a Spark Streaming application (so that it could be reassigned to another app)?
- Is there another way to cope with uncertain load when using Spark Streaming applications? I have already combined multiple jobs into one Spark application using different threads, but this approach hits a limit for me, because the Spark applications get too big to manage.

Thank you!
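For reference, the settings involved in such a setup can be written down as follows. This is a sketch only and does not answer when a streaming executor counts as idle: the property names are standard Spark configuration keys, while the values (60s, 4 cores) are illustrative assumptions. spark.cores.max is the static per-application cap that was left unset in the test described above.

```shell
# Illustrative values; only the property names come from Spark's configuration.
IDLE_TIMEOUT="60s"   # assumption: release an executor after 60s without tasks
MAX_CORES=4          # assumption: cap one app at half of the 8-core test cluster

# Lines for conf/spark-defaults.conf. Dynamic allocation needs the external
# shuffle service, which also has to be enabled on the workers.
cat <<EOF
spark.dynamicAllocation.enabled              true
spark.shuffle.service.enabled                true
spark.dynamicAllocation.executorIdleTimeout  ${IDLE_TIMEOUT}
spark.cores.max                              ${MAX_CORES}
EOF
```

A fixed spark.cores.max per application guarantees that a second app can start, at the price of not using free cores during load peaks.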