[ https://issues.apache.org/jira/browse/SPARK-27456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-27456:
----------------------------------
    Affects Version/s:     (was: 2.4.1)
                           3.0.0

> Support commitSync for offsets in DirectKafkaInputDStream
> ---------------------------------------------------------
>
>                 Key: SPARK-27456
>                 URL: https://issues.apache.org/jira/browse/SPARK-27456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Jackson Westeen
>            Priority: Major
>
> Hello! I left a comment under SPARK-22486 but wasn't sure it would get noticed since that one was closed; x-posting here.
> ----
> I'm trying to achieve "effectively once" semantics with Spark Streaming for batch writes to S3. The only way to do this is to partitionBy(startOffsets) in some way, such that re-writes on failure/retry are idempotent: they overwrite the previous batch if the failure occurred before commitAsync succeeded.
>
> Here's my example:
> {code:java}
> stream.foreachRDD((rdd: RDD[ConsumerRecord[String, Array[Byte]]]) => {
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // make dataset, with this batch's offsets included
>   spark
>     .createDataset(rdd.map(record => new String(record.value)))
>     .select(from_json($"value", schema).as("json"))  // just for example
>     .withColumn("dateKey", from_unixtime($"json.timestamp", "yyyyMMdd"))
>     .withColumn("startOffsets",
>       lit(offsetRanges.sortBy(_.partition).map(_.fromOffset).mkString("_")))
>     .write
>     .mode(SaveMode.Overwrite)
>     .option("partitionOverwriteMode", "dynamic")
>     .partitionBy("dateKey", "startOffsets")
>     .parquet("s3://mybucket/kafka-parquet")
>
>   stream.asInstanceOf[CanCommitOffsets].commitAsync...
> })
> {code}
> This almost works. The only issue is that I can still end up with duplicate/overlapping data if:
> # an initial write to S3 succeeds (batch A)
> # commitAsync takes a long time and eventually fails, *but the job carries on and successfully writes another batch in the meantime (batch B)*
> # the job then fails for any reason, so we start back at the last committed offsets, but now with more data in Kafka to process than before... (batch A', which includes A, B, ...)
> # we successfully overwrite the initial batch by startOffsets with batch A' and progress as normal. No data is lost, but batch B is left over in S3 and contains partially duplicate data.
> It would be very nice to have an atomic operation for write and commitOffsets, or to be able to simulate one with commitSync in Spark Streaming :)
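> For what it's worth, one way to approximate commitSync today might be to pass an OffsetCommitCallback to commitAsync and block until it fires, failing the batch when the commit does not succeed so that the restarted job overwrites the same startOffsets partition before any further batch is written. This is only a sketch, assuming the queued commit is actually flushed when the next batch is generated; commitSyncish and its timeout are illustrative names, not an existing API.
> {code:java}
> import java.util.concurrent.atomic.AtomicReference
> import java.util.concurrent.{CountDownLatch, TimeUnit}
>
> import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, OffsetRange}
>
> // Sketch only: queue the async commit with a callback and wait for the result.
> def commitSyncish(stream: CanCommitOffsets,
>                   offsetRanges: Array[OffsetRange],
>                   timeoutMs: Long = 120000L): Unit = {
>   val latch = new CountDownLatch(1)
>   val failure = new AtomicReference[Exception](null)
>
>   stream.commitAsync(offsetRanges, new OffsetCommitCallback {
>     override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
>                             exception: Exception): Unit = {
>       failure.set(exception) // null on success
>       latch.countDown()
>     }
>   })
>
>   // Assumes the queued commit is flushed when the next batch is generated, so this
>   // wait is bounded by roughly one batch interval plus the broker round trip.
>   if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
>     throw new IllegalStateException(s"Offset commit not confirmed within $timeoutMs ms")
>   }
>   if (failure.get() != null) {
>     // Propagating the failure lets the job stop before another batch is written to S3,
>     // so the retry overwrites the same startOffsets partition and leaves no orphan.
>     throw failure.get()
>   }
> }
> {code}
> In the example above, the last line inside foreachRDD would then become commitSyncish(stream.asInstanceOf[CanCommitOffsets], offsetRanges) instead of the bare commitAsync call.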