[ 
https://issues.apache.org/jira/browse/SPARK-27456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27456:
----------------------------------
    Affects Version/s:     (was: 2.4.1)
                       3.0.0

> Support commitSync for offsets in DirectKafkaInputDStream
> ---------------------------------------------------------
>
>                 Key: SPARK-27456
>                 URL: https://issues.apache.org/jira/browse/SPARK-27456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Jackson Westeen
>            Priority: Major
>
> Hello! I left a comment under SPARK-22486 but wasn't sure if it would get 
> noticed as that one got closed; x-posting here.
> ----
> I'm trying to achieve "effectively once" semantics with Spark Streaming for 
> batch writes to S3. The only way I've found to do this is to 
> partitionBy(startOffsets) in some way, so that re-writes on failure/retry are 
> idempotent: a retried batch overwrites the previous output for the same 
> offsets if the failure occurred before commitAsync succeeded.
>  
> Here's my example:
> {code:scala}
> // assumes: import spark.implicits._, org.apache.spark.sql.functions._,
> //          org.apache.spark.streaming.kafka010._, and a recordSchema
> //          (StructType) describing the JSON payload
> stream.foreachRDD { (rdd: RDD[ConsumerRecord[String, Array[Byte]]]) =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   // build this batch's dataset, with the batch's starting offsets included as
>   // a partition column so a retried batch overwrites its own earlier output
>   spark
>     .createDataset(rdd.map(record => new String(record.value)))
>     .select(from_json($"value", recordSchema).as("record")) // just for example
>     .withColumn("dateKey", from_unixtime($"record.timestamp", "yyyyMMdd"))
>     .withColumn("startOffsets",
>       lit(offsetRanges.sortBy(_.partition).map(_.fromOffset).mkString("_")))
>     .write
>     .mode(SaveMode.Overwrite)
>     .option("partitionOverwriteMode", "dynamic")
>     .partitionBy("dateKey", "startOffsets")
>     .parquet("s3://mybucket/kafka-parquet")
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> This almost works. The only issue is that I can still end up with 
> duplicate/overlapping data if:
>  # an initial write to S3 succeeds (batch A)
>  # commitAsync takes a long time and eventually fails, *but the job carries 
> on and successfully writes another batch in the meantime (batch B)*
>  # the job then fails for any reason; we restart from the last committed 
> offsets, but now with more data in Kafka to process than before... (batch A', 
> which includes A, B, ...)
>  # we successfully overwrite the initial batch, keyed by startOffsets, with 
> batch A' and progress as normal. No data is lost, but batch B is left over in 
> S3 and contains partially duplicate data.
> It would be very nice to have an atomic operation for write-and-commit-offsets, 
> or to be able to simulate one with commitSync in Spark Streaming :)
>  
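> For reference, the closest workaround I'm aware of today is the commitAsync 
> overload that takes Kafka's OffsetCommitCallback. It at least surfaces commit 
> failures, but it is still not a synchronous commit: as far as I can tell, 
> DirectKafkaInputDStream only sends the queued offsets to the broker when a 
> later batch is generated. A minimal sketch (the write step is elided and the 
> failure handling is only illustrative):
> {code:scala}
> import java.util.{Map => JMap}
> 
> import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
> 
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> 
>   // ... write the batch to S3, partitioned by startOffsets, as above ...
> 
>   // NOT a synchronous commit: the queued offsets are only flushed when a
>   // later batch is computed, so blocking here until onComplete fires would
>   // stall the stream rather than simulate commitSync.
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
>     override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
>                             exception: Exception): Unit = {
>       if (exception != null) {
>         // illustrative handling only: alert, or record the failed ranges so
>         // the next run can reconcile leftover startOffsets partitions in S3
>         System.err.println(s"Offset commit failed for $offsets: $exception")
>       }
>     }
>   })
> }
> {code}
> A true commitSync, or an atomic write-and-commit hook, would close the window 
> between the S3 write and the offset commit that the callback can only report on.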



