Map is lazy. You need an actual action, or nothing will happen. Use foreachPartition, or do an empty foreach after the map.
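Something along these lines should work (an untested sketch; DBConnectionInit, process and the connection API are placeholders for your own DB code):

rdd.foreachPartition { partitionOfRecords =>
  // one connection per partition; DBConnectionInit() stands in for your own init code
  val conn = DBConnectionInit()
  // foreach is eager, so the partition's records are actually consumed here
  partitionOfRecords.foreach(record => process(conn, record))
  // commit once per partition, after all of its records have been processed
  conn.commit()
  conn.close()
}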
On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:

> Dears,
>
> I need to commit a DB transaction for each partition, not for each row.
> The code below didn't work for me.
>
> rdd.mapPartitions(partitionOfRecords => {
>   DBConnectionInit()
>   val results = partitionOfRecords.map(......)
>   DBConnection.commit()
> })
>
> Best regards,
>
> Ahmed Atef Nawwar
> Data Management & Big Data Consultant
>
>
> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Your kafka broker died or you otherwise had a rebalance.
>>
>> Normally spark retries take care of that.
>>
>> Is there something going on with your kafka installation, that rebalance
>> is taking especially long?
>>
>> Yes, increasing backoff / max number of retries will "help", but it's
>> better to figure out what's going on with kafka.
>>
>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> My streaming application gets killed with the below error:
>>>
>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 1440626120000 ms
>>> org.apache.spark.SparkException:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
>>>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>   at ...
>>>
>>> Kafka params printed in the job logs are:
>>>
>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>> block.on.buffer.full = true
>>> retry.backoff.ms = 100
>>> buffer.memory = 1048576
>>> batch.size = 16384
>>> metrics.sample.window.ms = 30000
>>> metadata.max.age.ms = 300000
>>> receive.buffer.bytes = 32768
>>> timeout.ms = 30000
>>> max.in.flight.requests.per.connection = 5
>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>> metric.reporters = []
>>> client.id =
>>> compression.type = none
>>> retries = 0
>>> max.request.size = 1048576
>>> send.buffer.bytes = 131072
>>> acks = all
>>> reconnect.backoff.ms = 10
>>> linger.ms = 0
>>> metrics.num.samples = 2
>>> metadata.fetch.timeout.ms = 60000
>>>
>>> Is the kafka broker going down and killing the job? What's the best way to handle it?
>>> Increasing retries and backoff time will help, but what values should they be set to
>>> so the streaming application never fails - instead it keeps retrying every few seconds
>>> and sends an event, so my custom code can send a notification that the kafka broker is
>>> down if that is the cause.
>>>
>>> Thanks
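For reference, the retry / backoff knobs discussed in the quoted thread can be set roughly like this (an untested sketch for the Spark 1.x direct stream; verify the property names against your Spark and Kafka versions):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("resilient-direct-stream")
  // how many times the driver retries fetching leader offsets before failing the batch
  .set("spark.streaming.kafka.maxRetries", "5")

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  // wait longer between retries so a leader election has time to finish
  "refresh.leader.backoff.ms" -> "2000"
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("testtopic"))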