Yes, of course, I am doing that. But once I added results.foreach(row => {})
I got an empty RDD.



rdd.mapPartitions(partitionOfRecords => {
  DBConnectionInit()
  val results = partitionOfRecords.map(......)
  DBConnection.commit()
  results.foreach(row => {})
  results
})
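
A likely explanation for the empty RDD: "results" is a single-pass Scala Iterator, so the empty foreach consumes it and the iterator returned afterwards has nothing left. The sketch below reproduces this with plain Scala collections (no Spark, no database) and shows one possible workaround: materialize the mapped rows, run the per-partition side effect, then return a fresh iterator. The object name, the method names, and the commit callback are hypothetical stand-ins for illustration; they are not part of Spark or of the code above.

```scala
// Plain Scala, no Spark required: an Iterator can be traversed only once.
object IteratorOnce {
  // Stand-in for the closure passed to mapPartitions (hypothetical name).
  // It consumes `results` with foreach and then returns the same iterator.
  def consumedThenReturned(partition: Iterator[Int]): Iterator[Int] = {
    val results = partition.map(_ * 2) // lazy: nothing is computed yet
    results.foreach(_ => ())           // forces the map, but exhausts the iterator
    results                            // nothing left to iterate over
  }

  // Materialize first, run the per-partition side effect, return a fresh iterator.
  // `commit` is a hypothetical stand-in for DBConnection.commit().
  def materializedThenReturned(partition: Iterator[Int],
                               commit: () => Unit): Iterator[Int] = {
    val results = partition.map(_ * 2).toList // forces the map and keeps the rows
    commit()                                  // runs once per partition, after all rows
    results.iterator                          // new iterator over the stored rows
  }

  def main(args: Array[String]): Unit = {
    println(consumedThenReturned(Iterator(1, 2, 3)).toList)
    var committed = false
    println(materializedThenReturned(Iterator(1, 2, 3), () => { committed = true }).toList)
    println(committed)
  }
}
```

Note that toList holds the whole partition in memory, so this sketch assumes each partition is small enough for that. Inside mapPartitions the same shape would apply: build the list, commit, and return its iterator.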



On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You need to return an iterator from the closure you provide to
> mapPartitions
>
> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.na...@gmail.com>
> wrote:
>
>> Thanks for the foreach idea. But once I used it I got an empty RDD. I
>> think that is because "results" is an iterator.
>>
>> Yes, I know "Map is lazy", but I expected there to be a way to force an
>> action.
>>
>> I cannot use foreachPartition because I need to reuse the new RDD after
>> some maps.
>>
>>
>>
>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>>
>>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>>> foreachPartition, or do an empty foreach after the map.
>>>
>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com>
>>> wrote:
>>>
>>>> Dears,
>>>>
>>>>     I need to commit a DB transaction for each partition, not for each
>>>> row. The code below didn't work for me.
>>>>
>>>>
>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>
>>>> DBConnectionInit()
>>>>
>>>> val results = partitionOfRecords.map(......)
>>>>
>>>> DBConnection.commit()
>>>>
>>>>
>>>> })
>>>>
>>>>
>>>>
>>>> Best regards,
>>>>
>>>> Ahmed Atef Nawwar
>>>>
>>>> Data Management & Big Data Consultant
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>
>>>>> Normally spark retries take care of that.
>>>>>
>>>>> Is there something going on with your kafka installation, that
>>>>> rebalance is taking especially long?
>>>>>
>>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>>> better to figure out what's going on with kafka.
>>>>>
>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>>> shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> My streaming application gets killed with the error below:
>>>>>>
>>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>> [testtopic,193]))
>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>>>> for time 1440626120000 ms
>>>>>> org.apache.spark.SparkException:
>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>> Set([testtopic,115]))
>>>>>> at
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>> at
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>> at
>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>> at
>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>> at
>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>> at
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kafka params in job logs printed are :
>>>>>>  value.serializer = class
>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>         key.serializer = class
>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>         block.on.buffer.full = true
>>>>>>         retry.backoff.ms = 100
>>>>>>         buffer.memory = 1048576
>>>>>>         batch.size = 16384
>>>>>>         metrics.sample.window.ms = 30000
>>>>>>         metadata.max.age.ms = 300000
>>>>>>         receive.buffer.bytes = 32768
>>>>>>         timeout.ms = 30000
>>>>>>         max.in.flight.requests.per.connection = 5
>>>>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>>         metric.reporters = []
>>>>>>         client.id =
>>>>>>         compression.type = none
>>>>>>         retries = 0
>>>>>>         max.request.size = 1048576
>>>>>>         send.buffer.bytes = 131072
>>>>>>         acks = all
>>>>>>         reconnect.backoff.ms = 10
>>>>>>         linger.ms = 0
>>>>>>         metrics.num.samples = 2
>>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>>
>>>>>>
>>>>>> Is the Kafka broker going down and killing the job? What's the
>>>>>> best way to handle it?
>>>>>> Will increasing retries and backoff time help, and to what values
>>>>>> should they be set so that the streaming application never fails, but
>>>>>> instead keeps retrying every few seconds and sends an event so that my
>>>>>> custom code can send a notification that a Kafka broker is down, if
>>>>>> that is the cause?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
