I'd start by simplifying that closure: you don't need the transform
step, and currOffsetRanges doesn't need to be scoped outside of it.  Just
do everything in foreachRDD.  Likewise, it looks like zkClient is also
scoped outside of the closure passed to foreachRDD,

i.e. you have

zkClient = new ZkClient(consumerConfig.zkConnec

instead of

val zkClient = new ZkClient(consumerConfig.zkConnec
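
To illustrate why the scoping matters, here is a minimal sketch that uses
plain Java serialization and no Spark at all.  FakeClient is a hypothetical
stand-in for ZkClient (it is simply a class that is not Serializable), and
the helper names are made up for this example.  It only assumes that
serializing the closure behaves roughly like what happens to the function
passed to foreachRDD.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ZkClient: deliberately NOT Serializable.
class FakeClient(val address: String)

object ClosureCaptureDemo {
  // True if plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
  }

  // Mirrors `zkClient = new ZkClient(...)`: the closure assigns to a var
  // declared outside its body, so the closure captures that var.
  def capturingClosure(): String => Unit = {
    var client: FakeClient = null
    val f: String => Unit = addr => { client = new FakeClient(addr) }
    f("localhost:2181") // after one run the captured var holds a client
    f
  }

  // Mirrors `val zkClient = new ZkClient(...)`: the client is local to the
  // closure body, so nothing non-serializable is captured.
  def localClosure(): String => Unit =
    addr => { val client = new FakeClient(addr); println(client.address) }

  def main(args: Array[String]): Unit = {
    println(s"outer var: ${serializes(capturingClosure())}")
    println(s"local val: ${serializes(localClosure())}")
  }
}
```

The closure that writes to the outer var drags the client along once it
has run, while the one with a local val has nothing to drag.  In the
streaming job the analogous change is declaring zkClient with val inside
the foreachRDD body, and reading the offset ranges there as well.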

On Mon, Aug 24, 2015 at 5:53 PM, suchenzang <suchenz...@gmail.com> wrote:

> When updating the ZK offset in the driver (within foreachRDD), there is
> somehow a serialization exception getting thrown:
>
> 15/08/24 15:45:40 ERROR JobScheduler: Error in job generator
> java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
> The code looks something like:
>
> directKafkaStream.transform { rdd =>
>    currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>    rdd
>  }.foreachRDD { rdd =>
>    ... /*do stuff with shuffling involved, then update DynamoDB*/
>
>       val props = new Properties()
>       kafkaConf.foreach(param => props.put(param._1, param._2))
>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>
>       val consumerConfig = new ConsumerConfig(props)
>       assert(!consumerConfig.autoCommitEnable)
>
>       zkClient = new ZkClient(consumerConfig.zkConnect,
>         consumerConfig.zkSessionTimeoutMs,
>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>
>       offsetRanges.foreach { osr =>
>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>         ZkUtils.updatePersistentPath(zkClient, zkPath,
>           osr.untilOffset.toString)
>       }
> }
>
> Why would there be serialization issues when I'm not trying to pass
> ZkClient to any of the workers?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24432.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.