This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new d8885b22d0 IGNITE-18674 Introduce async logic to CLI (#1731) d8885b22d0 is described below commit d8885b22d05231af53996ace5bbf9fb0564bedc9 Author: Aleksandr Pakhomov <apk...@gmail.com> AuthorDate: Thu Mar 2 18:52:17 2023 +0400 IGNITE-18674 Introduce async logic to CLI (#1731) Co-authored-by: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> --- gradle/libs.versions.toml | 3 + modules/cli/build.gradle | 1 + .../internal/cli/commands/sql/SqlReplCommand.java | 5 +- .../core/call/AbstractCallExecutionPipeline.java | 108 +++++++++++++ .../cli/core/call/{Call.java => AsyncCall.java} | 14 +- .../cli/core/call/AsyncCallExecutionPipeline.java | 82 ++++++++++ .../call/AsyncCallExecutionPipelineBuilder.java | 128 +++++++++++++++ .../apache/ignite/internal/cli/core/call/Call.java | 1 + .../cli/core/call/CallExecutionPipeline.java | 177 ++------------------- .../core/call/CallExecutionPipelineBuilder.java | 107 +++++++++++++ .../call/{Call.java => ProgressBarTracker.java} | 24 +-- .../core/call/{Call.java => ProgressTracker.java} | 13 +- .../cli/core/call/SingleCallExecutionPipeline.java | 58 +++++++ .../ignite/internal/cli/sql/SqlSchemaProvider.java | 25 ++- .../internal/cli/core/call/PipelineTest.java | 55 ++++++- .../internal/cli/sql/SqlSchemaProviderTest.java | 14 +- 16 files changed, 613 insertions(+), 202 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ddc93a2355..96a73f7daf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -72,6 +72,7 @@ testkit = "1.8.2" openapi = "3.2.0" autoService = "1.0.1" awaitility = "4.2.0" +progressBar = "0.9.4" #Tools pmdTool = "6.28.0" @@ -229,3 +230,5 @@ auto-service = { module = "com.google.auto.service:auto-service", version.ref = auto-service-annotations = { module = "com.google.auto.service:auto-service-annotations", version.ref = "autoService" } awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } + +progressBar = { module = "me.tongfei:progressbar", version.ref = "progressBar" } diff --git a/modules/cli/build.gradle b/modules/cli/build.gradle index ea83e10ab1..aa408e17f9 100644 --- a/modules/cli/build.gradle +++ b/modules/cli/build.gradle @@ -60,6 +60,7 @@ dependencies { implementation libs.okhttp.logging implementation libs.threetenbp implementation libs.swagger.legacy.annotations + implementation libs.progressBar testAnnotationProcessor libs.picocli.annotation.processor testAnnotationProcessor libs.micronaut.inject.annotation.processor diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java index 71cf1948b4..911760001a 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java @@ -100,7 +100,10 @@ public class SqlReplCommand extends BaseCommand implements Runnable { try (SqlManager sqlManager = new SqlManager(jdbc)) { // When passing white space to this command, picocli will treat it as a positional argument if (execOptions == null || (execOptions.command != null && execOptions.command.isBlank())) { - SqlCompleter sqlCompleter = new SqlCompleter(new SqlSchemaProvider(sqlManager::getMetadata)); + SqlSchemaProvider schemaProvider = new SqlSchemaProvider(sqlManager::getMetadata); + schemaProvider.initStateAsync(); + + SqlCompleter sqlCompleter = new SqlCompleter(schemaProvider); IgniteSqlCommandCompleter sqlCommandCompleter = new IgniteSqlCommandCompleter(); replExecutorProvider.get().execute(Repl.builder() .withPromptProvider(() -> ansi(fg(Color.GREEN).mark("sql-cli> "))) diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java new file mode 100644 index 0000000000..0022c4dcf9 --- /dev/null +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.core.call; + +import java.io.PrintWriter; +import java.util.function.Supplier; +import org.apache.ignite.internal.cli.core.decorator.Decorator; +import org.apache.ignite.internal.cli.core.decorator.TerminalOutput; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers; +import org.apache.ignite.internal.cli.core.exception.ExceptionWriter; +import org.apache.ignite.internal.cli.logger.CliLoggers; + +/** + * Abstract implementation of {@link CallExecutionPipeline}. + * Implements common error handling logic and verbose output redirection. + */ +public abstract class AbstractCallExecutionPipeline<I extends CallInput, T> implements CallExecutionPipeline<I, T> { + + /** Writer for execution output. */ + protected final PrintWriter output; + + /** Writer for error execution output. */ + protected final PrintWriter errOutput; + + /** Decorator that decorates call's output. */ + protected final Decorator<T, TerminalOutput> decorator; + + /** Handlers for any exceptions. */ + protected final ExceptionHandlers exceptionHandlers; + + /** Provider for call's input. */ + protected final Supplier<I> inputProvider; + + /** If {@code true}, debug output will be printed to console. */ + protected final boolean verbose; + + AbstractCallExecutionPipeline( + PrintWriter output, + PrintWriter errOutput, + ExceptionHandlers exceptionHandlers, + Decorator<T, TerminalOutput> decorator, + Supplier<I> inputProvider, + boolean verbose + ) { + this.output = output; + this.exceptionHandlers = exceptionHandlers; + this.errOutput = errOutput; + this.decorator = decorator; + this.inputProvider = inputProvider; + this.verbose = verbose; + } + + /** + * Runs the pipeline. + * + * @return exit code. + */ + @Override + public int runPipeline() { + try { + if (verbose) { + CliLoggers.startOutputRedirect(errOutput); + } + return runPipelineInternal(); + } finally { + if (verbose) { + CliLoggers.stopOutputRedirect(); + } + } + } + + int handleResult(CallOutput<T> callOutput) { + if (callOutput.hasError()) { + return handleException(callOutput.errorCause()); + } + + if (!callOutput.isEmpty()) { + TerminalOutput decoratedOutput = decorator.decorate(callOutput.body()); + output.println(decoratedOutput.toTerminalString()); + } + + return 0; + } + + /** + * Runs the pipeline and blocks the thread until the result is ready. + */ + protected abstract int runPipelineInternal(); + + int handleException(Throwable error) { + return exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput), error); + } +} diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java similarity index 72% copy from modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java copy to modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java index 1870d732c0..26c060e644 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.cli.core.call; +import java.util.concurrent.CompletableFuture; + /** - * Call that represents an action that can be performed given an input. - * It can be rest call, dictionary lookup or whatever. - * - * @param <IT> Input for the call. - * @param <OT> Output of the call. + * Call that represents an asynchronous action that can be performed given an input. + * It can be REST call, dictionary lookup or whatever. */ -public interface Call<IT extends CallInput, OT> { - CallOutput<OT> execute(IT input); +@FunctionalInterface +public interface AsyncCall<IT extends CallInput, OT> { + CompletableFuture<CallOutput<OT>> execute(IT input); } diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java new file mode 100644 index 0000000000..9eb9db7740 --- /dev/null +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.core.call; + +import java.io.PrintWriter; +import java.util.concurrent.CompletionException; +import java.util.function.Function; +import java.util.function.Supplier; +import me.tongfei.progressbar.DelegatingProgressBarConsumer; +import me.tongfei.progressbar.ProgressBar; +import me.tongfei.progressbar.ProgressBarBuilder; +import org.apache.ignite.internal.cli.core.decorator.Decorator; +import org.apache.ignite.internal.cli.core.decorator.TerminalOutput; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers; + +/** Call execution pipeline that executes an async call and displays progress bar. */ +public class AsyncCallExecutionPipeline<I extends CallInput, T> extends AbstractCallExecutionPipeline<I, T> { + /** Async call factory. */ + private final Function<ProgressTracker, AsyncCall<I, T>> callFactory; + + /** Builder for progress bar rendering. */ + private final ProgressBarBuilder progressBarBuilder; + + AsyncCallExecutionPipeline( + Function<ProgressTracker, AsyncCall<I, T>> callFactory, + ProgressBarBuilder progressBarBuilder, + PrintWriter output, + PrintWriter errOutput, + ExceptionHandlers exceptionHandlers, + Decorator<T, TerminalOutput> decorator, + Supplier<I> inputProvider, + boolean verbose + ) { + super(output, errOutput, exceptionHandlers, decorator, inputProvider, verbose); + this.callFactory = callFactory; + this.progressBarBuilder = progressBarBuilder; + } + + @Override + public int runPipelineInternal() { + I callInput = inputProvider.get(); + + progressBarBuilder.setConsumer(new DelegatingProgressBarConsumer(this::print)); + ProgressBar progressBar = progressBarBuilder.build(); + + try { + CallOutput<T> result = callFactory.apply(new ProgressBarTracker(progressBar)) + .execute(callInput) + .whenComplete((el, err) -> progressBar.close()) + .join(); + + // move carriage to the next line + output.println(); + + return handleResult(result); + } catch (CompletionException e) { + return handleException(e.getCause()); + } catch (Exception e) { + return handleException(e); + } + } + + private void print(String s) { + output.print("\r" + s); // carriage return to the beginning of the line to overwrite the previous progress bar + output.flush(); + } +} diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java new file mode 100644 index 0000000000..9279bff18b --- /dev/null +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.core.call; + +import java.io.OutputStream; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.time.temporal.ChronoUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import me.tongfei.progressbar.ProgressBarBuilder; +import me.tongfei.progressbar.ProgressBarStyle; +import org.apache.ignite.internal.cli.core.decorator.Decorator; +import org.apache.ignite.internal.cli.core.decorator.TerminalOutput; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandler; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers; +import org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers; +import org.apache.ignite.internal.cli.decorators.DefaultDecorator; + +/** Builder for {@link AsyncCallExecutionPipeline}. */ +public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> { + + private final Function<ProgressTracker, AsyncCall<I, T>> callFactory; + + private final ProgressBarBuilder progressBarBuilder = new ProgressBarBuilder() + .setStyle(ProgressBarStyle.UNICODE_BLOCK) + .continuousUpdate() + .setSpeedUnit(ChronoUnit.SECONDS) + .setInitialMax(100) + .hideETA() + .setTaskName("") + .showSpeed(); + + private final ExceptionHandlers exceptionHandlers = new DefaultExceptionHandlers(); + + private Supplier<I> inputProvider; + + private PrintWriter output = wrapOutputStream(System.out); + + private PrintWriter errOutput = wrapOutputStream(System.err); + + private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>(); + + private boolean verbose; + + AsyncCallExecutionPipelineBuilder(Function<ProgressTracker, AsyncCall<I, T>> callFactory) { + this.callFactory = callFactory; + } + + private static PrintWriter wrapOutputStream(OutputStream output) { + return new PrintWriter(output, true, getStdoutEncoding()); + } + + private static Charset getStdoutEncoding() { + String encoding = System.getProperty("sun.stdout.encoding"); + return encoding != null ? Charset.forName(encoding) : Charset.defaultCharset(); + } + + public AsyncCallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> inputProvider) { + this.inputProvider = inputProvider; + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> output(PrintWriter output) { + this.output = output; + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> output(OutputStream output) { + return output(wrapOutputStream(output)); + } + + public AsyncCallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput) { + this.errOutput = errOutput; + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) { + return errOutput(wrapOutputStream(output)); + } + + public AsyncCallExecutionPipelineBuilder<I, T> exceptionHandler(ExceptionHandler<?> exceptionHandler) { + exceptionHandlers.addExceptionHandler(exceptionHandler); + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> exceptionHandlers(ExceptionHandlers exceptionHandlers) { + this.exceptionHandlers.addExceptionHandlers(exceptionHandlers); + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> decorator(Decorator<T, TerminalOutput> decorator) { + this.decorator = decorator; + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> verbose(boolean verbose) { + this.verbose = verbose; + return this; + } + + public AsyncCallExecutionPipelineBuilder<I, T> name(String name) { + this.progressBarBuilder.setTaskName(name); + return this; + } + + /** Builds {@link AsyncCallExecutionPipeline}. */ + public CallExecutionPipeline<I, T> build() { + return new AsyncCallExecutionPipeline<>( + callFactory, progressBarBuilder, output, errOutput, exceptionHandlers, decorator, inputProvider, verbose + ); + } +} diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java index 1870d732c0..4ae4568125 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java @@ -24,6 +24,7 @@ package org.apache.ignite.internal.cli.core.call; * @param <IT> Input for the call. * @param <OT> Output of the call. */ +@FunctionalInterface public interface Call<IT extends CallInput, OT> { CallOutput<OT> execute(IT input); } diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java index 5d265b0d87..a8fe9db3fb 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java @@ -17,182 +17,31 @@ package org.apache.ignite.internal.cli.core.call; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.nio.charset.Charset; -import java.util.function.Supplier; -import org.apache.ignite.internal.cli.core.decorator.Decorator; -import org.apache.ignite.internal.cli.core.decorator.TerminalOutput; -import org.apache.ignite.internal.cli.core.exception.ExceptionHandler; -import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers; -import org.apache.ignite.internal.cli.core.exception.ExceptionWriter; -import org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers; -import org.apache.ignite.internal.cli.decorators.DefaultDecorator; -import org.apache.ignite.internal.cli.logger.CliLoggers; - -/** - * Call execution pipeline. - * - * @param <I> Call input type. - * @param <T> Call output's body type. - */ -public class CallExecutionPipeline<I extends CallInput, T> { - /** Call to execute. */ - private final Call<I, T> call; - - /** Writer for execution output. */ - private final PrintWriter output; - - /** Writer for error execution output. */ - private final PrintWriter errOutput; - - /** Decorator that decorates call's output. */ - private final Decorator<T, TerminalOutput> decorator; - - /** Handlers for any exceptions. */ - private final ExceptionHandlers exceptionHandlers; - - /** Provider for call's input. */ - private final Supplier<I> inputProvider; - - /** If {@code true}, debug output will be printed to console. */ - private final boolean verbose; - - private CallExecutionPipeline(Call<I, T> call, - PrintWriter output, - PrintWriter errOutput, - ExceptionHandlers exceptionHandlers, - Decorator<T, TerminalOutput> decorator, - Supplier<I> inputProvider, - boolean verbose - ) { - this.call = call; - this.output = output; - this.exceptionHandlers = exceptionHandlers; - this.errOutput = errOutput; - this.decorator = decorator; - this.inputProvider = inputProvider; - this.verbose = verbose; - } +import java.util.function.Function; +/** Pipeline that executes a call. */ +@FunctionalInterface +public interface CallExecutionPipeline<I extends CallInput, T> { /** * Builder helper method. * * @return builder for {@link CallExecutionPipeline}. */ - public static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T> builder(Call<I, T> call) { + static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T> builder(Call<I, T> call) { return new CallExecutionPipelineBuilder<>(call); } + /** Builder helper method. */ + static <I extends CallInput, T> AsyncCallExecutionPipelineBuilder<I, T> asyncBuilder( + Function<ProgressTracker, AsyncCall<I, T>> callFactory + ) { + return new AsyncCallExecutionPipelineBuilder<>(callFactory); + } + /** * Runs the pipeline. * * @return exit code. */ - public int runPipeline() { - try { - if (verbose) { - CliLoggers.startOutputRedirect(errOutput); - } - return runPipelineInternal(); - } finally { - if (verbose) { - CliLoggers.stopOutputRedirect(); - } - } - } - - private int runPipelineInternal() { - I callInput = inputProvider.get(); - - CallOutput<T> callOutput = call.execute(callInput); - - if (callOutput.hasError()) { - return exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput), callOutput.errorCause()); - } - - if (!callOutput.isEmpty()) { - TerminalOutput decoratedOutput = decorator.decorate(callOutput.body()); - output.println(decoratedOutput.toTerminalString()); - } - return 0; - } - - /** Builder for {@link CallExecutionPipeline}. */ - public static class CallExecutionPipelineBuilder<I extends CallInput, T> { - - private final Call<I, T> call; - - private final ExceptionHandlers exceptionHandlers = new DefaultExceptionHandlers(); - - private Supplier<I> inputProvider; - - private PrintWriter output = wrapOutputStream(System.out); - - private PrintWriter errOutput = wrapOutputStream(System.err); - - private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>(); - - private boolean verbose; - - public CallExecutionPipelineBuilder(Call<I, T> call) { - this.call = call; - } - - public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> inputProvider) { - this.inputProvider = inputProvider; - return this; - } - - public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) { - this.output = output; - return this; - } - - public CallExecutionPipelineBuilder<I, T> output(OutputStream output) { - return output(wrapOutputStream(output)); - } - - public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput) { - this.errOutput = errOutput; - return this; - } - - public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) { - return errOutput(wrapOutputStream(output)); - } - - public CallExecutionPipelineBuilder<I, T> exceptionHandler(ExceptionHandler<?> exceptionHandler) { - exceptionHandlers.addExceptionHandler(exceptionHandler); - return this; - } - - public CallExecutionPipelineBuilder<I, T> exceptionHandlers(ExceptionHandlers exceptionHandlers) { - this.exceptionHandlers.addExceptionHandlers(exceptionHandlers); - return this; - } - - public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T, TerminalOutput> decorator) { - this.decorator = decorator; - return this; - } - - public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) { - this.verbose = verbose; - return this; - } - - public CallExecutionPipeline<I, T> build() { - return new CallExecutionPipeline<>(call, output, errOutput, exceptionHandlers, decorator, inputProvider, verbose); - } - - private static PrintWriter wrapOutputStream(OutputStream output) { - return new PrintWriter(output, true, getStdoutEncoding()); - } - - private static Charset getStdoutEncoding() { - String encoding = System.getProperty("sun.stdout.encoding"); - return encoding != null ? Charset.forName(encoding) : Charset.defaultCharset(); - } - } + int runPipeline(); } diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java new file mode 100644 index 0000000000..321ac7e391 --- /dev/null +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.core.call; + +import java.io.OutputStream; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.util.function.Supplier; +import org.apache.ignite.internal.cli.core.decorator.Decorator; +import org.apache.ignite.internal.cli.core.decorator.TerminalOutput; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandler; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers; +import org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers; +import org.apache.ignite.internal.cli.decorators.DefaultDecorator; + +/** Builder for {@link CallExecutionPipeline}. */ +public class CallExecutionPipelineBuilder<I extends CallInput, T> { + + private final Call<I, T> call; + + private final ExceptionHandlers exceptionHandlers = new DefaultExceptionHandlers(); + + private Supplier<I> inputProvider; + + private PrintWriter output = wrapOutputStream(System.out); + + private PrintWriter errOutput = wrapOutputStream(System.err); + + private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>(); + + private boolean verbose; + + CallExecutionPipelineBuilder(Call<I, T> call) { + this.call = call; + } + + private static PrintWriter wrapOutputStream(OutputStream output) { + return new PrintWriter(output, true, getStdoutEncoding()); + } + + private static Charset getStdoutEncoding() { + String encoding = System.getProperty("sun.stdout.encoding"); + return encoding != null ? Charset.forName(encoding) : Charset.defaultCharset(); + } + + public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> inputProvider) { + this.inputProvider = inputProvider; + return this; + } + + public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) { + this.output = output; + return this; + } + + public CallExecutionPipelineBuilder<I, T> output(OutputStream output) { + return output(wrapOutputStream(output)); + } + + public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput) { + this.errOutput = errOutput; + return this; + } + + public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) { + return errOutput(wrapOutputStream(output)); + } + + public CallExecutionPipelineBuilder<I, T> exceptionHandler(ExceptionHandler<?> exceptionHandler) { + exceptionHandlers.addExceptionHandler(exceptionHandler); + return this; + } + + public CallExecutionPipelineBuilder<I, T> exceptionHandlers(ExceptionHandlers exceptionHandlers) { + this.exceptionHandlers.addExceptionHandlers(exceptionHandlers); + return this; + } + + public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T, TerminalOutput> decorator) { + this.decorator = decorator; + return this; + } + + public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) { + this.verbose = verbose; + return this; + } + + public CallExecutionPipeline<I, T> build() { + return new SingleCallExecutionPipeline<>(call, output, errOutput, exceptionHandlers, decorator, inputProvider, verbose); + } +} diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java similarity index 68% copy from modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java copy to modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java index 1870d732c0..a1beaa70eb 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java @@ -17,13 +17,19 @@ package org.apache.ignite.internal.cli.core.call; -/** - * Call that represents an action that can be performed given an input. - * It can be rest call, dictionary lookup or whatever. - * - * @param <IT> Input for the call. - * @param <OT> Output of the call. - */ -public interface Call<IT extends CallInput, OT> { - CallOutput<OT> execute(IT input); +import me.tongfei.progressbar.ProgressBar; + +/** {@link ProgressBar} based tracker. */ +public class ProgressBarTracker implements ProgressTracker { + private final ProgressBar progressBar; + + public ProgressBarTracker(ProgressBar progressBar) { + this.progressBar = progressBar; + } + + /** {@inheritDoc} */ + @Override + public void track() { + progressBar.step(); + } } diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java similarity index 74% copy from modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java copy to modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java index 1870d732c0..9e127230cb 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java @@ -17,13 +17,8 @@ package org.apache.ignite.internal.cli.core.call; -/** - * Call that represents an action that can be performed given an input. - * It can be rest call, dictionary lookup or whatever. - * - * @param <IT> Input for the call. - * @param <OT> Output of the call. - */ -public interface Call<IT extends CallInput, OT> { - CallOutput<OT> execute(IT input); +/** Progress tracker that will be called periodically during the call execution. */ +public interface ProgressTracker { + /** Tracks that the step is performed. */ + void track(); // todo: add the increment parameter in https://issues.apache.org/jira/browse/IGNITE-18731 } diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java new file mode 100644 index 0000000000..23e2b5e307 --- /dev/null +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cli.core.call; + +import java.io.PrintWriter; +import java.util.function.Supplier; +import org.apache.ignite.internal.cli.core.decorator.Decorator; +import org.apache.ignite.internal.cli.core.decorator.TerminalOutput; +import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers; + +/** + * Call execution pipeline that executes a single call. + * + * @param <I> Call input type. + * @param <T> Call output's body type. + */ +public class SingleCallExecutionPipeline<I extends CallInput, T> extends AbstractCallExecutionPipeline<I, T> { + /** Call to execute. */ + private final Call<I, T> call; + + SingleCallExecutionPipeline( + Call<I, T> call, + PrintWriter output, + PrintWriter errOutput, + ExceptionHandlers exceptionHandlers, + Decorator<T, TerminalOutput> decorator, + Supplier<I> inputProvider, + boolean verbose + ) { + super(output, errOutput, exceptionHandlers, decorator, inputProvider, verbose); + this.call = call; + } + + + @Override + public int runPipelineInternal() { + I callInput = inputProvider.get(); + + CallOutput<T> callOutput = call.execute(callInput); + + return handleResult(callOutput); + } +} diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java index 85906f7f7a..5924efeb85 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.cli.sql; import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; /** * SQL schema provider. @@ -30,9 +32,9 @@ public class SqlSchemaProvider implements SchemaProvider { private final int schemaUpdateTimeout; - private SqlSchema schema; + private final AtomicReference<SqlSchema> schema = new AtomicReference<>(null); - private Instant lastUpdate; + private final AtomicReference<Instant> lastUpdate = new AtomicReference<>(Instant.now()); public SqlSchemaProvider(MetadataSupplier metadataSupplier) { this(metadataSupplier, SCHEMA_UPDATE_TIMEOUT); @@ -45,11 +47,22 @@ public class SqlSchemaProvider implements SchemaProvider { @Override public SqlSchema getSchema() { - if (schema == null || Duration.between(lastUpdate, Instant.now()).toSeconds() >= schemaUpdateTimeout) { - schema = sqlSchemaLoader.loadSchema(); - lastUpdate = Instant.now(); + if (schema.compareAndSet(null, sqlSchemaLoader.loadSchema())) { + lastUpdate.set(Instant.now()); + return schema.get(); + } else if (Duration.between(lastUpdate.get(), Instant.now()).toSeconds() >= schemaUpdateTimeout) { + CompletableFuture.supplyAsync(() -> { + schema.set(sqlSchemaLoader.loadSchema()); + lastUpdate.set(Instant.now()); + return schema.get(); + }); } - return schema; + + return schema.get(); } + public void initStateAsync() { + // trigger schema loading in background + CompletableFuture.supplyAsync(this::getSchema); + } } diff --git a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java index a1aac24059..d34b2b1cb9 100644 --- a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java +++ b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java @@ -24,17 +24,24 @@ import static org.hamcrest.Matchers.startsWith; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.cli.core.exception.TestExceptionHandler; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class PipelineTest { + private StringWriter out = new StringWriter(); + private StringWriter errOut = new StringWriter(); + + @BeforeEach + void setUp() { + out = new StringWriter(); + errOut = new StringWriter(); + } + @Test void verboseTest() { - // Given - StringWriter out = new StringWriter(); - StringWriter errOut = new StringWriter(); - // When start pipeline with verbose CallExecutionPipeline.builder(new ThrowingStrCall()) .inputProvider(StringCallInput::new) @@ -49,4 +56,44 @@ class PipelineTest { assertThat(errOut.toString(), startsWith("Ooops!" + System.lineSeparator())); assertThat(errOut.toString(), containsString("verbose output")); } + + @Test + void asyncCallFailedFuture() { + // Given async call that retuns failed future + AsyncCall<StringCallInput, ?> asyncCall = callInput -> CompletableFuture.failedFuture(new RuntimeException("Ooops!")); + + // When start async pipeline with verbose + CallExecutionPipeline.asyncBuilder(ignoredProgressTracker -> asyncCall) + .inputProvider(StringCallInput::new) + .exceptionHandler(new TestExceptionHandler()) + .output(new PrintWriter(out)) + .errOutput(new PrintWriter(errOut)) + .verbose(true) + .build().runPipeline(); + + // Then error output contains the message from exception and contains verbose output + assertThat(errOut.toString(), containsString("Ooops!" + System.lineSeparator())); + assertThat(errOut.toString(), containsString("verbose output")); + } + + @Test + void asyncCallThrowingMethod() { + // Given async call that throws an exception + AsyncCall<StringCallInput, ?> asyncCall = callInput -> { + throw new RuntimeException("Ooops!"); + }; + + // When start async pipeline with verbose + CallExecutionPipeline.asyncBuilder(ignoredProgressTracker -> asyncCall) + .inputProvider(StringCallInput::new) + .exceptionHandler(new TestExceptionHandler()) + .output(new PrintWriter(out)) + .errOutput(new PrintWriter(errOut)) + .verbose(true) + .build().runPipeline(); + + // Then error output contains the message from exception and contains verbose output + assertThat(errOut.toString(), containsString("Ooops!" + System.lineSeparator())); + assertThat(errOut.toString(), containsString("verbose output")); + } } diff --git a/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java b/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java index e96ecc35ca..37030c8b77 100644 --- a/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java +++ b/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java @@ -44,16 +44,26 @@ class SqlSchemaProviderTest { } @Test - public void testProviderWithoutTimeout() { + public void testProviderWithoutTimeout() throws InterruptedException { SqlSchemaProvider provider = new SqlSchemaProvider(supplier, 0); - Assertions.assertNotEquals(provider.getSchema(), provider.getSchema()); + + SqlSchema firstSchema = provider.getSchema(); + provider.getSchema(); // trigger update + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + + Assertions.assertNotEquals(firstSchema, provider.getSchema()); } @Test public void testProviderWith1secTimeout() throws InterruptedException { SqlSchemaProvider provider = new SqlSchemaProvider(supplier, 1); + + provider.initStateAsync(); + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + SqlSchema schema = provider.getSchema(); Thread.sleep(TimeUnit.SECONDS.toMillis(2)); + Assertions.assertNotEquals(schema, provider.getSchema()); } }