This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d8885b22d0 IGNITE-18674 Introduce async logic to CLI (#1731)
d8885b22d0 is described below

commit d8885b22d05231af53996ace5bbf9fb0564bedc9
Author: Aleksandr Pakhomov <apk...@gmail.com>
AuthorDate: Thu Mar 2 18:52:17 2023 +0400

    IGNITE-18674 Introduce async logic to CLI (#1731)
    
    Co-authored-by: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
---
 gradle/libs.versions.toml                          |   3 +
 modules/cli/build.gradle                           |   1 +
 .../internal/cli/commands/sql/SqlReplCommand.java  |   5 +-
 .../core/call/AbstractCallExecutionPipeline.java   | 108 +++++++++++++
 .../cli/core/call/{Call.java => AsyncCall.java}    |  14 +-
 .../cli/core/call/AsyncCallExecutionPipeline.java  |  82 ++++++++++
 .../call/AsyncCallExecutionPipelineBuilder.java    | 128 +++++++++++++++
 .../apache/ignite/internal/cli/core/call/Call.java |   1 +
 .../cli/core/call/CallExecutionPipeline.java       | 177 ++-------------------
 .../core/call/CallExecutionPipelineBuilder.java    | 107 +++++++++++++
 .../call/{Call.java => ProgressBarTracker.java}    |  24 +--
 .../core/call/{Call.java => ProgressTracker.java}  |  13 +-
 .../cli/core/call/SingleCallExecutionPipeline.java |  58 +++++++
 .../ignite/internal/cli/sql/SqlSchemaProvider.java |  25 ++-
 .../internal/cli/core/call/PipelineTest.java       |  55 ++++++-
 .../internal/cli/sql/SqlSchemaProviderTest.java    |  14 +-
 16 files changed, 613 insertions(+), 202 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index ddc93a2355..96a73f7daf 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -72,6 +72,7 @@ testkit = "1.8.2"
 openapi = "3.2.0"
 autoService = "1.0.1"
 awaitility = "4.2.0"
+progressBar = "0.9.4"
 
 #Tools
 pmdTool = "6.28.0"
@@ -229,3 +230,5 @@ auto-service = { module = 
"com.google.auto.service:auto-service", version.ref =
 auto-service-annotations = { module = 
"com.google.auto.service:auto-service-annotations", version.ref = "autoService" 
}
 
 awaitility = { module = "org.awaitility:awaitility", version.ref = 
"awaitility" }
+
+progressBar = { module = "me.tongfei:progressbar", version.ref = "progressBar" 
}
diff --git a/modules/cli/build.gradle b/modules/cli/build.gradle
index ea83e10ab1..aa408e17f9 100644
--- a/modules/cli/build.gradle
+++ b/modules/cli/build.gradle
@@ -60,6 +60,7 @@ dependencies {
     implementation libs.okhttp.logging
     implementation libs.threetenbp
     implementation libs.swagger.legacy.annotations
+    implementation libs.progressBar
 
     testAnnotationProcessor libs.picocli.annotation.processor
     testAnnotationProcessor libs.micronaut.inject.annotation.processor
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
index 71cf1948b4..911760001a 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
@@ -100,7 +100,10 @@ public class SqlReplCommand extends BaseCommand implements 
Runnable {
         try (SqlManager sqlManager = new SqlManager(jdbc)) {
             // When passing white space to this command, picocli will treat it 
as a positional argument
             if (execOptions == null || (execOptions.command != null && 
execOptions.command.isBlank())) {
-                SqlCompleter sqlCompleter = new SqlCompleter(new 
SqlSchemaProvider(sqlManager::getMetadata));
+                SqlSchemaProvider schemaProvider = new 
SqlSchemaProvider(sqlManager::getMetadata);
+                schemaProvider.initStateAsync();
+
+                SqlCompleter sqlCompleter = new SqlCompleter(schemaProvider);
                 IgniteSqlCommandCompleter sqlCommandCompleter = new 
IgniteSqlCommandCompleter();
                 replExecutorProvider.get().execute(Repl.builder()
                         .withPromptProvider(() -> 
ansi(fg(Color.GREEN).mark("sql-cli> ")))
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java
new file mode 100644
index 0000000000..0022c4dcf9
--- /dev/null
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.PrintWriter;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+import org.apache.ignite.internal.cli.core.exception.ExceptionWriter;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+
+/**
+ * Abstract implementation of {@link CallExecutionPipeline}.
+ * Implements common error handling logic and verbose output redirection.
+ */
+public abstract class AbstractCallExecutionPipeline<I extends CallInput, T> 
implements CallExecutionPipeline<I, T> {
+
+    /** Writer for execution output. */
+    protected final PrintWriter output;
+
+    /** Writer for error execution output. */
+    protected final PrintWriter errOutput;
+
+    /** Decorator that decorates call's output. */
+    protected final Decorator<T, TerminalOutput> decorator;
+
+    /** Handlers for any exceptions. */
+    protected final ExceptionHandlers exceptionHandlers;
+
+    /** Provider for call's input. */
+    protected final Supplier<I> inputProvider;
+
+    /** If {@code true}, debug output will be printed to console. */
+    protected final boolean verbose;
+
+    AbstractCallExecutionPipeline(
+            PrintWriter output,
+            PrintWriter errOutput,
+            ExceptionHandlers exceptionHandlers,
+            Decorator<T, TerminalOutput> decorator,
+            Supplier<I> inputProvider,
+            boolean verbose
+    ) {
+        this.output = output;
+        this.exceptionHandlers = exceptionHandlers;
+        this.errOutput = errOutput;
+        this.decorator = decorator;
+        this.inputProvider = inputProvider;
+        this.verbose = verbose;
+    }
+
+    /**
+     * Runs the pipeline.
+     *
+     * @return exit code.
+     */
+    @Override
+    public int runPipeline() {
+        try {
+            if (verbose) {
+                CliLoggers.startOutputRedirect(errOutput);
+            }
+            return runPipelineInternal();
+        } finally {
+            if (verbose) {
+                CliLoggers.stopOutputRedirect();
+            }
+        }
+    }
+
+    int handleResult(CallOutput<T> callOutput) {
+        if (callOutput.hasError()) {
+            return handleException(callOutput.errorCause());
+        }
+
+        if (!callOutput.isEmpty()) {
+            TerminalOutput decoratedOutput = 
decorator.decorate(callOutput.body());
+            output.println(decoratedOutput.toTerminalString());
+        }
+
+        return 0;
+    }
+
+    /**
+     * Runs the pipeline and blocks the thread until the result is ready.
+     */
+    protected abstract int runPipelineInternal();
+
+    int handleException(Throwable error) {
+        return 
exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput), 
error);
+    }
+}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java
similarity index 72%
copy from 
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
copy to 
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java
index 1870d732c0..26c060e644 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.cli.core.call;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
- * Call that represents an action that can be performed given an input.
- * It can be rest call, dictionary lookup or whatever.
- *
- * @param <IT> Input for the call.
- * @param <OT> Output of the call.
+ * Call that represents an asynchronous action that can be performed given an 
input.
+ * It can be REST call, dictionary lookup or whatever.
  */
-public interface Call<IT extends CallInput, OT> {
-    CallOutput<OT> execute(IT input);
+@FunctionalInterface
+public interface AsyncCall<IT extends CallInput, OT> {
+    CompletableFuture<CallOutput<OT>> execute(IT input);
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java
new file mode 100644
index 0000000000..9eb9db7740
--- /dev/null
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import me.tongfei.progressbar.DelegatingProgressBarConsumer;
+import me.tongfei.progressbar.ProgressBar;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+
+/** Call execution pipeline that executes an async call and displays progress 
bar. */
+public class AsyncCallExecutionPipeline<I extends CallInput, T> extends 
AbstractCallExecutionPipeline<I, T> {
+    /** Async call factory. */
+    private final Function<ProgressTracker, AsyncCall<I, T>> callFactory;
+
+    /** Builder for progress bar rendering. */
+    private final ProgressBarBuilder progressBarBuilder;
+
+    AsyncCallExecutionPipeline(
+            Function<ProgressTracker, AsyncCall<I, T>> callFactory,
+            ProgressBarBuilder progressBarBuilder,
+            PrintWriter output,
+            PrintWriter errOutput,
+            ExceptionHandlers exceptionHandlers,
+            Decorator<T, TerminalOutput> decorator,
+            Supplier<I> inputProvider,
+            boolean verbose
+    ) {
+        super(output, errOutput, exceptionHandlers, decorator, inputProvider, 
verbose);
+        this.callFactory = callFactory;
+        this.progressBarBuilder = progressBarBuilder;
+    }
+
+    @Override
+    public int runPipelineInternal() {
+        I callInput = inputProvider.get();
+
+        progressBarBuilder.setConsumer(new 
DelegatingProgressBarConsumer(this::print));
+        ProgressBar progressBar = progressBarBuilder.build();
+
+        try {
+            CallOutput<T> result = callFactory.apply(new 
ProgressBarTracker(progressBar))
+                    .execute(callInput)
+                    .whenComplete((el, err) -> progressBar.close())
+                    .join();
+
+            // move carriage to the next line
+            output.println();
+
+            return handleResult(result);
+        } catch (CompletionException e) {
+            return handleException(e.getCause());
+        } catch (Exception e) {
+            return handleException(e);
+        }
+    }
+
+    private void print(String s) {
+        output.print("\r" + s); // carriage return to the beginning of the 
line to overwrite the previous progress bar
+        output.flush();
+    }
+}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
new file mode 100644
index 0000000000..9279bff18b
--- /dev/null
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.time.temporal.ChronoUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import me.tongfei.progressbar.ProgressBarStyle;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+import 
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
+import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
+
+/** Builder for {@link AsyncCallExecutionPipeline}. */
+public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
+
+    private final Function<ProgressTracker, AsyncCall<I, T>> callFactory;
+
+    private final ProgressBarBuilder progressBarBuilder = new 
ProgressBarBuilder()
+            .setStyle(ProgressBarStyle.UNICODE_BLOCK)
+            .continuousUpdate()
+            .setSpeedUnit(ChronoUnit.SECONDS)
+            .setInitialMax(100)
+            .hideETA()
+            .setTaskName("")
+            .showSpeed();
+
+    private final ExceptionHandlers exceptionHandlers = new 
DefaultExceptionHandlers();
+
+    private Supplier<I> inputProvider;
+
+    private PrintWriter output = wrapOutputStream(System.out);
+
+    private PrintWriter errOutput = wrapOutputStream(System.err);
+
+    private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>();
+
+    private boolean verbose;
+
+    AsyncCallExecutionPipelineBuilder(Function<ProgressTracker, AsyncCall<I, 
T>> callFactory) {
+        this.callFactory = callFactory;
+    }
+
+    private static PrintWriter wrapOutputStream(OutputStream output) {
+        return new PrintWriter(output, true, getStdoutEncoding());
+    }
+
+    private static Charset getStdoutEncoding() {
+        String encoding = System.getProperty("sun.stdout.encoding");
+        return encoding != null ? Charset.forName(encoding) : 
Charset.defaultCharset();
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> 
inputProvider) {
+        this.inputProvider = inputProvider;
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
+        this.output = output;
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> output(OutputStream output) 
{
+        return output(wrapOutputStream(output));
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> errOutput(PrintWriter 
errOutput) {
+        this.errOutput = errOutput;
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> errOutput(OutputStream 
output) {
+        return errOutput(wrapOutputStream(output));
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> 
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
+        exceptionHandlers.addExceptionHandler(exceptionHandler);
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> 
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
+        this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> decorator(Decorator<T, 
TerminalOutput> decorator) {
+        this.decorator = decorator;
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
+        this.verbose = verbose;
+        return this;
+    }
+
+    public AsyncCallExecutionPipelineBuilder<I, T> name(String name) {
+        this.progressBarBuilder.setTaskName(name);
+        return this;
+    }
+
+    /** Builds {@link AsyncCallExecutionPipeline}. */
+    public CallExecutionPipeline<I, T> build() {
+        return new AsyncCallExecutionPipeline<>(
+                callFactory, progressBarBuilder, output, errOutput, 
exceptionHandlers, decorator, inputProvider, verbose
+        );
+    }
+}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
index 1870d732c0..4ae4568125 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
@@ -24,6 +24,7 @@ package org.apache.ignite.internal.cli.core.call;
  * @param <IT> Input for the call.
  * @param <OT> Output of the call.
  */
+@FunctionalInterface
 public interface Call<IT extends CallInput, OT> {
     CallOutput<OT> execute(IT input);
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
index 5d265b0d87..a8fe9db3fb 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
@@ -17,182 +17,31 @@
 
 package org.apache.ignite.internal.cli.core.call;
 
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.cli.core.decorator.Decorator;
-import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
-import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
-import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
-import org.apache.ignite.internal.cli.core.exception.ExceptionWriter;
-import 
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
-import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
-import org.apache.ignite.internal.cli.logger.CliLoggers;
-
-/**
- * Call execution pipeline.
- *
- * @param <I> Call input type.
- * @param <T> Call output's body type.
- */
-public class CallExecutionPipeline<I extends CallInput, T> {
-    /** Call to execute. */
-    private final Call<I, T> call;
-
-    /** Writer for execution output. */
-    private final PrintWriter output;
-
-    /** Writer for error execution output. */
-    private final PrintWriter errOutput;
-
-    /** Decorator that decorates call's output. */
-    private final Decorator<T, TerminalOutput> decorator;
-
-    /** Handlers for any exceptions. */
-    private final ExceptionHandlers exceptionHandlers;
-
-    /** Provider for call's input. */
-    private final Supplier<I> inputProvider;
-
-    /** If {@code true}, debug output will be printed to console. */
-    private final boolean verbose;
-
-    private CallExecutionPipeline(Call<I, T> call,
-            PrintWriter output,
-            PrintWriter errOutput,
-            ExceptionHandlers exceptionHandlers,
-            Decorator<T, TerminalOutput> decorator,
-            Supplier<I> inputProvider,
-            boolean verbose
-    ) {
-        this.call = call;
-        this.output = output;
-        this.exceptionHandlers = exceptionHandlers;
-        this.errOutput = errOutput;
-        this.decorator = decorator;
-        this.inputProvider = inputProvider;
-        this.verbose = verbose;
-    }
+import java.util.function.Function;
 
+/** Pipeline that executes a call. */
+@FunctionalInterface
+public interface CallExecutionPipeline<I extends CallInput, T> {
     /**
      * Builder helper method.
      *
      * @return builder for {@link CallExecutionPipeline}.
      */
-    public static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T> 
builder(Call<I, T> call) {
+    static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T> 
builder(Call<I, T> call) {
         return new CallExecutionPipelineBuilder<>(call);
     }
 
+    /** Builder helper method. */
+    static <I extends CallInput, T> AsyncCallExecutionPipelineBuilder<I, T> 
asyncBuilder(
+            Function<ProgressTracker, AsyncCall<I, T>> callFactory
+    ) {
+        return new AsyncCallExecutionPipelineBuilder<>(callFactory);
+    }
+
     /**
      * Runs the pipeline.
      *
      * @return exit code.
      */
-    public int runPipeline() {
-        try {
-            if (verbose) {
-                CliLoggers.startOutputRedirect(errOutput);
-            }
-            return runPipelineInternal();
-        } finally {
-            if (verbose) {
-                CliLoggers.stopOutputRedirect();
-            }
-        }
-    }
-
-    private int runPipelineInternal() {
-        I callInput = inputProvider.get();
-
-        CallOutput<T> callOutput = call.execute(callInput);
-
-        if (callOutput.hasError()) {
-            return 
exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput), 
callOutput.errorCause());
-        }
-
-        if (!callOutput.isEmpty()) {
-            TerminalOutput decoratedOutput = 
decorator.decorate(callOutput.body());
-            output.println(decoratedOutput.toTerminalString());
-        }
-        return 0;
-    }
-
-    /** Builder for {@link CallExecutionPipeline}. */
-    public static class CallExecutionPipelineBuilder<I extends CallInput, T> {
-
-        private final Call<I, T> call;
-
-        private final ExceptionHandlers exceptionHandlers = new 
DefaultExceptionHandlers();
-
-        private Supplier<I> inputProvider;
-
-        private PrintWriter output = wrapOutputStream(System.out);
-
-        private PrintWriter errOutput = wrapOutputStream(System.err);
-
-        private Decorator<T, TerminalOutput> decorator = new 
DefaultDecorator<>();
-
-        private boolean verbose;
-
-        public CallExecutionPipelineBuilder(Call<I, T> call) {
-            this.call = call;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> 
inputProvider) {
-            this.inputProvider = inputProvider;
-            return this;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
-            this.output = output;
-            return this;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> output(OutputStream output) {
-            return output(wrapOutputStream(output));
-        }
-
-        public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter 
errOutput) {
-            this.errOutput = errOutput;
-            return this;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream 
output) {
-            return errOutput(wrapOutputStream(output));
-        }
-
-        public CallExecutionPipelineBuilder<I, T> 
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
-            exceptionHandlers.addExceptionHandler(exceptionHandler);
-            return this;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> 
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
-            this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
-            return this;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T, 
TerminalOutput> decorator) {
-            this.decorator = decorator;
-            return this;
-        }
-
-        public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
-            this.verbose = verbose;
-            return this;
-        }
-
-        public CallExecutionPipeline<I, T> build() {
-            return new CallExecutionPipeline<>(call, output, errOutput, 
exceptionHandlers, decorator, inputProvider, verbose);
-        }
-
-        private static PrintWriter wrapOutputStream(OutputStream output) {
-            return new PrintWriter(output, true, getStdoutEncoding());
-        }
-
-        private static Charset getStdoutEncoding() {
-            String encoding = System.getProperty("sun.stdout.encoding");
-            return encoding != null ? Charset.forName(encoding) : 
Charset.defaultCharset();
-        }
-    }
+    int runPipeline();
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
new file mode 100644
index 0000000000..321ac7e391
--- /dev/null
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+import 
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
+import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
+
+/** Builder for {@link CallExecutionPipeline}. */
+public class CallExecutionPipelineBuilder<I extends CallInput, T> {
+
+    private final Call<I, T> call;
+
+    private final ExceptionHandlers exceptionHandlers = new 
DefaultExceptionHandlers();
+
+    private Supplier<I> inputProvider;
+
+    private PrintWriter output = wrapOutputStream(System.out);
+
+    private PrintWriter errOutput = wrapOutputStream(System.err);
+
+    private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>();
+
+    private boolean verbose;
+
+    CallExecutionPipelineBuilder(Call<I, T> call) {
+        this.call = call;
+    }
+
+    private static PrintWriter wrapOutputStream(OutputStream output) {
+        return new PrintWriter(output, true, getStdoutEncoding());
+    }
+
+    private static Charset getStdoutEncoding() {
+        String encoding = System.getProperty("sun.stdout.encoding");
+        return encoding != null ? Charset.forName(encoding) : 
Charset.defaultCharset();
+    }
+
+    public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> 
inputProvider) {
+        this.inputProvider = inputProvider;
+        return this;
+    }
+
+    public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
+        this.output = output;
+        return this;
+    }
+
+    public CallExecutionPipelineBuilder<I, T> output(OutputStream output) {
+        return output(wrapOutputStream(output));
+    }
+
+    public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput) 
{
+        this.errOutput = errOutput;
+        return this;
+    }
+
+    public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) {
+        return errOutput(wrapOutputStream(output));
+    }
+
+    public CallExecutionPipelineBuilder<I, T> 
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
+        exceptionHandlers.addExceptionHandler(exceptionHandler);
+        return this;
+    }
+
+    public CallExecutionPipelineBuilder<I, T> 
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
+        this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
+        return this;
+    }
+
+    public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T, 
TerminalOutput> decorator) {
+        this.decorator = decorator;
+        return this;
+    }
+
+    public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
+        this.verbose = verbose;
+        return this;
+    }
+
+    public CallExecutionPipeline<I, T> build() {
+        return new SingleCallExecutionPipeline<>(call, output, errOutput, 
exceptionHandlers, decorator, inputProvider, verbose);
+    }
+}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java
similarity index 68%
copy from 
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
copy to 
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java
index 1870d732c0..a1beaa70eb 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java
@@ -17,13 +17,19 @@
 
 package org.apache.ignite.internal.cli.core.call;
 
-/**
- * Call that represents an action that can be performed given an input.
- * It can be rest call, dictionary lookup or whatever.
- *
- * @param <IT> Input for the call.
- * @param <OT> Output of the call.
- */
-public interface Call<IT extends CallInput, OT> {
-    CallOutput<OT> execute(IT input);
+import me.tongfei.progressbar.ProgressBar;
+
+/** {@link ProgressBar} based tracker. */
+public class ProgressBarTracker implements ProgressTracker {
+    private final ProgressBar progressBar;
+
+    public ProgressBarTracker(ProgressBar progressBar) {
+        this.progressBar = progressBar;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void track() {
+        progressBar.step();
+    }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java
similarity index 74%
copy from 
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
copy to 
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java
index 1870d732c0..9e127230cb 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java
@@ -17,13 +17,8 @@
 
 package org.apache.ignite.internal.cli.core.call;
 
-/**
- * Call that represents an action that can be performed given an input.
- * It can be rest call, dictionary lookup or whatever.
- *
- * @param <IT> Input for the call.
- * @param <OT> Output of the call.
- */
-public interface Call<IT extends CallInput, OT> {
-    CallOutput<OT> execute(IT input);
+/** Progress tracker that will be called periodically during the call 
execution. */
+public interface ProgressTracker {
+    /** Tracks that the step is performed. */
+    void track(); // todo: add the increment parameter in 
https://issues.apache.org/jira/browse/IGNITE-18731
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java
new file mode 100644
index 0000000000..23e2b5e307
--- /dev/null
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.PrintWriter;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+
+/**
+ * Call execution pipeline that executes a single call.
+ *
+ * @param <I> Call input type.
+ * @param <T> Call output's body type.
+ */
+public class SingleCallExecutionPipeline<I extends CallInput, T> extends 
AbstractCallExecutionPipeline<I, T> {
+    /** Call to execute. */
+    private final Call<I, T> call;
+
+    SingleCallExecutionPipeline(
+            Call<I, T> call,
+            PrintWriter output,
+            PrintWriter errOutput,
+            ExceptionHandlers exceptionHandlers,
+            Decorator<T, TerminalOutput> decorator,
+            Supplier<I> inputProvider,
+            boolean verbose
+    ) {
+        super(output, errOutput, exceptionHandlers, decorator, inputProvider, 
verbose);
+        this.call = call;
+    }
+
+
+    @Override
+    public int runPipelineInternal() {
+        I callInput = inputProvider.get();
+
+        CallOutput<T> callOutput = call.execute(callInput);
+
+        return handleResult(callOutput);
+    }
+}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
index 85906f7f7a..5924efeb85 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.cli.sql;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * SQL schema provider.
@@ -30,9 +32,9 @@ public class SqlSchemaProvider implements SchemaProvider {
 
     private final int schemaUpdateTimeout;
 
-    private SqlSchema schema;
+    private final AtomicReference<SqlSchema> schema = new 
AtomicReference<>(null);
 
-    private Instant lastUpdate;
+    private final AtomicReference<Instant> lastUpdate = new 
AtomicReference<>(Instant.now());
 
     public SqlSchemaProvider(MetadataSupplier metadataSupplier) {
         this(metadataSupplier, SCHEMA_UPDATE_TIMEOUT);
@@ -45,11 +47,22 @@ public class SqlSchemaProvider implements SchemaProvider {
 
     @Override
     public SqlSchema getSchema() {
-        if (schema == null || Duration.between(lastUpdate, 
Instant.now()).toSeconds() >= schemaUpdateTimeout) {
-            schema = sqlSchemaLoader.loadSchema();
-            lastUpdate = Instant.now();
+        if (schema.compareAndSet(null, sqlSchemaLoader.loadSchema())) {
+            lastUpdate.set(Instant.now());
+            return schema.get();
+        } else if (Duration.between(lastUpdate.get(), 
Instant.now()).toSeconds() >= schemaUpdateTimeout) {
+            CompletableFuture.supplyAsync(() -> {
+                schema.set(sqlSchemaLoader.loadSchema());
+                lastUpdate.set(Instant.now());
+                return schema.get();
+            });
         }
-        return schema;
+
+        return schema.get();
     }
 
+    public void initStateAsync() {
+        // trigger schema loading in background
+        CompletableFuture.supplyAsync(this::getSchema);
+    }
 }
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
 
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
index a1aac24059..d34b2b1cb9 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
@@ -24,17 +24,24 @@ import static org.hamcrest.Matchers.startsWith;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cli.core.exception.TestExceptionHandler;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 class PipelineTest {
 
+    private StringWriter out = new StringWriter();
+    private StringWriter errOut = new StringWriter();
+
+    @BeforeEach
+    void setUp() {
+        out = new StringWriter();
+        errOut = new StringWriter();
+    }
+
     @Test
     void verboseTest() {
-        // Given
-        StringWriter out = new StringWriter();
-        StringWriter errOut = new StringWriter();
-
         // When start pipeline with verbose
         CallExecutionPipeline.builder(new ThrowingStrCall())
                 .inputProvider(StringCallInput::new)
@@ -49,4 +56,44 @@ class PipelineTest {
         assertThat(errOut.toString(), startsWith("Ooops!" + 
System.lineSeparator()));
         assertThat(errOut.toString(), containsString("verbose output"));
     }
+
+    @Test
+    void asyncCallFailedFuture() {
+        // Given async call that retuns failed future
+        AsyncCall<StringCallInput, ?> asyncCall = callInput -> 
CompletableFuture.failedFuture(new RuntimeException("Ooops!"));
+
+        // When start async pipeline with verbose
+        CallExecutionPipeline.asyncBuilder(ignoredProgressTracker -> asyncCall)
+                .inputProvider(StringCallInput::new)
+                .exceptionHandler(new TestExceptionHandler())
+                .output(new PrintWriter(out))
+                .errOutput(new PrintWriter(errOut))
+                .verbose(true)
+                .build().runPipeline();
+
+        // Then error output contains the message from exception and contains 
verbose output
+        assertThat(errOut.toString(), containsString("Ooops!" + 
System.lineSeparator()));
+        assertThat(errOut.toString(), containsString("verbose output"));
+    }
+
+    @Test
+    void asyncCallThrowingMethod() {
+        // Given async call that throws an exception
+        AsyncCall<StringCallInput, ?> asyncCall = callInput -> {
+            throw new RuntimeException("Ooops!");
+        };
+
+        // When start async pipeline with verbose
+        CallExecutionPipeline.asyncBuilder(ignoredProgressTracker -> asyncCall)
+                .inputProvider(StringCallInput::new)
+                .exceptionHandler(new TestExceptionHandler())
+                .output(new PrintWriter(out))
+                .errOutput(new PrintWriter(errOut))
+                .verbose(true)
+                .build().runPipeline();
+
+        // Then error output contains the message from exception and contains 
verbose output
+        assertThat(errOut.toString(), containsString("Ooops!" + 
System.lineSeparator()));
+        assertThat(errOut.toString(), containsString("verbose output"));
+    }
 }
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
 
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
index e96ecc35ca..37030c8b77 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
@@ -44,16 +44,26 @@ class SqlSchemaProviderTest {
     }
 
     @Test
-    public void testProviderWithoutTimeout() {
+    public void testProviderWithoutTimeout() throws InterruptedException {
         SqlSchemaProvider provider = new SqlSchemaProvider(supplier, 0);
-        Assertions.assertNotEquals(provider.getSchema(), provider.getSchema());
+
+        SqlSchema firstSchema = provider.getSchema();
+        provider.getSchema(); // trigger update
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
+        Assertions.assertNotEquals(firstSchema, provider.getSchema());
     }
 
     @Test
     public void testProviderWith1secTimeout() throws InterruptedException {
         SqlSchemaProvider provider = new SqlSchemaProvider(supplier, 1);
+
+        provider.initStateAsync();
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
         SqlSchema schema = provider.getSchema();
         Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+
         Assertions.assertNotEquals(schema, provider.getSchema());
     }
 }


Reply via email to