[
https://issues.apache.org/jira/browse/FLINK-1287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14244154#comment-14244154
]
ASF GitHub Bot commented on FLINK-1287:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/258#discussion_r21744870
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
---
@@ -36,89 +33,111 @@
/**
* The locatable input split assigner assigns to each host splits that are
local, before assigning
- * splits that are not local.
+ * splits that are not local.
*/
public final class LocatableInputSplitAssigner implements
InputSplitAssigner {
private static final Logger LOG =
LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
+ // unassigned input splits
+ private final Set<LocatableInputSplitWithCount> unassigned = new
HashSet<LocatableInputSplitWithCount>();
+
+ // input splits indexed by host for local assignment
+ private final ConcurrentHashMap<String, LocatableInputSplitChooser>
localPerHost = new ConcurrentHashMap<String, LocatableInputSplitChooser>();
+
+ // unassigned splits for remote assignment
+ private final LocatableInputSplitChooser remoteSplitChooser;
- private final Set<LocatableInputSplit> unassigned = new
HashSet<LocatableInputSplit>();
-
- private final ConcurrentHashMap<String, List<LocatableInputSplit>>
localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
-
private int localAssignments; // lock protected by the
unassigned set lock
-
+
private int remoteAssignments; // lock protected by the
unassigned set lock
//
--------------------------------------------------------------------------------------------
-
+
public LocatableInputSplitAssigner(Collection<LocatableInputSplit>
splits) {
- this.unassigned.addAll(splits);
+ for(LocatableInputSplit split : splits) {
+ this.unassigned.add(new
LocatableInputSplitWithCount(split));
+ }
+ this.remoteSplitChooser = new
LocatableInputSplitChooser(unassigned);
}
-
+
public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
- Collections.addAll(this.unassigned, splits);
+ for(LocatableInputSplit split : splits) {
+ this.unassigned.add(new
LocatableInputSplitWithCount(split));
+ }
+ this.remoteSplitChooser = new
LocatableInputSplitChooser(unassigned);
}
-
+
//
--------------------------------------------------------------------------------------------
@Override
public LocatableInputSplit getNextInputSplit(String host) {
- // for a null host, we return an arbitrary split
+
+ // for a null host, we return a remote split
if (host == null) {
-
- synchronized (this.unassigned) {
- Iterator<LocatableInputSplit> iter =
this.unassigned.iterator();
- if (iter.hasNext()) {
- LocatableInputSplit next = iter.next();
- iter.remove();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Assigning split to
null host (random assignment).");
- }
-
- remoteAssignments++;
- return next;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No more unassigned
input splits remaining.");
+ synchronized (this.remoteSplitChooser) {
+ synchronized (this.unassigned) {
+
+ LocatableInputSplitWithCount split =
this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
+
+ if (split != null) {
+ // got a split to assign.
Double check that it hasn't been assigned before.
+ if
(this.unassigned.remove(split)) {
+ if
(LOG.isInfoEnabled()) {
+
LOG.info("Assigning split to null host (random assignment).");
+ }
+
+ remoteAssignments++;
+ return split.getSplit();
+ } else {
+ throw new
IllegalStateException("Chosen InputSplit has already been assigned. This should
not happen!");
+ }
+ } else {
+ // all splits consumed
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No more
unassigned input splits remaining.");
+ }
+ return null;
}
- return null;
}
}
}
-
+
host = host.toLowerCase(Locale.US);
-
+
// for any non-null host, we take the list of non-null splits
- List<LocatableInputSplit> localSplits =
this.localPerHost.get(host);
-
+ LocatableInputSplitChooser localSplits =
this.localPerHost.get(host);
+
// if we have no list for this host yet, create one
if (localSplits == null) {
- localSplits = new ArrayList<LocatableInputSplit>(16);
-
+ localSplits = new LocatableInputSplitChooser();
+
// lock the list, to be sure that others have to wait
for that host's local list
synchronized (localSplits) {
- List<LocatableInputSplit> prior =
this.localPerHost.putIfAbsent(host, localSplits);
-
+ LocatableInputSplitChooser prior =
this.localPerHost.putIfAbsent(host, localSplits);
+
// if someone else beat us in the case to
create this list, then we do not populate this one, but
// simply work with that other list
if (prior == null) {
// we are the first, we populate
-
+
// first, copy the remaining splits to
release the lock on the set early
// because that is shared among threads
- LocatableInputSplit[] remaining;
+ LocatableInputSplitWithCount[]
remaining;
synchronized (this.unassigned) {
- remaining =
(LocatableInputSplit[]) this.unassigned.toArray(new
LocatableInputSplit[this.unassigned.size()]);
+ remaining =
(LocatableInputSplitWithCount[]) this.unassigned.toArray(new
LocatableInputSplitWithCount[this.unassigned.size()]);
--- End diff --
I think the cast is not necessary: `toArray(T[])` already returns `LocatableInputSplitWithCount[]`.
> Improve File Input Split assignment
> -----------------------------------
>
> Key: FLINK-1287
> URL: https://issues.apache.org/jira/browse/FLINK-1287
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Reporter: Robert Metzger
> Assignee: Fabian Hueske
>
> While running some DFS read-intensive benchmarks, I found that the assignment
> of input splits is not optimal, in particular when numWorkers !=
> numDataNodes and the replication factor is low (in my case, it was 1).
> In this example, the input had 40,960 splits, of which 4,694 were
> read remotely. Spark performed only 2,056 remote reads for the same dataset.
> With the replication factor increased to 2, Flink performed only 290 remote reads,
> so most users should not be affected by this issue.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)