Hi,
Can you check whether setting the checkpointLocation option would work, if you are using Structured Streaming?

Regards,
Gourav Sengupta

On Tue, Jul 24, 2018 at 12:30 PM, John, Vishal (Agoda) <vishal.j...@agoda.com.invalid> wrote:

> Hello all,
>
> I have to read data from a Kafka topic at regular intervals. I create the
> DataFrame as shown below. I don’t want to start reading from the beginning
> on each run. At the same time, I don’t want to miss the messages between
> run intervals.
>
> val queryDf = sqlContext
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", hosts)
>   .option("enable.auto.commit", true)
>   .option("subscribe", topicName)
>   .option("auto.commit.interval.ms", 1000)
>   .option("startingOffsets", "latest") //?? earliest OR latest
>   .load()
>   .selectExpr("CAST(value AS STRING) as message")
>
> I would like to understand where the offsets will be stored, so that I can
> supply them each time the application starts. Or is there a way to supply a
> custom location in which to store the offsets?
> This is not a Streaming application, so I am not sure whether a checkpoint
> directory is valid in this case.
>
> Any pointers would be highly helpful.
>
> Thanks,
> Vishal
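For the batch (non-streaming) case in the question, here is a minimal sketch that assumes the application tracks offsets itself. Per the Spark Kafka integration guide, the Kafka source does not commit offsets back to Kafka (so enable.auto.commit has no effect), and for batch reads startingOffsets cannot be "latest". It does, however, accept a JSON string of per-partition offsets such as {"topicA":{"0":23,"1":42}}. The object KafkaOffsetsJson and its methods are hypothetical helpers, not part of Spark. They compute the next offset to read for each (topic, partition) and render those offsets as that JSON string. The sketch assumes topic names need no JSON escaping, which holds because Kafka limits them to letters, digits, ".", "_" and "-".

```scala
// Hypothetical helpers (not part of Spark) for resuming a batch Kafka read
// from offsets that the application persists itself between runs.
object KafkaOffsetsJson {

  // Wraps a value in double quotes for use as a JSON key.
  private def quote(s: String): String = "\"" + s + "\""

  // Given (topic, partition, offset) rows processed in a run (for example the
  // max offset per topic/partition), returns the offset to resume from next
  // time: the highest offset seen plus one.
  def nextOffsets(records: Seq[(String, Int, Long)]): Map[(String, Int), Long] =
    records
      .groupBy { case (topic, partition, _) => (topic, partition) }
      .map { case (key, rows) => key -> (rows.map(_._3).max + 1) }

  // Renders (topic, partition) -> offset as the JSON form accepted by the
  // "startingOffsets" option, e.g. {"topicA":{"0":23,"1":42}}.
  // Topics and partitions are sorted so the output is deterministic.
  def toStartingOffsets(offsets: Map[(String, Int), Long]): String =
    offsets
      .groupBy { case ((topic, _), _) => topic }
      .toSeq
      .sortBy { case (topic, _) => topic }
      .map { case (topic, byPartition) =>
        val partitions = byPartition.toSeq
          .sortBy { case ((_, partition), _) => partition }
          .map { case ((_, partition), offset) => quote(partition.toString) + ":" + offset }
          .mkString(",")
        quote(topic) + ":{" + partitions + "}"
      }
      .mkString("{", ",", "}")
}
```

In each run, one could aggregate max(offset) per topic and partition in Spark (the Kafka source exposes topic, partition and offset columns), collect that small result, pass it to nextOffsets and then to toStartingOffsets. The resulting JSON would be stored somewhere durable of the application's choosing and supplied on the next run via .option("startingOffsets", json). Alternatively, as suggested above, a Structured Streaming query run with Trigger.Once and a checkpointLocation keeps the offsets in the checkpoint directory between runs.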