Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Shiao-An Yuan
Hi folks,

We recently identified a data correctness issue in our pipeline.

The data processing flow is as follows:
1. read the current snapshot (use an empty dataset if it doesn't exist yet)
2. read the unprocessed new data
3. union them and do a `reduceByKey` operation
4. output a new version of the snapshot
5. repeat steps 1-4

The simplified version of code:
```
// schema
case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)

// function for reduce
def merge(left: Log, right: Log): Log = {
  Log(pkey = left.pkey,
      a = if (left.a != null) left.a else right.a,
      b = if (left.a != null) left.b else right.b,
      ...
  )
}

// a very large parquet file (>10G, 200 partitions)
val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]

// multiple small parquet files
val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]

val newSnapshot = currentSnapshot.union(newAddedLogs)
  .groupByKey(log => new String(log.pkey))  // generate the key
  .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
  .map(_._2)                                // drop the key

newSnapshot
  .repartition(60)  // (1)
  .write.parquet(newPath)
```

The issue we have is that some data get duplicated or lost, and the amounts of
duplicated and lost data are similar.

We also noticed that this only happens when some instances get preempted.
Spark retries the stage, so some of the partitioned output files are generated
by the first attempt and the others by the second (retry) attempt. Moreover,
the duplicated logs are duplicated exactly twice, once in each batch (one in
the first batch and one in the second batch).

The input/output files are Parquet on GCS. The Spark version is 2.4.4 with a
standalone deployment. The workers run on GCP preemptible instances and are
preempted very frequently.

The pipeline runs in a single long-running, multi-threaded process. Each
snapshot represents an "hour" of data, and we run the "read-reduce-write"
operations on multiple snapshots (hours) simultaneously. We are pretty sure
the same snapshot (hour) is never processed in parallel, and the output path
is always generated with a timestamp, so those jobs shouldn't affect each other.

After changing line (1) to `coalesce` or `repartition(100, $"pkey")` the issue
was gone, but I believe there is still a correctness bug that hasn't been
reported yet.

We have tried to reproduce this bug on a smaller scale but haven't succeeded
yet. I have read SPARK-23207 and SPARK-28699, but couldn't find the cause of
the bug there. Since this case uses Dataset, I believe it is unrelated to
SPARK-24243.

Can anyone give me some advice on this issue?
Thanks in advance.

Shiao-An Yuan


Re: Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Sean Owen
Total guess here, but your key is a case class. It does define hashCode and
equals for you, but you have an array as one of the members. Array equality is
by reference, so two arrays with the same elements are not equal. You may have
to define hashCode and equals manually to make them correct.
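
For example, a minimal sketch of value-based equality for such a key (the
class and field names here are placeholders, not your actual schema):

```
// Sketch only: equality/hashCode based on the array's contents rather than
// its reference. Defining these by hand suppresses the compiler-generated ones.
case class LogKey(pkey: Array[Byte]) {
  override def hashCode(): Int = java.util.Arrays.hashCode(pkey)
  override def equals(other: Any): Boolean = other match {
    case that: LogKey => java.util.Arrays.equals(this.pkey, that.pkey)
    case _            => false
  }
}
```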



Re: Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Shiao-An Yuan
Hi Sean,

Sorry, I didn't describe it clearly. The column "pkey" is like a "primary key"
and I "reduce by key" on this column, so the number of rows should always
equal the cardinality of "pkey".
When I said data get duplicated and lost, I mean that duplicated "pkey" values
exist in the output files (after "reduce by key") and some "pkey" values are
missing.
Since it only happens when executors get preempted, I believe this is the kind
of bug (non-deterministic shuffle) that SPARK-23207 was trying to solve.
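
For illustration, this is how the duplication/loss shows up — a simplified
check using the names from the code above (not our exact validation job):

```
// After reduce-by-key, the row count and the distinct-pkey count should match.
val rowCount = newSnapshot.count()
val keyCount = newSnapshot.select($"pkey").distinct().count()
assert(rowCount == keyCount, s"$rowCount rows but $keyCount distinct pkeys")
```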

Thanks,

Shiao-An Yuan



Re: Correctness bug on Shuffle+Repartition scenario

2020-12-29 Thread Sean Owen
I don't think this addresses my comment at all. Please try correctly
implementing equals and hashCode for your key class first.



Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Shiao-An Yuan
Hi folks,

I finally found the root cause of this issue.
It can be easily reproduced with the following code.
We ran it on a standalone-mode cluster with 4 cores * 4 instances (16 cores in
total).

```
import org.apache.spark.TaskContext
import scala.sys.process._
import org.apache.spark.sql.functions._
import com.google.common.hash.Hashing
val murmur3 = Hashing.murmur3_32()

// create a Dataset whose second element has a cardinality of 5
val ds = spark.range(0, 10, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i / 2))

ds.groupByKey(_._2)
  .agg(first($"_1").as[Long])
  .repartition(200)
  .map { x =>
    // kill this executor on the first attempt of partition 100 to force a stage retry
    if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  .map(_._2).distinct().count()  // the correct result is 5, but we always get a smaller number
```

The problem here is that SPARK-23207 uses sorting to make
RoundRobinPartitioning always generate the same distribution, but the
aggregate function `first` may return non-deterministic results, which makes
the sort result non-deterministic.
Therefore, the first attempt of the stage and the retry attempt might produce
different distributions, which causes duplication and loss.
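
A workaround sketch (this is my assumption, not a verified fix): make the rows
that feed the round-robin repartition deterministic, for example by replacing
`first` with an order-insensitive aggregate such as `min`:

```
// Hedged sketch: `min` does not depend on the order in which rows arrive, so
// the sort performed before RoundRobinPartitioning should see the same input
// on a retry. Not verified against our production job.
ds.groupByKey(_._2)
  .agg(min($"_1").as[Long])
  .repartition(200)
  .map(_._2).distinct().count()
```

In our real pipeline, changing the final `repartition(60)` to `coalesce` or to
`repartition(100, $"pkey")` (hash partitioning on a deterministic column) also
made the issue disappear, as mentioned in my first mail.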

Thanks,
Shiao-An Yuan



Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Sean Owen
Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?



Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Gourav Sengupta
Hi,

I may be wrong, but this looks like a massively complicated solution for what
could have been a simple SQL.

It always seems okay to me to first reduce the complexity and then solve the
problem, rather than solve a problem which should not even exist in the first
instance.
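
For example, the per-key merge could probably be written as a plain
aggregation with SQL functions, along these lines (a rough sketch reusing the
names from the code above; note that `first` here is still order-dependent,
just like the original `merge`, and whether this scales to the 100+ columns is
an open question):

```
// Rough sketch: merge the current snapshot and the new logs per pkey,
// keeping a non-NULL value for each column where one exists.
import org.apache.spark.sql.functions.first

currentSnapshot.union(newAddedLogs)
  .groupBy($"pkey")
  .agg(
    first($"a", ignoreNulls = true).as("a"),
    first($"b", ignoreNulls = true).as("b")
    // ... remaining columns ...
  )
```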

Regards,
Gourav



Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Mich Talebzadeh
Hi Shiao-An,

With regard to your set-up, and I quote:

"The input/output files are Parquet on GCS. The Spark version is 2.4.4 with a
standalone deployment. The workers run on GCP preemptible instances and are
preempted very frequently."

Am I correct that you have foregone deploying Dataproc clusters on GCP in
favour of selecting some VM boxes and installing your own Spark cluster
running in standalone mode (presumably to save costs $$$)? What is the
rationale behind this choice? Accordingly, "Compute Engine might stop
(preempt) these instances if it requires access to those resources for other
tasks. Preemptible instances are excess Compute Engine capacity, so their
availability varies with usage". So what causes some VM instances to be
preempted? I have not touched standalone mode for a couple of years myself.

So your ETL process reads the raw snapshots, does some joins and creates new
hourly processed snapshots. There does not seem to be an intermediate stage to
verify the sanity of the data (data lineage). Personally I would deploy a
database to do this ETL. That would give you the option to look at your data
more easily and store everything in a staging area before the final push to
the analytics layer.

HTH

LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from
relying on this email's technical content is explicitly disclaimed. The author
will in no case be liable for any monetary damages arising from such loss,
damage or destruction.






Re: Correctness bug on Shuffle+Repartition scenario

2021-01-17 Thread Shiao-An Yuan
Hi,
I am using Spark 2.4.4 in standalone mode.



Re: Correctness bug on Shuffle+Repartition scenario

2021-01-18 Thread 王长春
Hi Shiao-An Yuan,

I also hit this correctness problem in my production environment.
My Spark version is 2.3.1. I previously thought it was caused by SPARK-23243,
but you said you also have this problem in your environment, and your version
is 2.4.4, which already contains the fix for SPARK-23243. So maybe this
problem is not caused by SPARK-23243.

As you said, if it is caused by `first` before `repartition`, how can this
problem be solved fundamentally? And is there any workaround?
