This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new a192ec444 Revert "TEZ-4397 Open Tez Input splits asynchronously" a192ec444 is described below commit a192ec4443032c33269f5326755a292c87a98292 Author: Laszlo Bodor <bodorlaszlo0...@gmail.com> AuthorDate: Mon Jul 4 11:04:34 2022 +0200 Revert "TEZ-4397 Open Tez Input splits asynchronously" This reverts commit f724c546069885e29e6446813805bb63bf0d5d9d. --- .../mapred/split/TezGroupedSplitsInputFormat.java | 72 ++-------------------- .../tez/mapreduce/grouper/TezSplitGrouper.java | 11 ---- 2 files changed, 4 insertions(+), 79 deletions(-) diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index 6266ec1bc..61ba56030 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -19,16 +19,8 @@ package org.apache.hadoop.mapred.split; import java.io.IOException; -import java.util.LinkedList; import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -137,58 +129,14 @@ public class TezGroupedSplitsInputFormat<K, V> int idx = 0; long progress; RecordReader<K, V> curReader; - final AtomicInteger initIndex; - final int numReaders; - final ExecutorService initReaderExecService; - final Queue<Future<RecordReader<K,V>>> initedReaders; - + public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job, 
Reporter reporter) throws IOException { this.groupedSplit = split; this.job = job; this.reporter = reporter; - this.initIndex = new AtomicInteger(0); - int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS, - TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT); - this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS, - TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT); - this.initReaderExecService = Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder() - .setDaemon(true) - .setPriority(Thread.MAX_PRIORITY) - .setNameFormat("TEZ-Split-Init-Thread-%d") - .build()); - this.initedReaders = new LinkedList<>(); - preInitReaders(); initNextRecordReader(); } - - private void preInitReaders() { - if (initReaderExecService == null) { - LOG.info("Init record reader threadpool is not initialized"); - return; - } - for (int i = 0; i < numReaders; i++) { - initedReaders.offer(this.initReaderExecService.submit(() -> { - try { - int index = initIndex.getAndIncrement(); - if (index >= groupedSplit.wrappedSplits.size()) { - return null; - } - InputSplit s = groupedSplit.wrappedSplits.get(index); - RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter); - LOG.debug("Init Thread processed reader number {} initialization", index); - return reader; - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - cancelsFutures(); - throw new RuntimeException(e); - } - })); - } - } @Override public boolean next(K key, V value) throws IOException { @@ -235,8 +183,6 @@ public class TezGroupedSplitsInputFormat<K, V> // if all chunks have been processed, nothing more to do. 
if (idx == groupedSplit.wrappedSplits.size()) { - LOG.info("Shutting down the init record reader threadpool"); - initReaderExecService.shutdownNow(); return false; } @@ -247,25 +193,15 @@ public class TezGroupedSplitsInputFormat<K, V> // get a record reader for the idx-th chunk try { - curReader = initedReaders.poll().get(); - preInitReaders(); + curReader = wrappedInputFormat.getRecordReader( + groupedSplit.wrappedSplits.get(idx), job, reporter); } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - cancelsFutures(); - throw new RuntimeException(e); + throw new RuntimeException (e); } idx++; return true; } - private void cancelsFutures() { - for (Future<RecordReader<K,V>> f : initedReaders) { - f.cancel(true); - } - } - @Override public long getPos() throws IOException { long subprogress = 0; // bytes processed in current split diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index 3b2f17d1f..a1d6b6c80 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -102,17 +102,6 @@ public abstract class TezSplitGrouper { public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only"; public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false; - /** - * Number of threads used to initialize the grouped splits, to asynchronously open the readers. - */ - public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads"; - public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4; - - /** - * Number of record readers to asynchronously and proactively init. 
- */ - public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders"; - public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 10; static class LocationHolder { List<SplitContainer> splits;