Re: Correctness bug on Shuffle+Repartition scenario
I don't think this addresses my comment at all. Please try correctly implementing equals and hashCode for your key class first.

On Tue, Dec 29, 2020 at 8:31 PM Shiao-An Yuan wrote:
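To illustrate the point with a hypothetical, stripped-down key class (not the poster's full Log), the compiler-generated case-class equality falls back to reference comparison for array members:

```scala
// Hypothetical minimal key class: equals/hashCode are compiler-generated.
case class Key(pkey: Array[Byte])

// Same bytes, different array instances: compares false, and the two keys also
// get different hash codes, because arrays compare by reference.
Key(Array[Byte](1, 2)) == Key(Array[Byte](1, 2))   // false
```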
Re: Correctness bug on Shuffle+Repartition scenario
Hi Sean,

Sorry, I didn't describe it clearly. The column "pkey" is like a primary key and I do a "reduce by key" on this column, so the number of rows should always equal the cardinality of "pkey". When I say data gets duplicated and lost, I mean duplicate "pkey" values exist in the output file (after the "reduce by key") and some "pkey" values are missing. Since it only happens when executors are preempted, I believe this is the nondeterministic-shuffle bug that SPARK-23207 tries to solve.

Thanks,

Shiao-An Yuan

On Tue, Dec 29, 2020 at 10:53 PM Sean Owen wrote:
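A quick way to check the invariant described here (one output row per distinct "pkey") might look like the sketch below; it assumes the `Log` case class, the `spark` session, and the `newPath` output path from the original post further down the thread:

```scala
import spark.implicits._

// Sanity check on a written snapshot: after the "reduce by key" step, the row
// count should equal the number of distinct pkey values. A mismatch means keys
// were duplicated or dropped.
val out          = spark.read.parquet(newPath).as[Log]
val totalRows    = out.count()
val distinctKeys = out.map(log => new String(log.pkey)).distinct().count()
assert(totalRows == distinctKeys,
  s"correctness violated: $totalRows rows but $distinctKeys distinct pkeys")
```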
[Spark Streaming] Why is ZooKeeper LeaderElection Agent not being called by Spark Master?
Hello,

Could you please help me out with the queries below?

I have 2 Spark masters and 3 ZooKeepers deployed on my system on separate virtual machines. The services come online in the following sequence:

1. zookeeper-1
2. sparkmaster-1
3. sparkmaster-2
4. zookeeper-2
5. zookeeper-3

The above sequence leads to both Spark masters running in STANDBY mode.

From the logs, I can see that only after the zookeeper-2 service comes up (i.e. 2 ZooKeeper services are up) is the Spark master able to create a ZooKeeper session; until zookeeper-2 is up, it retries session creation. However, after both ZooKeeper services are up and the Persistence Engine is able to successfully connect and create a session, the ZooKeeper LeaderElection Agent is not called.

Logs:

10:03:47.241 INFO org.apache.spark.internal.Logging:57 - Persisting recovery state to ZooKeeper
Initiating client connection, connectString=zookeeper-2:,zookeeper-3:,zookeeper-1: sessionTimeout=6 watcher=org.apache.curator.ConnectionState

# Only zookeeper-2 is online #
10:03:47.630 INFO org.apache.zookeeper.ClientCnxn$SendThread:1025 - Opening socket connection to server zookeeper-1:. Will not attempt to authenticate using SASL (unknown error)
10:03:50.635 INFO org.apache.zookeeper.ClientCnxn$SendThread:1162 - Socket error occurred: zookeeper-1:: No route to host
10:03:50.738 INFO org.apache.zookeeper.ClientCnxn$SendThread:1025 - Opening socket connection to server zookeeper-2:. Will not attempt to authenticate using SASL (unknown error)
2020-12-18 10:03:50.739 INFO org.apache.zookeeper.ClientCnxn$SendThread:879 - Socket connection established to zookeeper-2:, initiating session
10:03:50.742 INFO org.apache.zookeeper.ClientCnxn$SendThread:1158 - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
10:03:51.842 INFO org.apache.zookeeper.ClientCnxn$SendThread:1025 - Opening socket connection to server zookeeper-3:. Will not attempt to authenticate using SASL (unknown error)
10:03:51.843 INFO org.apache.zookeeper.ClientCnxn$SendThread:1162 - Socket error occurred: zookeeper-3:: Connection refused
10:04:02.685 ERROR org.apache.curator.ConnectionState:200 - Connection timed out for connection string (zookeeper-2:,zookeeper-3:,zookeeper-1:) and timeout (15000) / elapsed (15274)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
    at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
10:04:22.691 ERROR org.apache.curator.ConnectionState:200 - Connection timed out for connection string (zookeeper-2:,zookeeper-3:,zookeeper-1:) and timeout (15000) / elapsed (35297)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
10:04:42.696 ERROR org.apache.curator.ConnectionState:200 - Connection timed out for connection string (zookeeper-2:,zookeeper-3:,zookeeper-1:) and timeout (15000) / elapsed (55301)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
    at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
10:05:32.699 WARN org.apache.curator.ConnectionState:191 - Connection attempt unsuccessful after 105305 (greater than max timeout of 6). Resetting connection and trying again with a new connection.
10:05:32.864 INFO org.apache.zookeeper.ZooKeeper:693 - Session: 0x0 closed
10:05:32.865 INFO org.apache.zookeeper.ZooKeeper:442 - Initiating client connection, connectString=zookeeper-2:,zookeeper-3:,zookeeper-1: sessionTimeout=6 watcher=org.apache.curator.ConnectionState@
10:05:32.864 INFO org.apache.zookeeper.ClientCnxn$EventThread:522 - EventThread shut down for session: 0x0
10:05:32.969 ERROR org.apache.spark.internal.Logging:94 - Ignoring error
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /x/y
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)

# zookeeper-2, zookeeper-3 are online #
10:05:47.357 INFO org.apache.zookeeper.ClientCnxn$SendThread:1025 - Opening socket connection to server zookeeper-2:. Will not attempt to authenticate using SASL (unknown error)
10:05:47.358 INFO org.apache.zookeeper.ClientCnxn$SendThread:879 - Socket connection established to zookeeper-2:x
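For context, the standalone master only creates the ZooKeeper leader-election agent when it is started with spark.deploy.recoveryMode=ZOOKEEPER (the "Persisting recovery state to ZooKeeper" line above suggests that is already the case here). The poster's exact settings are not shown; a typical setup, with illustrative ports and znode path rather than values taken from this thread, would look something like:

```
# spark-env.sh on each master host (assumed/illustrative configuration)
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"
```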
Re: Correctness bug on Shuffle+Repartition scenario
Total guess here, but your key is a case class. It does define hashCode and equals for you, but you have an array as one of the members. Array equality is by reference, so two arrays with the same elements are not equal. You may have to define hashCode and equals manually to make them correct.

On Tue, Dec 29, 2020 at 8:01 AM Shiao-An Yuan wrote:
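A minimal sketch of what that might look like for the Log class from the original post (quoted in full below), comparing the Array[Byte] member by content via java.util.Arrays; the remaining columns are elided, and this is an illustration of the suggestion rather than a confirmed fix:

```scala
case class Log(pkey: Array[Byte], a: String, b: Int /* , 100+ more columns */) {
  // Compare the byte array by content, not by reference.
  override def equals(other: Any): Boolean = other match {
    case that: Log =>
      java.util.Arrays.equals(this.pkey, that.pkey) && this.a == that.a && this.b == that.b
    case _ => false
  }
  // Keep hashCode consistent with equals, again hashing the array by content.
  override def hashCode(): Int = {
    val aHash = if (a == null) 0 else a.hashCode
    31 * (31 * java.util.Arrays.hashCode(pkey) + aHash) + b
  }
}
```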
Correctness bug on Shuffle+Repartition scenario
Hi folks,

We recently identified a data correctness issue in our pipeline.

The data processing flow is as follows:
1. read the current snapshot (use an empty dataset if it doesn't exist yet)
2. read unprocessed new data
3. union them and do a `reduceByKey` operation
4. output a new version of the snapshot
5. repeat steps 1-4

The simplified version of the code:
```
// schema
case class Log(pkey: Array[Byte], a: String, b: Int /* , 100+ more columns */)

// function for reduce
def merge(left: Log, right: Log): Log = {
  Log(
    pkey = left.pkey,
    a = if (left.a != null) left.a else right.a,
    b = if (left.a != null) left.b else right.b
    // ... remaining columns merged the same way
  )
}

// a very large parquet file (>10G, 200 partitions)
val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]

// multiple small parquet files
val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]

val newSnapshot = currentSnapshot.union(newAddedLogs)
  .groupByKey(log => new String(log.pkey)) // generate key
  .reduceGroups(merge _)                   // spark.sql.shuffle.partitions=200
  .map(_._2)                               // drop key

newSnapshot
  .repartition(60)                         // (1)
  .write.parquet(newPath)
```

The issue we have is that some data are duplicated or lost, and the amounts of duplicated and lost data are similar.

We also noticed that this only happens when some instances get preempted. Spark retries the stage, so some of the partition files are generated by the first attempt and the others by the second (retry) attempt. Moreover, the duplicated logs are duplicated exactly twice and located in both batches (one in the first batch and one in the second batch).

The input/output files are Parquet on GCS. The Spark version is 2.4.4 with standalone deployment. Workers run on GCP preemptible instances and are preempted very frequently.

The pipeline runs in a single long-running, multi-threaded process. Each snapshot represents an "hour" of data, and we do the "read-reduce-write" operations on multiple snapshots (hours) simultaneously. We are pretty sure the same snapshot (hour) is never processed in parallel, and the output path is always generated with a timestamp, so those jobs shouldn't affect each other.

After changing line (1) to `coalesce` or `repartition(100, $"pkey")` the issue was gone, but I believe there is still a correctness bug that hasn't been reported yet.

We have tried to reproduce this bug on a smaller scale but haven't succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find the bug. Since this case uses Dataset, I believe it is unrelated to SPARK-24243.

Can anyone give me some advice on this issue? Thanks in advance.

Shiao-An Yuan
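For reference, a sketch of the workaround described above: replacing the round-robin `repartition(60)` at (1) with a hash repartition on the key column. It assumes the `newSnapshot` Dataset and `newPath` from the code above, plus `import spark.implicits._` for the `$` column syntax, and per the report it only made the symptom disappear rather than addressing a confirmed root cause:

```scala
// (1) replaced: hash-partition the output by pkey instead of round-robin repartitioning.
newSnapshot
  .repartition(100, $"pkey")
  .write.parquet(newPath)
```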