ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573463126
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -22,154 +22,118 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
- private static int fileListStreamerID = 0;
- private static final String FILE_LIST_STREAMER_PREFIX =
"file-list-streamer-";
-
- private LinkedBlockingQueue<String> cache;
- private volatile boolean thresholdHit = false;
- private int thresholdPoint;
- private float thresholdFactor = 0.9f;
private Path backingFile;
- private FileListStreamer fileListStreamer;
- private String nextElement;
- private boolean noMoreElement;
+ private String nextElement = null;
private HiveConf conf;
private BufferedReader backingFileReader;
+ private BufferedWriter backingFileWriter;
-
- public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+ public FileList(Path backingFile, HiveConf conf) {
this.backingFile = backingFile;
this.conf = conf;
- if (cacheSize > 0) {
- // Cache size must be > 0 for this list to be used for the write
operation.
- this.cache = new LinkedBlockingQueue<>(cacheSize);
- fileListStreamer = new FileListStreamer(cache, backingFile, conf);
- thresholdPoint = getThreshold(cacheSize);
- LOG.debug("File list backed by {} can be used for write operation.",
backingFile);
- } else {
- thresholdHit = true;
- }
}
- @VisibleForTesting
- FileList(Path backingFile, FileListStreamer fileListStreamer,
LinkedBlockingQueue<String> cache, HiveConf conf) {
- this.backingFile = backingFile;
- this.fileListStreamer = fileListStreamer;
- this.cache = cache;
- this.conf = conf;
- thresholdPoint = getThreshold(cache.remainingCapacity());
- }
-
- /**
- * Only add operation is safe for concurrent operations.
- */
public void add(String entry) throws SemanticException {
- if (thresholdHit && !fileListStreamer.isAlive()) {
- throw new SemanticException("List is not getting saved anymore to file "
+ backingFile.toString());
- }
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
try {
- cache.put(entry);
- } catch (InterruptedException e) {
- throw new SemanticException(e);
- }
- if (!thresholdHit && cache.size() >= thresholdPoint) {
- initStoreToFile(cache.size());
+ retryable.executeCallable((Callable<Void>) ()-> {
+ synchronized (backingFile ) {
Review comment:
The entry and the newline character are written in two separate statements.
For two entries e1 and e2, the order of writing must be either
e1,newline,e2,newline or e2,newline,e1,newline. That order is guaranteed only
if both statements are enclosed in a single 'synchronized' block. Otherwise,
the writes could interleave as e1,e2,newline,newline, which would put both
entries on one line followed by an empty line and cause errors.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]