frostruan commented on code in PR #4246:
URL: https://github.com/apache/hbase/pull/4246#discussion_r844018501


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java:
##########
@@ -927,33 +930,59 @@ public void run(PRESP resp) {
 
   @Override
   public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
+    // This is for keeping compatibility with old implementation.
+    // If the server version is lower than the client version, it's possible that the
+    // flushTable method is not present in the server side, if so, we need to fall back
+    // to the old implementation.
+    FlushTableRequest request = RequestConverter
+      .buildFlushTableRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce());
+    CompletableFuture<Void> procFuture =
+      this.<FlushTableRequest, FlushTableResponse>procedureCall(tableName, request,
+        (s, c, req, done) -> s.flushTable(c, req, done), (resp) -> resp.getProcId(),
+        new FlushTableProcedureBiConsumer(tableName));
+    // here we use another new CompletableFuture because the
+    // procFuture is not fully controlled by ourselves.
     CompletableFuture<Void> future = new CompletableFuture<>();
-    addListener(tableExists(tableName), (exists, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (!exists) {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      } else {
-        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else if (!tableEnabled) {
-            future.completeExceptionally(new TableNotEnabledException(tableName));
-          } else {
-            Map<String, String> props = new HashMap<>();
-            if (columnFamily != null) {
-              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
-            }
-            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
-              props), (ret, err3) -> {
-                if (err3 != null) {
-                  future.completeExceptionally(err3);
+    addListener(procFuture, (ret, error) -> {
+      if (error != null) {
+        if (error instanceof DoNotRetryIOException) {
+          // usually this is caused by the method is not present on the server or
+          // the hbase hadoop version does not match the running hadoop version.
+          // if that happens, we need fall back to the old flush implementation.
+          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
+          addListener(tableExists(tableName), (exists, err) -> {

Review Comment:
   Ok. Thanks Duo.
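
For readers following the thread, the fallback shape in the hunk above (try the new procedure call, and on a non-retryable error complete a separate future from the old implementation) can be sketched in isolation. This is only an illustration under stated assumptions, not the code in the PR: `FlushFallbackSketch`, `callWithFallback`, and `NonRetryableException` are hypothetical names, the latter standing in for `DoNotRetryIOException`, and plain `whenComplete` replaces the `addListener` helper. The quoted hunk is cut off before the branch for other errors, so propagating them unchanged is also an assumption.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class FlushFallbackSketch {

  /** Hypothetical stand-in for HBase's DoNotRetryIOException. */
  static class NonRetryableException extends IOException {
    NonRetryableException(String msg) {
      super(msg);
    }
  }

  /**
   * Runs the primary call; if it fails with a non-retryable error, runs the
   * fallback instead. A separate result future is returned so the caller
   * never depends on the completion of the primary future directly.
   */
  static <T> CompletableFuture<T> callWithFallback(
      Supplier<CompletableFuture<T>> primary,
      Supplier<CompletableFuture<T>> fallback) {
    CompletableFuture<T> result = new CompletableFuture<>();
    primary.get().whenComplete((value, error) -> {
      if (error == null) {
        result.complete(value);
      } else if (error instanceof NonRetryableException) {
        // Assumed meaning: the server lacks the new method, so use the old path.
        fallback.get().whenComplete((v2, e2) -> {
          if (e2 != null) {
            result.completeExceptionally(e2);
          } else {
            result.complete(v2);
          }
        });
      } else {
        // Assumption: any other error is passed through unchanged.
        result.completeExceptionally(error);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new NonRetryableException("method not present"));
    String used = callWithFallback(
        () -> failed,
        () -> CompletableFuture.completedFuture("v1")).join();
    System.out.println("completed via: " + used);
  }
}
```

The separate `result` future mirrors the comment in the hunk about `procFuture` not being fully controlled by the caller.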



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

