Try "spark.shuffle.io.numConnectionsPerPeer=10" On Fri, Aug 30, 2019 at 10:22 AM Daniel Zhang <java8...@hotmail.com> wrote:
On Fri, Aug 30, 2019 at 10:22 AM Daniel Zhang <java8...@hotmail.com> wrote:

> Hi, All:
>
> We are testing EMR and comparing it with our on-premise HDP solution, using one application as the test:
> EMR (5.21.1) with Hadoop 2.8.5 + Spark 2.4.3 vs. HDP (2.6.3) with Hadoop 2.7.3 + Spark 2.2.0.
>
> The application is very simple: it reads raw Parquet files, then does a DS.repartition(id_col).flatMap().write.partitionBy(col).save() operation.
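> Roughly, the job has the shape sketched below. The column names id_col and dt come from this mail (dt is the partition column visible in the HDFS listing further down); the paths and the identity flatMap body are placeholders, not our real logic:
>
>     import org.apache.spark.sql.SparkSession
>     import org.apache.spark.sql.catalyst.encoders.RowEncoder
>
>     val spark = SparkSession.builder().appName("emr-vs-hdp-test").getOrCreate()
>     import spark.implicits._
>
>     // Stage 1: scan the raw Parquet; repartition() introduces the shuffle.
>     val raw = spark.read.parquet("s3://<bucket>/<raw-path>/")   // placeholder path
>
>     // Stage 2: 200 tasks (the default spark.sql.shuffle.partitions) run the
>     // flatMap and the write, partitioned by dt.
>     val reshaped = raw
>       .repartition($"id_col")
>       .flatMap(row => Iterator(row))(RowEncoder(raw.schema))    // identity stand-in
>
>     reshaped.write
>       .partitionBy("dt")
>       .parquet("hdfs:///<output-path>/")                        // placeholder path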
> For our test data on HDP with 6 slave nodes (32G each), the whole application finishes in about 3 hours, which we are fine with. The application runs as a Spark job with 2 stages, and the 2nd stage runs with the default 200 tasks.
> On EMR, we observed that 2 of the 200 tasks hung for more than 10 hours while the rest were done, and we had to give up.
>
> The first test read the raw Parquet files from S3 and also used AWS S3 for the output, so I thought it could be an issue with the S3 output committer. We then changed the test to read the Parquet files from S3 but use the EMR cluster's HDFS as the output location.
> To my surprise, we observed the same behavior using HDFS: 2 of the 200 tasks hung forever, and they were on different executors. Those 2 executors processed other tasks normally but hung on these 2 tasks while all the rest finished.
>
> This looks like data skew, but we know it is not: the same application with the same data works fine on HDP, where we see well-balanced data across all 200 tasks.
>
> I then checked the executor logs on EMR more carefully for the HDFS test case. I know S3 is not an issue here, as all the raw Parquet data is read in the first stage of the job WITHOUT any delay.
>
> Sample log from a finished executor on EMR:
>
> 19/08/29 20:18:49 INFO Executor: Finished task 157.0 in stage 2.0 (TID 170). 3854 bytes result sent to driver
> 19/08/29 20:18:49 INFO CoarseGrainedExecutorBackend: Got assigned task 179
> 19/08/29 20:18:49 INFO Executor: Running task 166.0 in stage 2.0 (TID 179)
> 19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks including 1 local blocks and 11 remote blocks
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in bootstraps)
> [the same "Found inactive connection / Successfully created connection" pair repeats for ip-10-51-51-141, -155, -142, -140 and -157]
> 19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 61 ms
> 19/08/29 20:28:55 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
> .................
>
> The last log lines from a hanging executor on EMR:
>
> 19/08/29 19:40:40 INFO Executor: Finished task 78.0 in stage 2.0 (TID 91). 3854 bytes result sent to driver
> 19/08/29 19:40:40 INFO CoarseGrainedExecutorBackend: Got assigned task 101
> 19/08/29 19:40:40 INFO Executor: Running task 88.0 in stage 2.0 (TID 101)
> 19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks including 1 local blocks and 11 remote blocks
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in bootstraps)
> [the same pair repeats for ip-10-51-51-157, -142, -141, -155 and -140]
> 19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 73 ms
> This shows that the hanging executor started fetching shuffle data for task (TID) 101 but never reached "FileOutputCommitter" for that particular task, while earlier tasks (e.g. TID 91) had finished on this executor without any issue.
>
> I checked the HDFS output location:
>
> [hadoop@ip-10-51-51-151 ~]$ hadoop fs -ls -R /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101
> drwxr-xr-x   - hadoop hadoop         0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01
> -rw-r--r--   2 hadoop hadoop 170976376 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
> drwxr-xr-x   - hadoop hadoop         0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02
> -rw-r--r--   2 hadoop hadoop 102985213 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
> drwxr-xr-x   - hadoop hadoop         0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03
> -rw-r--r--   2 hadoop hadoop  58306503 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
> drwxr-xr-x   - hadoop hadoop         0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN
> -rw-r--r--   2 hadoop hadoop 258330267 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
>
> In fact, as far as I can tell, all the intermediate data for task 101 was ALREADY DONE on HDFS at 19:51. The output Parquet files are close in size to the output of the other tasks, which had already finished.
>
> So my questions are:
>
> 1) What COULD stop these 2 executors from reaching "FileOutputCommitter" in Spark 2.4.3 in this case? I really don't believe that at this point they were still fetching data from remote.
>
> 2) Of course, this Spark 2.4.3 is running on EMR, and AWS gave us the following configurations as possibly related to the above issue:
>
> spark.hadoop.yarn.timeline-service.enabled  false
> spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS  $(hostname -f)
> spark.files.fetchFailure.unRegisterOutputOnHost  true
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem  2
> spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem  true
> spark.sql.parquet.output.committer.class  com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
> spark.sql.parquet.fs.optimized.committer.optimization-enabled  true
> spark.sql.emr.internal.extensions  com.amazonaws.emr.spark.EmrSparkSessionExtensions
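> In case it helps anyone reproduce this: the EMR committer behavior can be overridden per job. A sketch only (the first property name is exactly the one listed above, the second is the stock Spark/Hadoop committer setting; we have not verified that changing them affects the hang):
>
>     import org.apache.spark.sql.SparkSession
>
>     val spark = SparkSession.builder()
>       // Turn off the EMR-optimized Parquet committer path for one run...
>       .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "false")
>       // ...and pin the stock Hadoop committer algorithm explicitly (v1 is
>       // what the "File Output Committer Algorithm version is 1" log shows).
>       .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
>       .getOrCreate()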
> Can anyone give me some idea what could cause this issue?
>
> Thanks
>
> Yong

--
Sent from my iPhone