When updating the ZK offset in the driver (within foreachRDD), a serialization exception is somehow thrown:
15/08/24 15:45:40 ERROR JobScheduler: Error in job generator
java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

The code looks something like:

directKafkaStream.transform { rdd =>
  currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  ... /* do stuff with shuffling involved, then update DynamoDB */
  val props = new Properties()
  kafkaConf.foreach(param => props.put(param._1, param._2))
  props.setProperty(AUTO_OFFSET_COMMIT, "false")
  val consumerConfig = new ConsumerConfig(props)
  assert(!consumerConfig.autoCommitEnable)
  zkClient = new ZkClient(consumerConfig.zkConnect,
    consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs,
    ZKStringSerializer)
  offsetRanges.foreach { osr =>
    val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
  }
}

Why would there be serialization issues when I'm not trying to pass ZkClient to any of the workers?

Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24432.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
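A plausible cause, assuming `currOffsetRanges` and `zkClient` are fields of the enclosing class (the snippet assigns them without `val`/`var`, which suggests they are): a closure that references an instance field captures `this`, so when the job generator serializes the closure it pulls in the whole enclosing object, `ZkClient` field included — even though nothing is deliberately shipped to the workers. The effect can be reproduced with plain JDK serialization and no Spark at all; all names below (`FakeZkClient`, `Driver`, `SerCheck`) are hypothetical stand-ins, not the actual code:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for ZkClient: any class that does not implement Serializable.
class FakeZkClient

class Driver extends Serializable {
  var zkClient: FakeZkClient = new FakeZkClient // non-serializable field
  var currOffset: Long = 0L                     // serializable field

  // Referencing the field inside the lambda captures `this`, so serializing
  // the closure serializes the entire Driver -- zkClient included.
  def capturesThis: () => Long = () => currOffset + 1

  // Copying the field into a local val first means the lambda captures only
  // the Long, and the Driver (and its zkClient) stay out of the object graph.
  def capturesLocal: () => Long = { val o = currOffset; () => o + 1 }
}

object SerCheck {
  // Returns true iff obj survives a round through ObjectOutputStream.
  def serializes(obj: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj); true }
    catch { case _: NotSerializableException => false }
}
```

Here `SerCheck.serializes((new Driver).capturesThis)` fails while `capturesLocal` serializes fine. If this is indeed the mechanism, the usual workarounds are to mark the field `@transient`, or to use local `val`s inside `foreachRDD` so the closure never touches the enclosing instance.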