[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615484#comment-14615484 ]
Bikas Saha commented on TEZ-2496:
---------------------------------

IMO, the solution is risky and I am wary of adding it to the API for everyone to use, because it's not clear that we have a viable long-term solution. A single job with 50Kx20K vertices would end up with 1GB or more in the bitmap, and multiple large vertices running in parallel would hold that memory simultaneously. Secondly, clearing the partition stats makes this susceptible to even a single task failure. Clearing some stats while keeping others may lead to inconsistencies down the road or affect other stats improvements.

Approximating large histograms is not a new problem, and we may be able to find known work (like https://metamarkets.com/2013/histograms/) to create a more viable algorithmic solution. However, we do need to start experimenting with what we can do with stats, so making progress is also important.

So here is my suggestion: how about we don't make this part of the API or the internal stats flow for now? That keeps the main engine scalable, and we don't expose a WIP API. We can send the same bitmap information in VertexManagerEvents from the output to the ShuffleVertexManager. We already do that to send the total output size per task; we can enhance the payload to send the partition info instead. This way we can send the necessary info and solve the scenario this jira is trying to address.

Here are the pros I see with this:
1) Uses the existing event mechanism by enhancing its payload. We know this works e2e, so there is less scope for bugs.
2) Does not change the internal flow or APIs.
3) Is localized to the ShuffleVertexManager (which needs this), and the feature can be turned on/off via config for large jobs if needed; APIs cannot be turned on/off based on config. Essentially, this lives in "user" land instead of "tez" land.

Once we have experimented with and refined this, we can consider moving it from user land into tez land by adding it to the API infra. Thoughts?
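The "50Kx20K vertices would end up with 1GB or more" concern can be checked with back-of-envelope arithmetic. The sketch below (illustrative only, not Tez code) assumes one stat cell per (source task, downstream partition) pair and varies the bits stored per cell:

```python
# Back-of-envelope memory estimate for per-partition stats.
# Assumption (not from Tez source): one cell per (task, partition) pair.
tasks = 50_000        # source tasks ("50K")
partitions = 20_000   # downstream partitions ("20K")
cells = tasks * partitions  # 1,000,000,000 cells

for bits_per_cell in (1, 8):
    total_bytes = cells * bits_per_cell // 8
    print(f"{bits_per_cell} bit(s)/cell -> {total_bytes / 2**30:.2f} GiB")
# 1 bit/cell  -> 0.12 GiB
# 8 bits/cell -> 0.93 GiB
```

So even a bare 1-bit-per-partition bitmap costs ~125 MB per such vertex, and anything around a byte per cell reaches the quoted ~1GB, before accounting for multiple large vertices held in parallel.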
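To keep the enhanced VertexManagerEvent payload compact, per-partition sizes could be quantized into power-of-two buckets so each partition costs roughly a byte instead of a full counter. This is a hypothetical sketch of that idea, not the actual Tez payload format; `to_bucket`/`from_bucket` are illustrative names:

```python
def to_bucket(size_bytes: int) -> int:
    # 0 means an empty partition; otherwise the position of the highest
    # set bit, so all sizes in [2^(k-1), 2^k) share bucket k.
    return 0 if size_bytes == 0 else size_bytes.bit_length()

def from_bucket(bucket: int) -> int:
    # Approximate reconstruction: the lower bound 2^(k-1) of the bucket.
    return 0 if bucket == 0 else 1 << (bucket - 1)

sizes = [0, 100, 4096, 10_000_000]
buckets = [to_bucket(s) for s in sizes]
print(buckets)                            # [0, 7, 13, 24]
print([from_bucket(b) for b in buckets])  # [0, 64, 4096, 8388608]
```

The reconstruction is lossy (within a factor of two), but that is plenty of resolution for deciding which reduce tasks are "large", and the bucket indices fit in one byte each, keeping the event payload small.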
> Consider scheduling tasks in ShuffleVertexManager based on the partition
> sizes from the source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. This would be helpful in scenarios where there are limited resources (or concurrent jobs running, or multiple waves) with data skew, and the task which gets a large amount of data gets scheduled much later.
> e.g. Consider the following hive query running in a queue with limited capacity (42 slots in total) @ 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
> SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
> FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are lots of nulls for ss_sold_time_sk, the data tends to skew towards 70429. If the reducer which gets this data were scheduled much earlier (i.e. in the first wave itself), the entire job would finish fast.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
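The scheduling idea in the description reduces to ordering reduce tasks by their estimated input size, largest first, so the skewed partition (the NULL->70429 key above) lands in the first wave rather than the last. A minimal sketch of that ordering, assuming per-partition size estimates are already available (`schedule_order` is a hypothetical helper, not a Tez API):

```python
def schedule_order(partition_sizes):
    """Return reduce-task indices ordered largest-first, so a heavily
    skewed partition is launched in the first scheduling wave."""
    return sorted(range(len(partition_sizes)),
                  key=lambda i: partition_sizes[i],
                  reverse=True)

# 134 reducers as in the example, with one heavily skewed partition
# (index 9 chosen arbitrarily for illustration).
sizes = [1] * 134
sizes[9] = 500
print(schedule_order(sizes)[0])  # -> 9: the skewed reducer runs first
```

With 42 slots and 3 waves, launching the skewed reducer in wave one lets its long runtime overlap with the other waves instead of extending the job tail.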