zkaoudi commented on code in PR #708:
URL: https://github.com/apache/wayang/pull/708#discussion_r3241094848
##########
wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java:
##########
@@ -86,40 +75,58 @@ public Tuple<Collection<ExecutionLineageNode>,
Collection<ChannelInstance>> eval
ChannelInstance[] outputs,
JavaExecutor executor,
OptimizationContext.OperatorContext operatorContext) {
- // Cast the inputs and outputs.
+
final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance)
inputs[0];
final StreamChannel.Instance output = (StreamChannel.Instance)
outputs[0];
- GenericJdbcPlatform producerPlatform = (GenericJdbcPlatform)
input.getChannel().getProducer().getPlatform();
+ GenericJdbcPlatform producerPlatform =
+ (GenericJdbcPlatform)
input.getChannel().getProducer().getPlatform();
+
+ // Fix: safely resolve JDBC name
Review Comment:
What is this fix? Is it a TODO?
##########
wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java:
##########
@@ -86,40 +75,58 @@ public Tuple<Collection<ExecutionLineageNode>,
Collection<ChannelInstance>> eval
ChannelInstance[] outputs,
JavaExecutor executor,
OptimizationContext.OperatorContext operatorContext) {
- // Cast the inputs and outputs.
+
final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance)
inputs[0];
final StreamChannel.Instance output = (StreamChannel.Instance)
outputs[0];
- GenericJdbcPlatform producerPlatform = (GenericJdbcPlatform)
input.getChannel().getProducer().getPlatform();
+ GenericJdbcPlatform producerPlatform =
+ (GenericJdbcPlatform)
input.getChannel().getProducer().getPlatform();
+
+ // Fix: safely resolve JDBC name
+ String jdbcName = input.getJdbcName();
+ if (jdbcName == null || jdbcName.trim().isEmpty()) {
+ jdbcName = producerPlatform.getPlatformId();
+ }
+
final Connection connection = producerPlatform
- .createDatabaseDescriptor(executor.getConfiguration(),input.getJdbcName())
+ .createDatabaseDescriptor(executor.getConfiguration(), jdbcName)
.createJdbcConnection();
Iterator<Record> resultSetIterator = new ResultSetIterator(connection,
input.getSqlQuery());
- Spliterator<Record> resultSetSpliterator =
Spliterators.spliteratorUnknownSize(resultSetIterator, 0);
- Stream<Record> resultSetStream =
StreamSupport.stream(resultSetSpliterator, false);
+ Spliterator<Record> resultSetSpliterator =
+ Spliterators.spliteratorUnknownSize(resultSetIterator, 0);
+ Stream<Record> resultSetStream =
+ StreamSupport.stream(resultSetSpliterator, false);
output.accept(resultSetStream);
ExecutionLineageNode queryLineageNode = new
ExecutionLineageNode(operatorContext);
- queryLineageNode.add(LoadProfileEstimators.createFromSpecification(
- String.format("wayang.%s.sqltostream.load.query",
this.jdbcPlatform.getPlatformId()),
+ queryLineageNode.add(
+ LoadProfileEstimators.createFromSpecification(
+ String.format("wayang.%s.sqltostream.load.query",
+ this.jdbcPlatform.getPlatformId()),
executor.getConfiguration()
- ));
+ )
+ );
queryLineageNode.addPredecessor(input.getLineage());
+
ExecutionLineageNode outputLineageNode = new
ExecutionLineageNode(operatorContext);
- outputLineageNode.add(LoadProfileEstimators.createFromSpecification(
- String.format("wayang.%s.sqltostream.load.output",
this.jdbcPlatform.getPlatformId()),
- executor.getConfiguration()
- ));
+ outputLineageNode.add(
Review Comment:
Please revert all the changes you made to line spacing and formatting.
The reasons are that (1) they are not necessary and (2) they make code review
much harder by adding so many diffs.
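
For reference, the only behavioral change in this hunk appears to be the fallback under the `// Fix:` comment: when `input.getJdbcName()` is null or blank, the platform ID is used instead. A minimal sketch of that logic in isolation, assuming a hypothetical helper named `resolveJdbcName` (not part of the PR or of Wayang's API):

```java
public class JdbcNameResolver {

    /**
     * Hypothetical helper: returns jdbcName unless it is null or blank,
     * in which case the given fallback (e.g. the platform ID) is returned.
     */
    static String resolveJdbcName(String jdbcName, String fallbackPlatformId) {
        if (jdbcName == null || jdbcName.trim().isEmpty()) {
            return fallbackPlatformId;
        }
        return jdbcName;
    }

    public static void main(String[] args) {
        // "genericjdbc" and "mydb" are illustrative values only.
        System.out.println(resolveJdbcName(null, "genericjdbc"));
        System.out.println(resolveJdbcName("   ", "genericjdbc"));
        System.out.println(resolveJdbcName("mydb", "genericjdbc"));
    }
}
```

Isolating the fallback like this would keep the diff down to the lines that change behavior, and the method name would document the intent in place of a `// Fix:` comment.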