Cool.

On Tue, Mar 12, 2019 at 9:08 AM JF Chen <darou...@gmail.com> wrote:
> Hi
> Finally I found the reason...
> It was caused by long GC pauses on some datanodes. After receiving the data
> from the executors, a datanode stuck in a long GC cannot report blocks to the
> namenode, so the write takes a long time.
> I have now decommissioned the broken datanodes, and my Spark job runs
> well.
> I am trying to increase the heap size of the datanodes to check whether that
> resolves the problem.
>
> Regards,
> Junfeng Chen
>
>
> On Fri, Mar 8, 2019 at 8:54 PM Shyam P <shyamabigd...@gmail.com> wrote:
>
>> Did you check this, i.e. how many partitions there are and the record count it shows?
>>
>> //count by partition_id
>> import org.apache.spark.sql.functions.spark_partition_id
>> df.groupBy(spark_partition_id).count.show()
>>
>> Are you getting the same number of parquet files?
>>
>> You can gradually increase the sample size.
>>
>> On Fri, 8 Mar 2019, 14:17 JF Chen, <darou...@gmail.com> wrote:
>>
>>> I checked my partitionBy call again. It is partitionBy(appname, year,
>>> month, day, hour), and the number of partitions of appname is much larger
>>> than that of year, month, day, and hour. My Spark Streaming app runs
>>> every 5 minutes, so year, month, day, and hour should be the same most of
>>> the time.
>>> So will the number of appname partitions affect the writing efficiency?
>>>
>>> Regards,
>>> Junfeng Chen
>>>
>>>
>>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen <darou...@gmail.com> wrote:
>>>
>>>> Yes, I agree.
>>>>
>>>> From the Spark UI I can confirm the data is not skewed. There is only about
>>>> 100MB for each task; most tasks take several seconds to write the
>>>> data to HDFS, and some tasks take minutes.
>>>>
>>>> Regards,
>>>> Junfeng Chen
>>>>
>>>>
>>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>>>
>>>>> Hi JF,
>>>>> Yes, first we should know the actual number of partitions the dataframe has
>>>>> and the record count of each. Accordingly we should try to spread the data
>>>>> evenly across all partitions.
>>>>> It is always better to have number of partitions = N * number of executors.
>>>>>
>>>>>
>>>>> "But the sequence of columns in partitionBy decides the
>>>>> directory hierarchy structure. I hope the sequence of columns does not change"
>>>>> , this is correct.
>>>>> Hence sometimes we should go with the bigger number first, then the lesser ....
>>>>> try this, i.e. more parent directories and fewer child directories. Tweak
>>>>> around it and try.
>>>>>
>>>>> "some tasks in write hdfs stage cost much more time than others": maybe
>>>>> the data is skewed and needs to be distributed evenly across all partitions.
>>>>>
>>>>> ~Shyam
>>>>>
>>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen <darou...@gmail.com> wrote:
>>>>>
>>>>>> Hi Shyam
>>>>>> Thanks for your reply.
>>>>>> You mean that, once I know the number of partitions of column_a, column_b,
>>>>>> and column_c, the sequence of columns in partitionBy should follow the
>>>>>> order of the partition counts of columns a, b, and c?
>>>>>> But the sequence of columns in partitionBy decides the
>>>>>> directory hierarchy structure. I hope the sequence of columns does not
>>>>>> change.
>>>>>>
>>>>>> And I found one more strange thing: some tasks in the write-to-HDFS stage
>>>>>> take much more time than others, even though the amount of data written is
>>>>>> similar. How to solve it?
>>>>>>
>>>>>> Regards,
>>>>>> Junfeng Chen
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P <shyamabigd...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi JF,
>>>>>>> Try to execute this before df.write....
>>>>>>>
>>>>>>> //count by partition_id
>>>>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>>>>> df.groupBy(spark_partition_id).count.show()
>>>>>>>
>>>>>>> You will come to know how the data has been partitioned inside df.
>>>>>>>
>>>>>>> A small trick we can apply here with partitionBy(column_a, column_b,
>>>>>>> column_c):
>>>>>>> make sure you have (column_a partitions) > (column_b
>>>>>>> partitions) > (column_c partitions).
>>>>>>>
>>>>>>> Try this.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Shyam
>>>>>>>
>>>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen <darou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am trying to write the data in a dataset to HDFS via
>>>>>>>> df.write.partitionBy(column_a, column_b, column_c).parquet(output_path)
>>>>>>>> However, it takes several minutes to write only hundreds of MB of data
>>>>>>>> to HDFS.
>>>>>>>> From this article
>>>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>>>> adding a repartition call before the write should work. But if there
>>>>>>>> is data skew, some tasks may take much longer than average, so the write
>>>>>>>> would still take a long time.
>>>>>>>> How to solve this problem? Thanks in advance!
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Junfeng Chen
>>>>>>>>
>>>>>>>
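
For anyone finding this thread later, here is a minimal sketch of the two things discussed above: counting rows per in-memory partition before the write, and repartitioning on the partitionBy columns so that each output directory is written by a single task. It is only an illustration, not what JF's job actually ran. The column names come from the thread; the object name, the helper names, the sample rows, the output path, the local master, and the Append save mode are all assumptions made for the example.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Hypothetical object name, used only for this sketch.
object PartitionedWriteSketch {

  // partitionBy columns as reported in the thread.
  val partitionCols: Seq[String] = Seq("appname", "year", "month", "day", "hour")

  // Row count per in-memory partition (the check suggested in the thread).
  def rowsPerPartition(df: DataFrame): DataFrame =
    df.groupBy(spark_partition_id().as("partition_id"))
      .count()
      .orderBy("partition_id")

  // Shuffle so that rows sharing the same partitionBy values end up in the
  // same task, then write. Without this shuffle, every task writes its own
  // file into every output directory it holds rows for.
  def writeClustered(df: DataFrame, outputPath: String): Unit =
    df.repartition(partitionCols.map(col): _*)
      .write
      .mode(SaveMode.Append) // assumption: the thread does not state a save mode
      .partitionBy(partitionCols: _*)
      .parquet(outputPath)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run for illustration only
      .appName("partitioned-write-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows; a real job would read its own input instead.
    val df = Seq(
      ("app_a", 2019, 3, 4, 16, "event-1"),
      ("app_a", 2019, 3, 4, 16, "event-2"),
      ("app_b", 2019, 3, 4, 16, "event-3"),
      ("app_c", 2019, 3, 4, 16, "event-4")
    ).toDF("appname", "year", "month", "day", "hour", "payload")

    rowsPerPartition(df).show()
    writeClustered(df, "/tmp/partitioned_write_sketch") // hypothetical path

    spark.stop()
  }
}
```

Note that repartitioning on the partitionBy columns sends all rows of one appname to one task, so a dominant appname would make that task slow, which is the skew concern raised in the original question. It also does nothing for the cause finally identified in this thread, the long GC pauses on the datanodes.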