Hi Cody, I am following your approach to implement exactly-once semantics by storing the offsets in a database. The question I have is how to use Hive instead of a traditional datastore: the write to Hive will succeed even if there is an issue with saving the offsets to the DB, since the Hive write cannot be rolled back by the DB transaction. Could you please correct me if I am wrong, or let me know if you have any other suggestions?
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    DB.localTx { implicit session =>
      // Write data to Hive after creating DataFrames from the DStream RDD

      // Store offsets in the DB
      offsetRanges.foreach { osr =>
        val offsetRows =
          sql"""
            update txn_offsets set offset = ${osr.untilOffset}
            where topic = ${osr.topic} and part = ${osr.partition}
              and offset = ${osr.fromOffset}
          """.update.apply()
        if (offsetRows != 1) {
          throw new Exception(s"""
            Got $offsetRows rows affected instead of 1 when attempting to update offsets for
            ${osr.topic} ${osr.partition} ${osr.fromOffset} -> ${osr.untilOffset}
            Was a partition repeated after a worker failure?
          """)
        }
      }
    }
  }
}

Thanks,
Asmath