[ 
https://issues.apache.org/jira/browse/FLINK-1287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14244157#comment-14244157
 ] 

ASF GitHub Bot commented on FLINK-1287:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21745099
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---
    @@ -184,15 +209,159 @@ private static final boolean isLocal(String flinkHost, String[] hosts) {
                                return true;
                        }
                }
    -           
    +
                return false;
        }
    -   
    +
        public int getNumberOfLocalAssignments() {
                return localAssignments;
        }
    -   
    +
        public int getNumberOfRemoteAssignments() {
                return remoteAssignments;
        }
    +
    +    /**
    +     * Wraps a LocatableInputSplit and adds a count for the number of observed hosts
    +     * that can access the split locally.
    +     */
    +   public static class LocatableInputSplitWithCount {
    +
    +           private final LocatableInputSplit split;
    +           private int localCount;
    +
    +           public LocatableInputSplitWithCount(LocatableInputSplit split) {
    +                   this.split = split;
    +                   this.localCount = 0;
    +           }
    +
    +           public void incrementLocalCount() {
    +                   this.localCount++;
    +           }
    +
    +           public int getLocalCount() {
    +                   return this.localCount;
    +           }
    +
    +           public LocatableInputSplit getSplit() {
    +                   return this.split;
    +           }
    +
    +   }
    +
    +   /**
    +    * Holds a list of LocatableInputSplits and returns the split with the lowest local count.
    +    * The rationale is that splits which are local on few hosts should be preferred over others which
    +    * have more degrees of freedom for local assignment.
    +    *
    +    * Internally, the splits are stored in a linked list. Sorting the list is not a good solution,
    +    * as local counts are updated whenever a previously unseen host requests a split.
    +    * Instead, we track the minimum local count and iteratively look for splits with that minimum count.
    +    */
    +   public static class LocatableInputSplitChooser {
    --- End diff --
    
    We could reduce the visibility of this class (the same applies to the 
other one, LocatableInputSplitWithCount).
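
A minimal sketch of what the suggestion could look like, assuming the two helpers are only used from within the assigner's package. The names `SplitWithCount`, `SplitChooser` and `pollMinLocalCount` are hypothetical stand-ins, not the classes from the pull request, and the split is represented by a plain `String`. Unlike the approach described in the Javadoc above, this simplified version rescans the list on every call instead of caching the minimum local count.

```java
import java.util.LinkedList;

// Hypothetical stand-in for LocatableInputSplitWithCount.
// No "public" modifier: the class is package-private.
final class SplitWithCount {
    private final String split;   // assumption: a String instead of a LocatableInputSplit
    private int localCount;

    SplitWithCount(String split) {
        this.split = split;
        this.localCount = 0;
    }

    void incrementLocalCount() {
        this.localCount++;
    }

    int getLocalCount() {
        return this.localCount;
    }

    String getSplit() {
        return this.split;
    }
}

// Hypothetical stand-in for LocatableInputSplitChooser, also package-private.
final class SplitChooser {
    private final LinkedList<SplitWithCount> splits = new LinkedList<>();

    void add(SplitWithCount split) {
        splits.add(split);
    }

    // Removes and returns the split with the lowest local count,
    // or null if no split is left. Local counts may have changed since
    // the split was added, so the list is scanned on each call.
    SplitWithCount pollMinLocalCount() {
        SplitWithCount best = null;
        for (SplitWithCount candidate : splits) {
            if (best == null || candidate.getLocalCount() < best.getLocalCount()) {
                best = candidate;
            }
        }
        if (best != null) {
            splits.remove(best);
        }
        return best;
    }
}
```

If the helpers stay nested inside the assigner, dropping `public` from the nested class declarations (or making them `private static`) would have the same effect.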


> Improve File Input Split assignment
> -----------------------------------
>
>                 Key: FLINK-1287
>                 URL: https://issues.apache.org/jira/browse/FLINK-1287
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>            Reporter: Robert Metzger
>            Assignee: Fabian Hueske
>
> While running some DFS read-intensive benchmarks, I found that the assignment 
> of input splits is not optimal, in particular when numWorker != numDataNodes 
> and the replication factor is low (in my case it was 1).
> In the particular example, the input had 40960 splits, of which 4694 were 
> read remotely.  Spark did only 2056 remote reads for the same dataset.
> With the replication factor increased to 2, Flink did only 290 remote reads. 
> So usually, users shouldn't be affected by this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
