Hi Cody,

I am following your approach to implement exactly-once semantics by storing
the offsets in a database. My question is how to use Hive instead of a
traditional datastore: since the Hive write is not part of the database
transaction, the write to Hive will succeed even if saving the offsets to
the DB fails. Could you please correct me if I am wrong, or let me know if
you have any other suggestions?

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        DB.localTx { implicit session =>
          // Write data to Hive after creating DataFrames from the DStream RDD

          // Store offsets to DB
          offsetRanges.foreach { osr =>
            val offsetRows = sql"""
                update txn_offsets set offset = ${osr.untilOffset}
                where topic = ${osr.topic} and part = ${osr.partition}
                  and offset = ${osr.fromOffset}
                """.update.apply()
            if (offsetRows != 1) {
              throw new Exception(s"""
                  Got $offsetRows rows affected instead of 1 when attempting
                  to update offsets for ${osr.topic} ${osr.partition}
                  ${osr.fromOffset} -> ${osr.untilOffset}
                  Was a partition repeated after a worker failure?
                  """)
            }
          }
        }
      }
    }
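For context, the Hive write at the comment above might look something like
the sketch below. All names here are hypothetical: it assumes a SparkSession
`spark` built with .enableHiveSupport(), a case class `Event` matching the
record schema, and a Hive table `events` partitioned by `batch_id`. Because
Hive does not participate in the JDBC transaction, a failed offset update
after a successful write means the batch will be reprocessed on retry; one
way to keep that safe is to make the Hive write itself idempotent by
overwriting a partition keyed deterministically (e.g. by the offset range):

    // Sketch only, not a definitive implementation.
    import org.apache.spark.sql.functions.lit
    import spark.implicits._

    // Deterministic batch id derived from the offset range, so a retried
    // batch overwrites the same partition instead of duplicating rows.
    val batchId = s"${offsetRanges.head.topic}-${offsetRanges.head.fromOffset}"

    val df = rdd
      .map(record => Event(record.key, record.value)) // parse each Kafka record
      .toDF()
      .withColumn("batch_id", lit(batchId))

    // Overwrite only the partitions present in df (Spark 2.3+), leaving
    // other batches' partitions untouched.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write
      .mode("overwrite")
      .insertInto("events")

With an idempotent write like this, it is no longer fatal that the Hive step
cannot be rolled back by DB.localTx; a replayed partition simply rewrites the
same data before the offsets are committed.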

Thanks,
Asmath
