GitHub user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167939768 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List<URL> dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration + flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir); + } catch (Exception e) { + throw new SqlClientException("Could not load Flink configuration.", e); + } + + // try to find a default environment + if (defaultEnv == null) { + final String defaultFilePath = flinkConfigDir + "/" + DEFAULT_ENV_FILE; + System.out.println("No default environment specified."); + System.out.print("Searching for '" + defaultFilePath + "'..."); + final File file = new File(defaultFilePath); + if (file.exists()) { + System.out.println("found."); + try { + defaultEnv = 
Path.fromLocalFile(file).toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } else { + System.out.println("not found."); + } + } + + // inform user + if (defaultEnv != null) { + System.out.println("Reading default environment from: " + defaultEnv); + try { + environment = Environment.parse(defaultEnv); + } catch (IOException e) { + throw new SqlClientException("Could not read default environment file at: " + defaultEnv, e); + } + } else { + environment = new Environment(); + } + + // discover dependencies + dependencies = new ArrayList<>(); + try { + // find jar files + for (URL url : jars) { + JobWithJars.checkJarFile(url); + dependencies.add(url); + } + + // find jar files in library directories + for (URL libUrl : libraries) { + final File dir = new File(libUrl.toURI()); + if (!dir.isDirectory() || !dir.canRead()) { + throw new SqlClientException("Directory cannot be read: " + dir); + } + final File[] files = dir.listFiles(); + if (files == null) { + throw new SqlClientException("Directory cannot be read: " + dir); + } + for (File f : files) { + // only consider jars + if (f.isFile() && f.getAbsolutePath().toLowerCase().endsWith(".jar")) { + final URL url = f.toURI().toURL(); + JobWithJars.checkJarFile(url); + dependencies.add(url); + } + } + } + } catch (Exception e) { + throw new SqlClientException("Could not load all required JAR files.", e); + } + + // prepare result store + resultStore = new ResultStore(flinkConfig); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public void start() { + // nothing to do yet + } + + @Override + public Map<String, String> getSessionProperties(SessionContext context) throws SqlExecutionException { + final Environment env = getEnvironment(context); + final Map<String, String> properties = new HashMap<>(); + properties.putAll(env.getExecution().toProperties()); + 
properties.putAll(env.getDeployment().toProperties()); + return properties; + } + + @Override + public List<String> listTables(SessionContext context) throws SqlExecutionException { + final Environment env = getEnvironment(context); + final TableEnvironment tableEnv = getTableEnvironment(env); + return Arrays.asList(tableEnv.listTables()); + } + + @Override + public TableSchema getTableSchema(SessionContext context, String name) throws SqlExecutionException { + final Environment env = getEnvironment(context); + final TableEnvironment tableEnv = getTableEnvironment(env); + try { + return tableEnv.scan(name).getSchema(); + } catch (TableException e) { + return null; // no table with this name found + } + } + + @Override + public String explainStatement(SessionContext context, String statement) throws SqlExecutionException { + final Environment env = getEnvironment(context); + final TableEnvironment tableEnv = getTableEnvironment(env); + try { + final Table table = tableEnv.sqlQuery(statement); + return tableEnv.explain(table); + } catch (Throwable t) { + // catch everything such that the query does not crash the executor + throw new SqlExecutionException("Invalid SQL statement.", t); + } + } + + @Override + public ResultDescriptor executeQuery(SessionContext context, String query) throws SqlExecutionException { + final Environment env = getEnvironment(context); + + // deployment + final ClusterClient<?> clusterClient = prepareDeployment(env.getDeployment()); + + // initialize result + final DynamicResult result = resultStore.createResult(env); + + // create plan with jars + final Tuple2<JobGraph, TableSchema> plan = createPlan(context.getName(), env, query, result.getTableSink(), clusterClient); + + // store the result with a unique id (the job id for now) + final String resultId = plan.f0.getJobID().toString(); + resultStore.storeResult(resultId, result); + + // create class loader + final ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader( + 
dependencies, + Collections.emptyList(), + this.getClass().getClassLoader()); + + // create execution + final Runnable program = () -> { + // we need to submit the job attached for now + // otherwise it is not possible to retrieve the reason why an execution failed + try { + clusterClient.run(plan.f0, classLoader); + } catch (ProgramInvocationException e) { + throw new SqlExecutionException("Could not execute table program.", e); + } finally { + try { + clusterClient.shutdown(); + } catch (Exception e) { + // ignore + } + } + }; + + // start result retrieval + result.startRetrieval(program); + + return new ResultDescriptor(resultId, plan.f1, result.isChangelog()); + } + + @Override + public TypedResult<Tuple2<Boolean, Row>> retrieveResultRecord(SessionContext context, String resultId) throws SqlExecutionException { --- End diff -- Why can changelog streams only be read one by one? Can't batch them together like an append table?
---