github-actions[bot] opened a new issue, #383:
URL: https://github.com/apache/incubator-wayang/issues/383
Load ChannelInstances from executionState? (as of now there is no input into
PostgreSQL).
this.connection.close();
} catch (SQLException e) {
this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e);
}
https://github.com/apache/incubator-wayang/blob/5e6a07edaae3e7ebac9ac73814f95bafe741845d/wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java#L80
```java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.genericjdbc.execution;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ExecutionState;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.ExecutorTemplate;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.genericjdbc.operators.GenericJdbcExecutionOperator;
import org.apache.wayang.genericjdbc.operators.GenericJdbcFilterOperator;
import org.apache.wayang.genericjdbc.operators.GenericJdbcProjectionOperator;
import org.apache.wayang.genericjdbc.operators.GenericJdbcTableSource;
import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
/**
* {@link Executor} implementation for the {@link GenericJdbcPlatform}.
*/
public class GenericJdbcExecutor extends ExecutorTemplate {
private final GenericJdbcPlatform platform;
private final Connection connection = null;
private final Logger logger = LogManager.getLogger(this.getClass());
private final FunctionCompiler functionCompiler = new FunctionCompiler();
public GenericJdbcExecutor(GenericJdbcPlatform platform, Job job) {
super(job.getCrossPlatformExecutor());
this.platform = platform;
// this.connection =
this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection();
}
@Override
public void execute(ExecutionStage stage, OptimizationContext
optimizationContext, ExecutionState executionState) {
// TODO: Load ChannelInstances from executionState? (as of now there
is no input into PostgreSQL).
Collection<?> startTasks = stage.getStartTasks();
Collection<?> termTasks = stage.getTerminalTasks();
// Verify that we can handle this instance.
assert startTasks.size() == 1 : "Invalid jdbc stage: multiple
sources are not currently supported";
ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple
terminal tasks are not currently supported.";
ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
assert startTask.getOperator() instanceof GenericJdbcTableSource :
"Invalid JDBC stage: Start task has to be a TableSource";
// Extract the different types of ExecutionOperators from the stage.
GenericJdbcTableSource tableOp = (GenericJdbcTableSource)
startTask.getOperator();
SqlQueryChannel.Instance tipChannelInstance =
this.instantiateOutboundChannel(startTask, optimizationContext);
Collection<ExecutionTask> filterTasks = new ArrayList<>(4);
ExecutionTask projectionTask = null;
Set<ExecutionTask> allTasks = stage.getAllTasks();
assert allTasks.size() <= 3;
ExecutionTask nextTask =
this.findGenericJdbcExecutionOperatorTaskInStage(startTask, stage);
while (nextTask != null) {
// Evaluate the nextTask.
if (nextTask.getOperator() instanceof GenericJdbcFilterOperator)
{
filterTasks.add(nextTask);
} else if (nextTask.getOperator() instanceof
GenericJdbcProjectionOperator) {
assert projectionTask == null; //Allow one projection
operator per stage for now.
projectionTask = nextTask;
} else {
throw new WayangException(String.format("Unsupported JDBC
execution task %s", nextTask.toString()));
}
// Move the tipChannelInstance.
tipChannelInstance = this.instantiateOutboundChannel(nextTask,
optimizationContext, tipChannelInstance);
// Go to the next nextTask.
nextTask =
this.findGenericJdbcExecutionOperatorTaskInStage(nextTask, stage);
}
// Create the SQL query.
String tableName = this.getSqlClause(tableOp);
String jdbcName = tableOp.jdbcName;
Collection<String> conditions = filterTasks.stream()
.map(ExecutionTask::getOperator)
.map(this::getSqlClause)
.collect(Collectors.toList());
String projection = projectionTask == null ? "*" :
this.getSqlClause(projectionTask.getOperator());
String query = this.createSqlQuery(tableName, conditions,
projection);
tipChannelInstance.setSqlQuery(query);
tipChannelInstance.setJdbcName(jdbcName);
// Return the tipChannelInstance.
executionState.register(tipChannelInstance);
}
/**
* Retrieves the follow-up {@link ExecutionTask} of the given {@code
task} unless it is not comprising a
* {@link GenericJdbcExecutionOperator} and/or not in the given {@link
ExecutionStage}.
*
* @param task whose follow-up {@link ExecutionTask} is requested;
should have a single follower
* @param stage in which the follow-up {@link ExecutionTask} should be
* @return the said follow-up {@link ExecutionTask} or {@code null} if
none
*/
private ExecutionTask
findGenericJdbcExecutionOperatorTaskInStage(ExecutionTask task, ExecutionStage
stage) {
assert task.getNumOuputChannels() == 1;
final Channel outputChannel = task.getOutputChannel(0);
final ExecutionTask consumer =
WayangCollections.getSingle(outputChannel.getConsumers());
return consumer.getStage() == stage && consumer.getOperator()
instanceof GenericJdbcExecutionOperator ?
consumer :
null;
}
/**
* Instantiates the outbound {@link SqlQueryChannel} of an {@link
ExecutionTask}.
*
* @param task whose outbound {@link SqlQueryChannel}
should be instantiated
* @param optimizationContext provides information about the {@link
ExecutionTask}
* @return the {@link SqlQueryChannel.Instance}
*/
private SqlQueryChannel.Instance
instantiateOutboundChannel(ExecutionTask task,
OptimizationContext optimizationContext) {
assert task.getNumOuputChannels() == 1 : String.format("Illegal
task: %s.", task);
assert task.getOutputChannel(0) instanceof SqlQueryChannel :
String.format("Illegal task: %s.", task);
SqlQueryChannel outputChannel = (SqlQueryChannel)
task.getOutputChannel(0);
OptimizationContext.OperatorContext operatorContext =
optimizationContext.getOperatorContext(task.getOperator());
return outputChannel.createInstance(this, operatorContext, 0);
}
/**
* Instantiates the outbound {@link SqlQueryChannel} of an {@link
ExecutionTask}.
*
* @param task whose outbound {@link
SqlQueryChannel} should be instantiated
* @param optimizationContext provides information about the
{@link ExecutionTask}
* @param predecessorChannelInstance preceeding {@link
SqlQueryChannel.Instance} to keep track of lineage
* @return the {@link SqlQueryChannel.Instance}
*/
private SqlQueryChannel.Instance
instantiateOutboundChannel(ExecutionTask task,
OptimizationContext optimizationContext,
SqlQueryChannel.Instance predecessorChannelInstance) {
final SqlQueryChannel.Instance newInstance =
this.instantiateOutboundChannel(task, optimizationContext);
newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage());
return newInstance;
}
/**
* Creates a SQL query.
*
* @param tableName the table to be queried
* @param conditions conditions for the {@code WHERE} clause
* @param projection projection for the {@code SELECT} clause
* @return the SQL query
*/
protected String createSqlQuery(String tableName, Collection<String>
conditions, String projection) {
StringBuilder sb = new StringBuilder(1000);
sb.append("SELECT ").append(projection).append(" FROM
").append(tableName);
if (!conditions.isEmpty()) {
sb.append(" WHERE ");
String separator = "";
for (String condition : conditions) {
sb.append(separator).append(condition);
separator = " AND ";
}
}
sb.append(';');
return sb.toString();
}
/**
* Creates a SQL clause that corresponds to the given {@link Operator}.
*
* @param operator for that the SQL clause should be generated
* @return the SQL clause
*/
private String getSqlClause(Operator operator) {
return ((GenericJdbcExecutionOperator)
operator).createSqlClause(this.connection, this.functionCompiler);
}
@Override
public void dispose() {
// try {
// this.connection.close();
// } catch (SQLException e) {
// this.logger.error("Could not close JDBC connection to
PostgreSQL correctly.", e);
// }
return;
}
@Override
public Platform getPlatform() {
return this.platform;
}
private void saveResult(FileChannel.Instance outputFileChannelInstance,
ResultSet rs) throws IOException, SQLException {
// Output results.
final FileSystem outFs =
FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
try (final OutputStreamWriter writer = new
OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) {
while (rs.next()) {
//System.out.println(rs.getInt(1) + " " + rs.getString(2));
ResultSetMetaData rsmd = rs.getMetaData();
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
writer.write(rs.getString(i));
if (i < rsmd.getColumnCount()) {
writer.write('\t');
}
}
if (!rs.isLast()) {
writer.write('\n');
}
}
} catch (UncheckedIOException e) {
throw e.getCause();
}
}
}
```
7a42fa55604535c1107f2822d76f2fccb089c23e
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]