ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573451500
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
- private static int fileListStreamerID = 0;
- private static final String FILE_LIST_STREAMER_PREFIX =
"file-list-streamer-";
-
- private LinkedBlockingQueue<String> cache;
- private volatile boolean thresholdHit = false;
- private int thresholdPoint;
- private float thresholdFactor = 0.9f;
private Path backingFile;
- private FileListStreamer fileListStreamer;
- private String nextElement;
- private boolean noMoreElement;
+ private String nextElement = null;
private HiveConf conf;
private BufferedReader backingFileReader;
+ private BufferedWriter backingFileWriter;
-
- public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+ public FileList(Path backingFile, HiveConf conf) {
this.backingFile = backingFile;
this.conf = conf;
- if (cacheSize > 0) {
- // Cache size must be > 0 for this list to be used for the write
operation.
- this.cache = new LinkedBlockingQueue<>(cacheSize);
- fileListStreamer = new FileListStreamer(cache, backingFile, conf);
- thresholdPoint = getThreshold(cacheSize);
- LOG.debug("File list backed by {} can be used for write operation.",
backingFile);
- } else {
- thresholdHit = true;
- }
}
- @VisibleForTesting
- FileList(Path backingFile, FileListStreamer fileListStreamer,
LinkedBlockingQueue<String> cache, HiveConf conf) {
- this.backingFile = backingFile;
- this.fileListStreamer = fileListStreamer;
- this.cache = cache;
- this.conf = conf;
- thresholdPoint = getThreshold(cache.remainingCapacity());
- }
-
- /**
- * Only add operation is safe for concurrent operations.
- */
public void add(String entry) throws SemanticException {
- if (thresholdHit && !fileListStreamer.isAlive()) {
- throw new SemanticException("List is not getting saved anymore to file "
+ backingFile.toString());
- }
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
try {
- cache.put(entry);
- } catch (InterruptedException e) {
- throw new SemanticException(e);
- }
- if (!thresholdHit && cache.size() >= thresholdPoint) {
- initStoreToFile(cache.size());
+ retryable.executeCallable((Callable<Void>) ()-> {
+ synchronized (backingFile ) {
Review comment:
1 thread can write at a moment but synchronisation is needed since we
have two elements to write per entry. One being the entry and other is the
newline. These two elements need to be written atomically.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]