pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r583233006
##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
.verifyResults(new String[]{"2", "3"});
}
+ @Test
+ public void testReplWithRetryDisabledIterators() throws Throwable {
+ List<String> clause = new ArrayList<>();
+ //NS replacement parameters has no effect when data is also copied to staging
+ clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+ clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by
(load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc
tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName, clause);
Review comment:
At this point the entries are written to the _file_list and _file_list_external.
Please add an assertion on the file content here.
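For instance, something along these lines could work here — just a sketch on my part; the "hive" sub-directory under tuple.dumpLocation, the readFileList helper and the conf reference are assumptions, please adjust to the actual dump layout:
```java
// Hypothetical helper for this test class (names/paths are assumptions, not the actual dump layout).
private List<String> readFileList(String dumpLocation, String fileName, HiveConf conf) throws IOException {
  Path fileListPath = new Path(new Path(dumpLocation, "hive"), fileName);
  FileSystem fs = fileListPath.getFileSystem(conf);
  List<String> entries = new ArrayList<>();
  try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fileListPath)))) {
    String line;
    while ((line = reader.readLine()) != null) {
      entries.add(line);
    }
  }
  return entries;
}

// Usage: capture the dump tuple instead of only chaining .dump() at the end, then assert on the list content.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
    // ... same statements as above ...
    .dump(primaryDbName, clause);
List<String> copyEntries = readFileList(tuple.dumpLocation, "_file_list", conf);
Assert.assertFalse("expected data copy entries in _file_list", copyEntries.isEmpty());
```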
##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
.verifyResults(new String[]{"2", "3"});
}
+ @Test
+ public void testReplWithRetryDisabledIterators() throws Throwable {
+ List<String> clause = new ArrayList<>();
+ //NS replacement parameters has no effect when data is also copied to staging
+ clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+ clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by
(load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc
tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName, clause);
+ replica.load(replicatedDbName, primaryDbName, clause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"acid_table", "table1"})
+ .run("select * from table1")
+ .verifyResults(new String[] {"1", "2"});
+
+ primary.run("use " + primaryDbName)
+ .run("insert into table1 values (3)")
+ .dump(primaryDbName, clause);
+ replica.load(replicatedDbName, primaryDbName, clause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"acid_table", "table1"})
+ .run("select * from table1")
+ .verifyResults(new String[]{"1", "2", "3"});
+
+ clause.add("'" +
HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname +
"'='false'");
+ primary.run("use " + primaryDbName)
+ .run("create external table ext_table1 (id int)")
+ .run("insert into ext_table1 values (3)")
+ .run("insert into ext_table1 values (4)")
+ .run("create external table ext_table2 (key int, value int)
partitioned by (load_time timestamp)")
+ .run("insert into ext_table2 partition(load_time = '2012-02-21
07:08:09.123') values(1,2)")
+ .run("insert into ext_table2 partition(load_time = '2012-02-21
07:08:09.124') values(1,3)")
+ .run("show partitions ext_table2")
+ .verifyResults(new String[]{
+ "load_time=2012-02-21 07%3A08%3A09.123",
+ "load_time=2012-02-21 07%3A08%3A09.124"})
+ .dump(primaryDbName, clause);
Review comment:
At this point the entries are written to the _file_list and _file_list_external.
Please add an assertion on the file content here.
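Same ask here. With the external tables added, the readFileList sketch from my earlier comment could be reused to also cover _file_list_external (again, the file name and expected count are assumptions to verify):
```java
WarehouseInstance.Tuple incrTuple = primary.run("use " + primaryDbName)
    // ... external table statements as above ...
    .dump(primaryDbName, clause);
List<String> extEntries = readFileList(incrTuple.dumpLocation, "_file_list_external", conf);
// I'd expect one data-location entry per external table here; please assert on the exact content.
Assert.assertFalse("expected entries in _file_list_external", extEntries.isEmpty());
```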
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
package org.apache.hadoop.hive.ql.exec.repl.util;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
- private static int fileListStreamerID = 0;
- private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
- private LinkedBlockingQueue<String> cache;
- private volatile boolean thresholdHit = false;
- private int thresholdPoint;
- private float thresholdFactor = 0.9f;
- private Path backingFile;
- private FileListStreamer fileListStreamer;
- private String nextElement;
- private boolean noMoreElement;
+ private final Path backingFile;
+ private String nextElement = null;
+ private String lastReadElement = null;
private HiveConf conf;
+ private volatile boolean abortOperation = false;
+ private volatile boolean retryMode;
private BufferedReader backingFileReader;
+ private volatile FSDataOutputStream backingFileWriter;
-
- public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+ public FileList(Path backingFile, HiveConf conf) {
this.backingFile = backingFile;
this.conf = conf;
- if (cacheSize > 0) {
- // Cache size must be > 0 for this list to be used for the write operation.
- this.cache = new LinkedBlockingQueue<>(cacheSize);
- fileListStreamer = new FileListStreamer(cache, backingFile, conf);
- thresholdPoint = getThreshold(cacheSize);
- LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+ this.retryMode = false;
+ }
+
+ public void add(String entry) throws IOException {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+ writeWithRetry(entry);
} else {
- thresholdHit = true;
+ writeEntry(entry);
}
}
- @VisibleForTesting
- FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
- this.backingFile = backingFile;
- this.fileListStreamer = fileListStreamer;
- this.cache = cache;
- this.conf = conf;
- thresholdPoint = getThreshold(cache.remainingCapacity());
+ private synchronized void writeEntry(String entry) throws IOException {
+ //retry only during creating the file, no retry during writes
+ if (backingFileWriter == null) {
+ try {
+ Retryable retryable = buildRetryable();
+ retryable.executeCallable((Callable<Void>) () -> {
+ if(this.abortOperation) {
+ return null;
+ }
+ backingFileWriter = getWriterCreateMode();
+ return null;
+ });
+ } catch (Exception e) {
+ this.abortOperation = true;
+ throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+ }
+ }
+ if(this.abortOperation) {
+ return;
+ }
+ try {
+ backingFileWriter.writeBytes(getEntryWithNewline(entry));
+ LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+ } catch (IOException e) {
+ this.abortOperation = true;
+ LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+ throw e;
+ }
}
- /**
- * Only add operation is safe for concurrent operations.
- */
- public void add(String entry) throws SemanticException {
- if (thresholdHit && !fileListStreamer.isAlive()) {
- throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+ private synchronized void writeWithRetry(String entry) throws IOException {
+ Retryable retryable = buildRetryable();
Review comment:
Please add a test for the retry case.
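Something like the following could cover it — a rough sketch only; it assumes the test sits in the same package as FileList so the package-private initWriter() can be stubbed, that Mockito is on the classpath, and that the REPL_RETRY_* conf names below are the right knobs to shrink the retry window:
```java
@Test
public void testFileListWriteRetriedOnTransientFailure() throws Exception {
  HiveConf conf = new HiveConf();
  conf.setBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY, true);
  // Keep the retry window small for the test (conf names are an assumption).
  conf.setTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, 20, TimeUnit.SECONDS);
  conf.setTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, 1, TimeUnit.SECONDS);

  Path backingFile = new Path("file:///tmp/repl-file-list-retry-test/_file_list");
  backingFile.getFileSystem(conf).delete(backingFile, true);

  FileList fileList = Mockito.spy(new FileList(backingFile, conf));
  // Fail the first attempt, then let the real writer be created on the retry.
  Mockito.doThrow(new IOException("injected failure"))
      .doCallRealMethod()
      .when(fileList).initWriter();

  fileList.add("entry-1");
  fileList.close();

  // The write was retried once and the entry still made it to the backing file.
  Mockito.verify(fileList, Mockito.times(2)).initWriter();
  Assert.assertTrue(fileList.hasNext());
  Assert.assertEquals("entry-1", fileList.next());
}
```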
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,225 @@
package org.apache.hadoop.hive.ql.exec.repl.util;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
- private static int fileListStreamerID = 0;
- private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
- private LinkedBlockingQueue<String> cache;
- private volatile boolean thresholdHit = false;
- private int thresholdPoint;
- private float thresholdFactor = 0.9f;
- private Path backingFile;
- private FileListStreamer fileListStreamer;
- private String nextElement;
- private boolean noMoreElement;
+ private final Path backingFile;
+ private String nextElement = null;
+ private String lastReadElement = null;
private HiveConf conf;
+ private volatile boolean abortOperation = false;
+ private volatile boolean retryMode;
private BufferedReader backingFileReader;
+ private volatile FSDataOutputStream backingFileWriter;
-
- public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+ public FileList(Path backingFile, HiveConf conf) {
this.backingFile = backingFile;
this.conf = conf;
- if (cacheSize > 0) {
- // Cache size must be > 0 for this list to be used for the write operation.
- this.cache = new LinkedBlockingQueue<>(cacheSize);
- fileListStreamer = new FileListStreamer(cache, backingFile, conf);
- thresholdPoint = getThreshold(cacheSize);
- LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+ this.retryMode = false;
+ }
+
+ public void add(String entry) throws IOException {
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+ writeWithRetry(entry);
} else {
- thresholdHit = true;
+ writeEntry(entry);
}
}
- @VisibleForTesting
- FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
- this.backingFile = backingFile;
- this.fileListStreamer = fileListStreamer;
- this.cache = cache;
- this.conf = conf;
- thresholdPoint = getThreshold(cache.remainingCapacity());
+ private synchronized void writeEntry(String entry) throws IOException {
+ //retry only during creating the file, no retry during writes
+ if (backingFileWriter == null) {
+ try {
+ Retryable retryable = buildRetryable();
+ retryable.executeCallable((Callable<Void>) () -> {
+ if(this.abortOperation) {
+ return null;
+ }
+ backingFileWriter = getWriterCreateMode();
+ return null;
+ });
+ } catch (Exception e) {
+ this.abortOperation = true;
+ throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+ }
+ }
+ if(this.abortOperation) {
+ return;
+ }
+ try {
+ backingFileWriter.writeBytes(getEntryWithNewline(entry));
+ LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+ } catch (IOException e) {
+ this.abortOperation = true;
+ LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+ throw e;
+ }
}
- /**
- * Only add operation is safe for concurrent operations.
- */
- public void add(String entry) throws SemanticException {
- if (thresholdHit && !fileListStreamer.isAlive()) {
- throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+ private synchronized void writeWithRetry(String entry) throws IOException {
+ Retryable retryable = buildRetryable();
+ try {
+ retryable.executeCallable((Callable<Void>) () -> {
+ if (this.abortOperation) {
+ return null;
+ }
+ try {
+ if (backingFileWriter == null) {
+ backingFileWriter = initWriter();
+ }
+ backingFileWriter.writeBytes(getEntryWithNewline(entry));
+ backingFileWriter.hflush();
+ LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+ } catch (IOException e) {
+ LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e);
+ this.retryMode = true;
+ close();
+ throw e;
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ this.abortOperation = true;
+ throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
}
+ }
+
+ Retryable buildRetryable() {
+ return Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
+ }
+
+ // Return the entry ensuring it ends with newline.
+ private String getEntryWithNewline(String entry) {
+ return new StringWriter()
+ .append(entry)
+ .append(System.lineSeparator())
+ .toString();
+ }
+
+ FSDataOutputStream initWriter() throws IOException {
+ if(shouldAppend()) {
+ return getWriterAppendMode(); // append in retry-mode if file has been created already
+ }
+ else {
+ return getWriterCreateMode();
+ }
+ }
+
+ private boolean shouldAppend() throws IOException {
+ return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode;
+ }
+
+ FSDataOutputStream getWriterCreateMode() throws IOException {
try {
- cache.put(entry);
- } catch (InterruptedException e) {
- throw new SemanticException(e);
+ return backingFile.getFileSystem(conf).create(backingFile);
+ } catch (IOException e) {
+ LOG.error("Error opening {} in append mode", backingFile, e);
+ throw e;
}
- if (!thresholdHit && cache.size() >= thresholdPoint) {
- initStoreToFile(cache.size());
+ }
+
+ FSDataOutputStream getWriterAppendMode() throws IOException {
+ try {
+ return backingFile.getFileSystem(conf).append(backingFile);
+ } catch (IOException e) {
+ LOG.error("Error creating file {}", backingFile, e);
+ throw e;
}
}
@Override
public boolean hasNext() {
- if (!thresholdHit) {
- return (cache != null && !cache.isEmpty());
- }
- if (nextElement != null) {
+ /*
+ We assume that every add operation either adds an entry completely or doesn't add at all.
+ If this assumption changes then in the following check we should check for incomplete entries.
+ We remove duplicate entries assuming they are only written consecutively.
+ */
+ if (nextElement != null && !nextElement.equals(lastReadElement)) {
return true;
+ } else {
+ try {
+ do {
+ nextElement = readNextLine();
+ if(nextElement != null && !nextElement.equals(lastReadElement)) {
+ return true;
+ }
+ } while (nextElement != null);
+ return false;
+ } catch (IOException e) {
+ nextElement = null;
+ lastReadElement = null;
+ backingFileReader = null;
+ throw new UncheckedIOException(e);
+ }
}
- if (noMoreElement) {
- return false;
- }
- nextElement = readNextLine();
- if (nextElement == null) {
- noMoreElement = true;
- }
- return !noMoreElement;
}
@Override
public String next() {
- if (!hasNext()) {
+ if ((nextElement == null || nextElement.equals(lastReadElement)) && !hasNext()) {
throw new NoSuchElementException("No more element in the list backed by " + backingFile);
}
- String retVal = nextElement;
+ lastReadElement = nextElement;
nextElement = null;
- return thresholdHit ? retVal : cache.poll();
- }
-
- private synchronized void initStoreToFile(int cacheSize) {
- if (!thresholdHit) {
- fileListStreamer.setName(getNextID());
- fileListStreamer.setDaemon(true);
- fileListStreamer.start();
- thresholdHit = true;
- LOG.info("Started streaming the list elements to file: {}, cache size
{}", backingFile, cacheSize);
- }
+ return lastReadElement;
}
- private String readNextLine() {
- String nextElement = null;
- try {
+ private String readNextLine() throws IOException {
+ try{
+ String nextElement;
if (backingFileReader == null) {
- FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
- if (fs.exists(backingFile)) {
- backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+ FileSystem fs = backingFile.getFileSystem(conf);
+ if (!fs.exists(backingFile)) {
+ return null;
}
+ backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
}
- nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+ nextElement = backingFileReader.readLine();
+ return nextElement;
} catch (IOException e) {
- LOG.error("Unable to read list from backing file " + backingFile, e);
+ LOG.error("Exception while reading file {}.", backingFile, e);
+ close();
+ throw e;
}
- return nextElement;
}
@Override
public void close() throws IOException {
- if (thresholdHit && fileListStreamer != null) {
- fileListStreamer.close();
- }
- if (backingFileReader != null) {
- backingFileReader.close();
- }
- LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ",
backingFile, thresholdHit);
- }
-
- private static String getNextID() {
- if (Integer.MAX_VALUE == fileListStreamerID) {
- //reset the counter
- fileListStreamerID = 0;
+ try {
+ if (backingFileReader != null) {
+ backingFileReader.close();
+ }
+ if (backingFileWriter != null) {
+ backingFileWriter.close();
+ }
+ LOG.info("Completed close for File List backed by:{}", backingFile);
+ } finally {
+ if(backingFileReader != null) {
Review comment:
nit: You can consider removing the if blocks.
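i.e., assuming the finally block only resets the two fields, the null checks there are redundant and it could simply be:
```java
} finally {
  backingFileReader = null;
  backingFileWriter = null;
}
```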
##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
.verifyResults(new String[]{"2", "3"});
}
+ @Test
+ public void testReplWithRetryDisabledIterators() throws Throwable {
Review comment:
Actually, this doesn't check that retry isn't happening, does it?
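One way to make that explicit would be a unit-level check on FileList itself, in addition to this end-to-end test — a sketch, assuming a same-package test and Mockito:
```java
@Test
public void testNoRetryPathWhenIteratorRetryDisabled() throws Exception {
  HiveConf conf = new HiveConf();
  conf.setBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY, false);
  Path backingFile = new Path("file:///tmp/repl-file-list-no-retry/_file_list");
  backingFile.getFileSystem(conf).delete(backingFile, true);

  FileList fileList = Mockito.spy(new FileList(backingFile, conf));
  fileList.add("entry-1");
  fileList.close();

  // With retry disabled only writeEntry() should run, so the retry-path hooks are never touched.
  Mockito.verify(fileList, Mockito.never()).initWriter();
  Mockito.verify(fileList, Mockito.never()).getWriterAppendMode();
}
```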
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -658,35 +657,34 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
Path dbRootMetadata = new Path(metadataPath, dbName);
Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
- try (Writer writer = new Writer(dumpRoot, conf)) {
- for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
- try {
- Table table = hiveDb.getTable(dbName, tableName);
+ ReplExternalTables externalTablesWriter = new ReplExternalTables(conf);
+ for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
+ try {
+ Table table = hiveDb.getTable(dbName, tableName);
- // Dump external table locations if required.
- if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
- && shouldDumpExternalTableLocation()) {
- writer.dataLocationDump(table, extTableFileList, conf);
- }
+ // Dump external table locations if required.
+ if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+ && shouldDumpExternalTableLocation()) {
Review comment:
nit: can be on the same line.
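i.e.:
```java
if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && shouldDumpExternalTableLocation()) {
```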