[ https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-20103: ------------------------------------- Fix Version/s: 2.2.0 > Spark structured streaming from Kafka - last message processed again after > resume from checkpoint > ------------------------------------------------------------------------------------------------ > > Key: SPARK-20103 > URL: https://issues.apache.org/jira/browse/SPARK-20103 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.1.0 > Environment: Linux, Spark 2.10 > Reporter: Rajesh Mutha > Labels: spark, streaming > Fix For: 2.2.0 > > > When the application starts after a failure or a graceful shutdown, it is > consistently processing the last message of the previous batch even though it > was already processed correctly without failure. > We are making sure database writes are idempotent using a PostgreSQL 9.6 feature. > Is this the default behavior of Spark? I added a code snippet with 2 > streaming queries. One of the queries is idempotent; since query2 is not > idempotent, we are seeing duplicate entries in the table. 
> {code} > object StructuredStreaming { > def main(args: Array[String]): Unit = { > val db_url = > "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password" > val spark = SparkSession > .builder > .appName("StructuredKafkaReader") > .master("local[*]") > .getOrCreate() > spark.conf.set("spark.sql.streaming.checkpointLocation", > "/tmp/checkpoint_research/") > import spark.implicits._ > val server = "10.205.82.113:9092" > val topic = "checkpoint" > val subscribeType="subscribe" > val lines = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", server) > .option(subscribeType, topic) > .load().selectExpr("CAST(value AS STRING)").as[String] > lines.printSchema() > import org.apache.spark.sql.ForeachWriter > val writer = new ForeachWriter[String] { > def open(partitionId: Long, version: Long): Boolean = { > println("After db props"); true > } > def process(value: String) = { > val conn = DriverManager.getConnection(db_url) > try{ > conn.createStatement().executeUpdate("INSERT INTO > PUBLIC.checkpoint1 VALUES ('"+value+"')") > } > finally { > conn.close() > } > } > def close(errorOrNull: Throwable) = {} > } > import scala.concurrent.duration._ > val query1 = lines.writeStream > .outputMode("append") > .queryName("checkpoint1") > .trigger(ProcessingTime(30.seconds)) > .foreach(writer) > .start() > val writer2 = new ForeachWriter[String] { > def open(partitionId: Long, version: Long): Boolean = { > println("After db props"); true > } > def process(value: String) = { > val conn = DriverManager.getConnection(db_url) > try{ > conn.createStatement().executeUpdate("INSERT INTO > PUBLIC.checkpoint2 VALUES ('"+value+"')") > } > finally { > conn.close() > } > } > def close(errorOrNull: Throwable) = {} > } > import scala.concurrent.duration._ > val query2 = lines.writeStream > .outputMode("append") > .queryName("checkpoint2") > .trigger(ProcessingTime(30.seconds)) > 
.foreach(writer2) > .start() > query2.awaitTermination() > query1.awaitTermination() > }} > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org