Yes, of course, I am doing that. But once I added results.foreach(row => {}) I got an empty RDD.

    rdd.mapPartitions(partitionOfRecords => {
      DBConnectionInit()
      val results = partitionOfRecords.map(......)
      DBConnection.commit()
      results.foreach(row => {})
      results
    })
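A minimal sketch of the pattern Cody describes below. It is not self-contained: rdd, DBConnectionInit and DBConnection.commit are the placeholder names from this thread, and processRow is a hypothetical stand-in for the elided per-row logic. The key change is materializing the iterator with toList, so the rows are actually processed before the commit and the returned iterator is not already consumed:

    rdd.mapPartitions(partitionOfRecords => {
      DBConnectionInit()
      // map on an iterator is lazy; toList forces every row through
      // processRow now, while the connection is open and before commit.
      // processRow is hypothetical, standing in for the real per-row work.
      val results = partitionOfRecords.map(processRow).toList
      DBConnection.commit()
      // return a fresh iterator over the materialized results; the
      // earlier foreach-based version had already exhausted the iterator
      results.iterator
    })
    // mapPartitions is itself lazy: nothing runs until an action
    // (count, saveAsTextFile, ...) is called on the resulting RDD

If the mapped RDD did not need to be reused afterwards, foreachPartition (Cody's other suggestion) would be the simpler fit, since it is an action and returns nothing.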
On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You need to return an iterator from the closure you provide to
> mapPartitions.
>
> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>
>> Thanks for the foreach idea. But once I used it I got an empty RDD. I
>> think that is because "results" is an iterator.
>>
>> Yes, I know map is lazy, but I expected there is a way to force the
>> action.
>>
>> I cannot use foreachPartition because I need to reuse the new RDD after
>> some maps.
>>
>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Map is lazy. You need an actual action, or nothing will happen. Use
>>> foreachPartition, or do an empty foreach after the map.
>>>
>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>>>
>>>> Dears,
>>>>
>>>> I need to commit the DB transaction for each partition, not for each
>>>> row. The code below didn't work for me.
>>>>
>>>>     rdd.mapPartitions(partitionOfRecords => {
>>>>       DBConnectionInit()
>>>>       val results = partitionOfRecords.map(......)
>>>>       DBConnection.commit()
>>>>     })
>>>>
>>>> Best regards,
>>>>
>>>> Ahmed Atef Nawwar
>>>> Data Management & Big Data Consultant
>>>>
>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>
>>>>> Normally spark retries take care of that.
>>>>>
>>>>> Is there something going on with your kafka installation, such that
>>>>> the rebalance is taking especially long?
>>>>>
>>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>>> better to figure out what's going on with kafka.
>>>>>
>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> My streaming application gets killed with the below error:
>>>>>>
>>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>> [testtopic,193]))
>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>>>> for time 1440626120000 ms
>>>>>> org.apache.spark.SparkException:
>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>> Set([testtopic,115]))
>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>> at ...
>>>>>>
>>>>>> The Kafka params printed in the job logs are:
>>>>>>
>>>>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>> block.on.buffer.full = true
>>>>>> retry.backoff.ms = 100
>>>>>> buffer.memory = 1048576
>>>>>> batch.size = 16384
>>>>>> metrics.sample.window.ms = 30000
>>>>>> metadata.max.age.ms = 300000
>>>>>> receive.buffer.bytes = 32768
>>>>>> timeout.ms = 30000
>>>>>> max.in.flight.requests.per.connection = 5
>>>>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>> metric.reporters = []
>>>>>> client.id =
>>>>>> compression.type = none
>>>>>> retries = 0
>>>>>> max.request.size = 1048576
>>>>>> send.buffer.bytes = 131072
>>>>>> acks = all
>>>>>> reconnect.backoff.ms = 10
>>>>>> linger.ms = 0
>>>>>> metrics.num.samples = 2
>>>>>> metadata.fetch.timeout.ms = 60000
>>>>>>
>>>>>> Is the kafka broker going down and killing the job? What is the best
>>>>>> way to handle it?
>>>>>> Will increasing retries and backoff time help, and to what values
>>>>>> should they be set so that the streaming application never fails, but
>>>>>> instead keeps retrying every few seconds and emits an event, so that
>>>>>> my custom code can send a notification if a kafka broker being down
>>>>>> is the cause?
>>>>>>
>>>>>> Thanks
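For reference on the backoff / retry tuning discussed above, a sketch assuming the Spark 1.x direct Kafka stream: the retry count when fetching leader offsets is controlled by spark.streaming.kafka.maxRetries (default 1), and the pause between attempts by the Kafka param refresh.leader.backoff.ms (default 200). The values below are illustrative, not recommendations:

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      // default is 1; retry a few more times before failing the batch
      .set("spark.streaming.kafka.maxRetries", "5")

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
      // default is 200 (ms); give a leader election time to finish
      "refresh.leader.backoff.ms" -> "2000")

Even with these raised, Cody's point stands: repeated NotLeaderForPartitionException usually means the brokers themselves need attention.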