[ 
https://issues.apache.org/jira/browse/FLINK-1287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14244154#comment-14244154
 ] 

ASF GitHub Bot commented on FLINK-1287:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/258#discussion_r21744870
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
 ---
    @@ -36,89 +33,111 @@
     
     /**
      * The locatable input split assigner assigns to each host splits that are 
local, before assigning
    - * splits that are not local. 
    + * splits that are not local.
      */
     public final class LocatableInputSplitAssigner implements 
InputSplitAssigner {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
     
    +   // unassigned input splits
    +   private final Set<LocatableInputSplitWithCount> unassigned = new 
HashSet<LocatableInputSplitWithCount>();
    +
    +   // input splits indexed by host for local assignment
    +   private final ConcurrentHashMap<String, LocatableInputSplitChooser> 
localPerHost = new ConcurrentHashMap<String, LocatableInputSplitChooser>();
    +
    +    // unassigned splits for remote assignment
    +   private final LocatableInputSplitChooser remoteSplitChooser;
     
    -   private final Set<LocatableInputSplit> unassigned = new 
HashSet<LocatableInputSplit>();
    -   
    -   private final ConcurrentHashMap<String, List<LocatableInputSplit>> 
localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
    -   
        private int localAssignments;           // lock protected by the 
unassigned set lock
    -   
    +
        private int remoteAssignments;          // lock protected by the 
unassigned set lock
     
        // 
--------------------------------------------------------------------------------------------
    -   
    +
        public LocatableInputSplitAssigner(Collection<LocatableInputSplit> 
splits) {
    -           this.unassigned.addAll(splits);
    +           for(LocatableInputSplit split : splits) {
    +                   this.unassigned.add(new 
LocatableInputSplitWithCount(split));
    +           }
    +           this.remoteSplitChooser = new 
LocatableInputSplitChooser(unassigned);
        }
    -   
    +
        public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
    -           Collections.addAll(this.unassigned, splits);
    +           for(LocatableInputSplit split : splits) {
    +                   this.unassigned.add(new 
LocatableInputSplitWithCount(split));
    +           }
    +           this.remoteSplitChooser = new 
LocatableInputSplitChooser(unassigned);
        }
    -   
    +
        // 
--------------------------------------------------------------------------------------------
     
        @Override
        public LocatableInputSplit getNextInputSplit(String host) {
    -           // for a null host, we return an arbitrary split
    +
    +           // for a null host, we return a remote split
                if (host == null) {
    -                   
    -                   synchronized (this.unassigned) {
    -                           Iterator<LocatableInputSplit> iter = 
this.unassigned.iterator();
    -                           if (iter.hasNext()) {
    -                                   LocatableInputSplit next = iter.next();
    -                                   iter.remove();
    -                                   
    -                                   if (LOG.isInfoEnabled()) {
    -                                           LOG.info("Assigning split to 
null host (random assignment).");
    -                                   }
    -                                   
    -                                   remoteAssignments++;
    -                                   return next;
    -                           } else {
    -                                   if (LOG.isDebugEnabled()) {
    -                                           LOG.debug("No more unassigned 
input splits remaining.");
    +                   synchronized (this.remoteSplitChooser) {
    +                           synchronized (this.unassigned) {
    +
    +                                   LocatableInputSplitWithCount split = 
this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
    +
    +                                   if (split != null) {
    +                                           // got a split to assign. 
Double check that it hasn't been assigned before.
    +                                           if 
(this.unassigned.remove(split)) {
    +                                                   if 
(LOG.isInfoEnabled()) {
    +                                                           
LOG.info("Assigning split to null host (random assignment).");
    +                                                   }
    +
    +                                                   remoteAssignments++;
    +                                                   return split.getSplit();
    +                                           } else {
    +                                                   throw new 
IllegalStateException("Chosen InputSplit has already been assigned. This should 
not happen!");
    +                                           }
    +                                   } else {
    +                                           // all splits consumed
    +                                           if (LOG.isDebugEnabled()) {
    +                                                   LOG.debug("No more 
unassigned input splits remaining.");
    +                                           }
    +                                           return null;
                                        }
    -                                   return null;
                                }
                        }
                }
    -           
    +
                host = host.toLowerCase(Locale.US);
    -           
    +
                // for any non-null host, we take the list of non-null splits
    -           List<LocatableInputSplit> localSplits = 
this.localPerHost.get(host);
    -           
    +           LocatableInputSplitChooser localSplits = 
this.localPerHost.get(host);
    +
                // if we have no list for this host yet, create one
                if (localSplits == null) {
    -                   localSplits = new ArrayList<LocatableInputSplit>(16);
    -                   
    +                   localSplits = new LocatableInputSplitChooser();
    +
                        // lock the list, to be sure that others have to wait 
for that host's local list
                        synchronized (localSplits) {
    -                           List<LocatableInputSplit> prior = 
this.localPerHost.putIfAbsent(host, localSplits);
    -                           
    +                           LocatableInputSplitChooser prior = 
this.localPerHost.putIfAbsent(host, localSplits);
    +
                                // if someone else beat us in the case to 
create this list, then we do not populate this one, but
                                // simply work with that other list
                                if (prior == null) {
                                        // we are the first, we populate
    -                                   
    +
                                        // first, copy the remaining splits to 
release the lock on the set early
                                        // because that is shared among threads
    -                                   LocatableInputSplit[] remaining;
    +                                   LocatableInputSplitWithCount[] 
remaining;
                                        synchronized (this.unassigned) {
    -                                           remaining = 
(LocatableInputSplit[]) this.unassigned.toArray(new 
LocatableInputSplit[this.unassigned.size()]);
    +                                           remaining = 
(LocatableInputSplitWithCount[]) this.unassigned.toArray(new 
LocatableInputSplitWithCount[this.unassigned.size()]);
    --- End diff --
    
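    I think the cast is not necessary? The generic `toArray(T[])` already returns an array of the argument's type (`LocatableInputSplitWithCount[]`).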


> Improve File Input Split assignment
> -----------------------------------
>
>                 Key: FLINK-1287
>                 URL: https://issues.apache.org/jira/browse/FLINK-1287
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>            Reporter: Robert Metzger
>            Assignee: Fabian Hueske
>
> While running some DFS read-intensive benchmarks, I found that the assignment 
> of input splits is not optimal, in particular in cases where numWorker != 
> numDataNodes and when the replication factor is low (in my case it was 1).
> In this particular example, the input had 40960 splits, of which 4694 were 
> read remotely.  Spark did only 2056 remote reads for the same dataset.
> With the replication factor increased to 2, Flink did only 290 remote reads. 
> So usually, users shouldn't be affected by this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to