[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585373#comment-14585373
 ] 

Bikas Saha commented on TEZ-2496:
---------------------------------

Will thread safety be an issue if different partition writers are writing their 
partition sizes?
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }{code}
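
For illustration, a minimal sketch of one way to sidestep the lazy-init race if 
multiple writers can call this concurrently (the holder class below is a 
hypothetical stand-in, not the patch's code): initialize the BitSet eagerly and 
synchronize the update, since BitSet itself is not thread-safe.
{code}
import java.util.BitSet;

// Sketch only: class and method bodies here are hypothetical stand-ins for the
// patch's stats holder. Eager initialization removes the null-check race, and
// synchronizing the update protects the (non-thread-safe) BitSet mutation.
public class PartitionStatsSketch {
  private final BitSet partitionStatistics = new BitSet();

  public synchronized void setDataSizeForDest(int destIndex, long size) {
    // ... compute the bucket for (destIndex, size) as in the patch ...
    partitionStatistics.set(destIndex); // placeholder for the real bucket bits
  }
}
{code}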

Since sizeInMB is only used as the input to getBucket(), can the whole 
size-related calculation be moved into getBucket() to simplify the code?
{code}+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
+    int sizeInMB = (int) (size / (1024l * 1024l));
+    int index = destIndex * (BUCKET_SIZE_IN_MB.length + 1);
+    int bucket = getBucket(sizeInMB) + 1;{code}
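
For example, a minimal sketch of that refactor, assuming getBucket() can take 
the raw byte size (the bucket boundaries and lookup loop below are illustrative, 
not the patch's actual getBucket()):
{code}
// Sketch only: BUCKET_SIZE_IN_MB mirrors the name in the patch, but the values
// and the lookup loop are made up for illustration.
public class BucketSketch {
  private static final int[] BUCKET_SIZE_IN_MB = {1, 10, 100, 1000};

  static int getBucket(long sizeInBytes) {
    // Callers pass the raw size; the MB conversion lives in one place.
    int sizeInMB = (int) (sizeInBytes / (1024L * 1024L));
    int bucket = 0;
    while (bucket < BUCKET_SIZE_IN_MB.length && sizeInMB > BUCKET_SIZE_IN_MB[bucket]) {
      bucket++;
    }
    return bucket;
  }
}
{code}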

The actual int value is not being used anywhere other than as an existence 
check, right? Then why not simply writeBoolean?
{code}+    int cardinality = in.readInt();
+    if (cardinality > -1) {
+      BytesWritable pStats = new BytesWritable();{code}
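
A minimal sketch of what the writeBoolean variant could look like (the helper 
class and variable names are illustrative, not the patch's code):
{code}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;

// Sketch only: a single boolean flags whether per-partition stats follow.
class StatsSerdeSketch {
  static void writeStats(DataOutput out, BytesWritable pStats) throws IOException {
    out.writeBoolean(pStats != null);
    if (pStats != null) {
      pStats.write(out);
    }
  }

  static BytesWritable readStats(DataInput in) throws IOException {
    if (!in.readBoolean()) {
      return null;
    }
    BytesWritable pStats = new BytesWritable();
    pStats.readFields(in);
    return pStats;
  }
}
{code}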

Duplication of existing code?
{code}+  private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+    int startIndex = taskIndex * offSetPerTask;
+    int[] indices = new int[partitionRange];
+    for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+      indices[currentIndex] = (startIndex + currentIndex);
+    }
+    return indices;
+  }{code}

The code in ShuffleVertexManager can be tricky to get right :) It needs some 
targeted test coverage, ideally with odd and even numbers of partitions, and 
with basePartitionRange both equal to and not equal to 
remainderRangeForLastShuffle.
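
As an illustration of the kind of targeted coverage meant here, a minimal 
sketch that exercises the range math in isolation (the helper below is a 
stand-in, not the real ShuffleVertexManager code):
{code}
import static org.junit.Assert.assertEquals;
import org.junit.Test;

// Sketch only: covers an even partition count where the last range equals the
// base range, and an odd count where the last range is a smaller remainder.
public class TestPartitionRangeMath {

  private static int[] createIndices(int partitionRange, int taskIndex, int offsetPerTask) {
    int startIndex = taskIndex * offsetPerTask;
    int[] indices = new int[partitionRange];
    for (int i = 0; i < partitionRange; ++i) {
      indices[i] = startIndex + i;
    }
    return indices;
  }

  @Test
  public void testEvenPartitionsEqualRanges() {
    // 8 partitions, base range 4: the second task covers partitions 4..7.
    int[] lastRange = createIndices(4, 1, 4);
    assertEquals(4, lastRange.length);
    assertEquals(7, lastRange[3]);
  }

  @Test
  public void testOddPartitionsRemainderRange() {
    // 7 partitions, base range 4: the last task covers only partitions 4..6.
    int[] lastRange = createIndices(3, 1, 4);
    assertEquals(3, lastRange.length);
    assertEquals(6, lastRange[2]);
  }
}
{code}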

Does computing partition size and sorting have to be done every time a task is 
scheduled?
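
If not, one option is to cache the sorted order and only recompute it when new 
statistics arrive; a minimal sketch (all names hypothetical):
{code}
import java.util.Arrays;

// Sketch only: sort the partitions by reported size once, cache the result,
// and invalidate the cache when new statistics come in, rather than re-sorting
// on every task schedule.
class SortedPartitionCache {
  private long[] partitionSizes = new long[0];
  private Integer[] sortedOrder;          // cached, invalidated on update

  synchronized void updateSizes(long[] sizes) {
    partitionSizes = sizes.clone();
    sortedOrder = null;                   // force re-sort on next use
  }

  synchronized Integer[] getPartitionsBySizeDescending() {
    if (sortedOrder == null) {
      sortedOrder = new Integer[partitionSizes.length];
      for (int i = 0; i < sortedOrder.length; i++) {
        sortedOrder[i] = i;
      }
      Arrays.sort(sortedOrder, (a, b) -> Long.compare(partitionSizes[b], partitionSizes[a]));
    }
    return sortedOrder.clone();
  }
}
{code}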

From the API point of view, this is introducing redundancy with the existing 
setDataSize() that sets the size globally. Do both need to be called now? 
Perhaps we can create setDataSizeWithPartitions(long[] sizes) such that if 
partition sizes are reported then this can be used, else the existing global 
API can be used. Internally, setDataSizeWithPartitions() could check that all 
partitions have been specified and then also set the global size by summing 
the partitions.
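
A minimal sketch of that API shape (the class, method, and field names below 
are hypothetical, not the existing Tez API):
{code}
// Sketch only: a single per-partition setter that also derives the global size
// by summing the partitions, so callers need only one call.
class OutputStatisticsSketch {
  private long dataSize;                  // existing global size
  private long[] partitionSizes;          // new per-partition sizes

  public void setDataSize(long size) {    // existing global API stays usable
    this.dataSize = size;
  }

  public void setDataSizeWithPartitions(long[] sizes) {
    long total = 0;
    for (long s : sizes) {                // every partition must be specified
      total += s;
    }
    this.partitionSizes = sizes.clone();
    this.dataSize = total;                // global size derived from partitions
  }
}
{code}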

Do we have some before/after numbers that show memory usage in the AM, say for 
a 10Kx10K job? Ideally, a new case could be added in TestMemoryWithEvents that 
covers this case of sending per-partition stats so that we can reproduce it 
easily.


> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source. This would be helpful in scenarios where there are 
> limited resources (or concurrent jobs running, or multiple waves) with data 
> skew, and the task which gets a large amount of data gets scheduled much 
> later.
> e.g. Consider the following Hive query running in a queue with limited 
> capacity (42 slots in total) at 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are 
> lots of nulls for ss_sold_time_sk, the data tends to skew towards 70429. If 
> the reducer which gets this data is scheduled much earlier (i.e. in the first 
> wave itself), the entire job would finish fast.


