When I update the ZooKeeper offsets on the driver (inside foreachRDD), a
serialization exception is thrown:

15/08/24 15:45:40 ERROR JobScheduler: Error in job generator
java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

The code looks something like:

directKafkaStream.transform { rdd =>
   currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.foreachRDD { rdd =>
   ... /*do stuff with shuffling involved, then update DynamoDB*/

      val props = new Properties()
      kafkaConf.foreach(param => props.put(param._1, param._2))
      props.setProperty(AUTO_OFFSET_COMMIT, "false")

      val consumerConfig = new ConsumerConfig(props)
      assert(!consumerConfig.autoCommitEnable)

      zkClient = new ZkClient(consumerConfig.zkConnect,
        consumerConfig.zkSessionTimeoutMs,
        consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

      currOffsetRanges.foreach { osr =>
        val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath,
          osr.untilOffset.toString)
      }
}

Why would there be serialization issues when I'm not trying to pass ZkClient
to any of the workers?
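
My only guess so far, and it is an assumption rather than something I have confirmed: zkClient is a field of the class that sets up the stream, so the function passed to foreachRDD captures the enclosing object, and if that function gets serialized on the driver (for example when checkpointing writes out the DStream graph) the ZkClient field goes with it. Below is a minimal sketch of that mechanism in plain Scala, with no Spark or ZooKeeper involved. FakeClient, Job and ClosureCaptureDemo are made-up names for illustration only.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ZkClient: a class that is NOT Serializable.
class FakeClient {
  def update(path: String, value: String): Unit = ()
}

// Hypothetical stand-in for the driver-side class that builds the stream.
class Job extends Serializable {
  // Field assigned from inside the closure, like zkClient in my code.
  var client: FakeClient = null

  // Touches the field, so the function captures `this` (the whole Job).
  def usingField: String => Unit = { v =>
    client = new FakeClient
    client.update("/offsets/0", v)
  }

  // Touches only a local val, so nothing from Job needs to be captured.
  def usingLocal: String => Unit = { v =>
    val local = new FakeClient
    local.update("/offsets/0", v)
  }
}

object ClosureCaptureDemo {
  // True if plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job
    val f = job.usingField
    f("42") // simulate one batch: job.client now holds a FakeClient
    println(s"closure using the field serializes: ${serializes(f)}")
    println(s"closure using a local val serializes: ${serializes(job.usingLocal)}")
  }
}
```

On Scala 2.12 or later I would expect the first check to fail and the second to pass. If that really is what is happening in my job, keeping the client in a local val inside foreachRDD (or marking the field @transient) might avoid the exception, but I have not verified that against Spark itself.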

Thanks


