You need to return an iterator from the closure you provide to mapPartitions.
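For example, a minimal sketch of your snippet below, where the DB helpers (DBConnectionInit, DBConnection.commit) and the per-record logic (processRecord) are hypothetical placeholders standing in for your own code:

```scala
rdd.mapPartitions { partitionOfRecords =>
  DBConnectionInit()  // hypothetical: open/init the connection for this partition

  // Force the lazy map before committing -- a partition iterator can only
  // be consumed once, so materialize the results first.
  val results = partitionOfRecords.map(processRecord).toList

  DBConnection.commit()  // hypothetical: commit once per partition

  results.iterator  // mapPartitions expects an Iterator back
}
```

Because the results are materialized inside the closure, the commit runs after every row in the partition has been processed, and the returned iterator gives you an RDD you can keep transforming afterwards.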
On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:

> Thanks for the foreach idea. But once I used it I got an empty RDD, I
> think because "results" is an iterator.
>
> Yes, I know map is lazy, but I expected there was a way to force the
> action.
>
> I cannot use foreachPartition because I need to reuse the new RDD after
> some maps.
>
> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Map is lazy. You need an actual action, or nothing will happen. Use
>> foreachPartition, or do an empty foreach after the map.
>>
>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>>
>>> Dears,
>>>
>>> I need to commit a DB transaction for each partition, not for each
>>> row. The code below didn't work for me:
>>>
>>> rdd.mapPartitions(partitionOfRecords => {
>>>
>>>   DBConnectionInit()
>>>
>>>   val results = partitionOfRecords.map(......)
>>>
>>>   DBConnection.commit()
>>>
>>> })
>>>
>>> Best regards,
>>>
>>> Ahmed Atef Nawwar
>>> Data Management & Big Data Consultant
>>>
>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Your Kafka broker died, or you otherwise had a rebalance.
>>>>
>>>> Normally Spark's retries take care of that.
>>>>
>>>> Is there something going on with your Kafka installation that makes
>>>> the rebalance take especially long?
>>>>
>>>> Yes, increasing the backoff / max number of retries will "help", but
>>>> it's better to figure out what's going on with Kafka.
>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora
>>>> <shushantaror...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> My streaming application gets killed with the below error:
>>>>>
>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>> [testtopic,193]))
>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>>> for time 1440626120000 ms
>>>>> org.apache.spark.SparkException:
>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([testtopic,115]))
>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>> at ...
>>>>>
>>>>> The Kafka params printed in the job logs are:
>>>>>
>>>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>> block.on.buffer.full = true
>>>>> retry.backoff.ms = 100
>>>>> buffer.memory = 1048576
>>>>> batch.size = 16384
>>>>> metrics.sample.window.ms = 30000
>>>>> metadata.max.age.ms = 300000
>>>>> receive.buffer.bytes = 32768
>>>>> timeout.ms = 30000
>>>>> max.in.flight.requests.per.connection = 5
>>>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>> metric.reporters = []
>>>>> client.id =
>>>>> compression.type = none
>>>>> retries = 0
>>>>> max.request.size = 1048576
>>>>> send.buffer.bytes = 131072
>>>>> acks = all
>>>>> reconnect.backoff.ms = 10
>>>>> linger.ms = 0
>>>>> metrics.num.samples = 2
>>>>> metadata.fetch.timeout.ms = 60000
>>>>>
>>>>> Is the Kafka broker going down and the job getting killed? What's the
>>>>> best way to handle it?
>>>>> Will increasing the retries and backoff time help, and what should they
>>>>> be set to so that the streaming application never fails, but instead
>>>>> keeps retrying every few seconds and sends an event, so that my custom
>>>>> code can send a notification if a Kafka broker is down?
>>>>>
>>>>> Thanks
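On the retries/backoff question in the thread above, a minimal sketch of where those knobs live for the Spark 1.x direct Kafka stream. The setting names here are assumptions to verify against your exact Spark and Kafka versions, and the broker list and values are illustrative only:

```scala
import org.apache.spark.SparkConf

// How many times the direct stream retries looking up leader offsets
// before failing the batch (assumed default: 1).
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRetries", "5")

// Consumer-side params passed to createDirectStream; the backoff below
// is the wait between leader-lookup retries (assumed default: 200 ms).
val kafkaParams = Map[String, String](
  "metadata.broker.list"      -> "broker1:9092,broker2:9092,broker3:9092",
  "refresh.leader.backoff.ms" -> "2000"
)
```

Raising these only buys time for a leader election to finish; as noted above, if rebalances routinely take longer than a few seconds, the Kafka cluster itself is worth investigating first.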