Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?

On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan <shiao.an.y...@gmail.com>
wrote:

> Hi folks,
>
> I finally found the root cause of this issue.
> It can be easily reproduced by the following code.
> We ran it on a standalone mode 4 cores * 4 instances (total 16 cores)
> environment.
>
> ```
> import org.apache.spark.TaskContext
> import scala.sys.process._
> import org.apache.spark.sql.functions._
> import com.google.common.hash.Hashing
> val murmur3 = Hashing.murmur3_32()
>
> // create a Dataset with the cardinality of the second element equals
> 50000.
> val ds = spark.range(0, 100000, 1, 130).map(i =>
> (murmur3.hashLong(i).asInt(), i/2))
>
> ds.groupByKey(_._2)
>   .agg(first($"_1").as[Long])
>   .repartition(200)
>   .map { x =>
>     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId
> == 100 && TaskContext.get.stageAttemptNumber == 0) {
>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>     }
>     x
>   }
>   .map(_._2).distinct().count()   // the correct result is 50000, but we
> always got fewer number
> ```
>
> The problem here is SPARK-23207 use sorting to make RoundRobinPartitioning
> always generate the same distribution,
> but the UDAF `first` may return non-deterministic results and caused the
> sorting result non-deterministic.
> Therefore, the first stage and the retry stage might have different
> distribution and cause duplications and loss.
>
> Thanks,
> Shiao-An Yuan
>
> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> We recently identified a data correctness issue in our pipeline.
>>
>> The data processing flow is as follows:
>> 1. read the current snapshot (provide empty if it doesn't exist yet)
>> 2. read unprocessed new data
>> 3. union them and do a `reduceByKey` operation
>> 4. output a new version of the snapshot
>> 5. repeat step 1~4
>>
>> The simplified version of code:
>> ```
>> // schema
>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>
>> // function for reduce
>> def merge(left: Log, right: Log): Log = {
>>   Log(pkey = left.pkey
>>       a    = if (left.a!=null) left.a else right.a,
>>       b    = if (left.a!=null) left.b else right.b,
>>       ...
>>   )
>> }
>>
>> // a very large parquet file (>10G, 200 partitions)
>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>
>> // multiple small parquet files
>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>
>> val newSnapshot = currentSnapshot.union(newAddedLog)
>>   .groupByKey(new String(pkey))                  // generate key
>>   .reduceGroups(_.merge(_))                        //
>> spark.sql.shuffle.partitions=200
>>   .map(_._2)                                     // drop key
>>
>> newSnapshot
>>   .repartition(60)                              // (1)
>>   .write.parquet(newPath)
>> ```
>>
>> The issue we have is that some data were duplicated or lost, and the
>> amount of
>> duplicated and loss data are similar.
>>
>> We also noticed that this situation only happens if some instances got
>> preempted. Spark will retry the stage, so some of the partitioned files
>> are
>> generated at the 1st time, and other files are generated at the
>> 2nd(retry) time.
>> Moreover, those duplicated logs will be duplicated exactly twice and
>> located in
>> both batches (one in the first batch; and one in the second batch).
>>
>> The input/output files are parquet on GCS. The Spark version is 2.4.4 with
>> standalone deployment. Workers running on GCP preemptible instances and
>> they
>> being preempted very frequently.
>>
>> The pipeline is running in a single long-running process with
>> multi-threads,
>> each snapshot represent an "hour" of data, and we do the
>> "read-reduce-write" operations
>> on multiple snapshots(hours) simultaneously. We pretty sure the same
>> snapshot(hour) never process parallelly and the output path always
>> generated with a timestamp, so those jobs shouldn't affect each other.
>>
>> After changing the line (1) to `coalesce` or `repartition(100, $"pkey")`
>> the issue
>> was gone, but I believe there is still a correctness bug that hasn't been
>> reported yet.
>>
>> We have tried to reproduce this bug on a smaller scale but haven't
>> succeeded yet. I
>> have read SPARK-23207 and SPARK-28699, but couldn't found the bug.
>> Since this case is DataSet, I believe it is unrelated to SPARK-24243.
>>
>> Can anyone give me some advice about the following tasks?
>> Thanks in advance.
>>
>> Shiao-An Yuan
>>
>

Reply via email to