[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585373#comment-14585373 ]
Bikas Saha commented on TEZ-2496:
---------------------------------

Will thread safety be an issue if different partition writers are writing their partition sizes?
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
{code}

Since sizeInMB is used in getBucket(), can the whole calculation wrt size be moved into getBucket() to simplify the code?
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
+    int sizeInMB = (int) (size / (1024l * 1024l));
+    int index = destIndex * (BUCKET_SIZE_IN_MB.length + 1);
+    int bucket = getBucket(sizeInMB) + 1;
{code}

The actual int value is not being used anywhere other than as an exists check, right? Then why not simply writeBoolean?
{code}
+    int cardinality = in.readInt();
+    if (cardinality > -1) {
+      BytesWritable pStats = new BytesWritable();
{code}

Duplication of existing code?
{code}
+  private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+    int startIndex = taskIndex * offSetPerTask;
+    int[] indices = new int[partitionRange];
+    for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+      indices[currentIndex] = (startIndex + currentIndex);
+    }
+    return indices;
+  }
{code}

The code in ShuffleVertexManager can be tricky to get right :) It needs some targeted test coverage, ideally with both odd and even numbers of partitions, and with basePartitionRange == and != remainderRangeForLastShuffle. Does computing partition sizes and sorting have to be done every time a task is scheduled?

From the API point of view, this is introducing redundancy with the existing setDataSize() that sets the size globally. Do both need to be called now? Perhaps we can create setDataSizeWithPartitions(long[] sizes) such that if partition sizes are reported then this can be used, else the existing global API can be used. Internally, setDataSizeWithPartitions() could check that all partitions have been specified and then also set the global size by summing the partitions.

Do we have some before/after numbers that show memory usage in the AM, say for a 10Kx10K job? Ideally, a new case could be added in TestMemoryWithEvents that covers sending per-partition stats so that we can repro easily.
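To illustrate the getBucket() suggestion above, a rough sketch of how the caller could look if the bytes-to-MB conversion moved inside getBucket(); the bucket lookup and bit-setting shown here are illustrative only, not the patch's code:
{code}
  public void setDataSizeForDest(int destIndex, long size) {
    if (partitionStatistics == null) {
      partitionStatistics = new BitSet();
    }
    int index = destIndex * (BUCKET_SIZE_IN_MB.length + 1);
    int bucket = getBucket(size) + 1;          // bytes->MB conversion now happens inside getBucket()
    partitionStatistics.set(index + bucket);   // illustrative; the patch's bit-setting logic may differ
  }

  private int getBucket(long sizeInBytes) {
    int sizeInMB = (int) (sizeInBytes / (1024L * 1024L));
    // Illustrative lookup: find the first bucket threshold that the size fits under.
    int bucket = 0;
    while (bucket < BUCKET_SIZE_IN_MB.length && sizeInMB > BUCKET_SIZE_IN_MB[bucket]) {
      bucket++;
    }
    return bucket;
  }
{code}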
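For the exists check, a minimal sketch of the writeBoolean/readBoolean idea, assuming the stats are carried in a BytesWritable as in the quoted read path (the partitionStats field name is hypothetical):
{code}
  // Write side: a boolean marker is enough to say whether per-partition stats follow.
  out.writeBoolean(partitionStats != null);
  if (partitionStats != null) {
    partitionStats.write(out);
  }

  // Read side: mirrors the marker instead of reading an int cardinality.
  if (in.readBoolean()) {
    BytesWritable pStats = new BytesWritable();
    pStats.readFields(in);
    // decode the per-partition statistics from pStats
  }
{code}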
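To make the API suggestion concrete, a rough sketch of what a combined setDataSizeWithPartitions() might look like; numPartitions and the exact validation are hypothetical, not from the patch:
{code}
  public void setDataSizeWithPartitions(long[] sizes) {
    // Hypothetical sketch: require a size for every partition, then derive the global size.
    if (sizes == null || sizes.length != numPartitions) {
      throw new IllegalArgumentException("A size must be reported for every partition");
    }
    long total = 0;
    for (int destIndex = 0; destIndex < sizes.length; destIndex++) {
      setDataSizeForDest(destIndex, sizes[destIndex]);  // per-partition stats, as in the patch
      total += sizes[destIndex];
    }
    setDataSize(total);  // keep the existing global size in sync by summing the partitions
  }
{code}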
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source.
> This would be helpful in scenarios where there are limited resources (or concurrent jobs running,
> or multiple waves) with data skew, and the task which gets a large amount of data gets scheduled
> much later.
> e.g. Consider the following hive query running in a queue with limited capacity (42 slots in
> total) @ 200 GB scale
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
> SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
> FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When lots of nulls are there for
> ss_sold_time_sk, it would tend to have data skew towards 70429. If the reducer which gets this
> data gets scheduled much earlier (i.e. in the first wave itself), the entire job would finish fast.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)