Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2443#discussion_r154631668
--- Diff:
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
---
@@ -27,50 +30,46 @@
import org.apache.calcite.rex.RexNode;
import org.apache.storm.sql.planner.StormRelUtils;
import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-import org.apache.storm.tuple.Fields;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
-import java.util.List;
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase
implements StreamsRel {
+ private final int primaryKeyIndex;
-public class TridentStreamInsertRel extends StormStreamInsertRelBase
implements TridentRel {
- public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet
traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child,
Operation operation, List<String> updateColumnList, List<RexNode>
sourceExpressionList, boolean flattened) {
+ public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+ RelNode child, Operation operation,
List<String> updateColumnList, List<RexNode> sourceExpressionList,
+ boolean flattened, int primaryKeyIndex) {
super(cluster, traits, table, catalogReader, child, operation,
updateColumnList, sourceExpressionList, flattened);
+ this.primaryKeyIndex = primaryKeyIndex;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new TridentStreamInsertRel(getCluster(), traitSet,
getTable(), getCatalogReader(),
- sole(inputs), getOperation(), getUpdateColumnList(),
getSourceExpressionList(), isFlattened());
+ return new StreamsStreamInsertRel(getCluster(), traitSet,
getTable(), getCatalogReader(),
+ sole(inputs), getOperation(), getUpdateColumnList(),
getSourceExpressionList(), isFlattened(),
+ primaryKeyIndex);
}
@Override
- public void tridentPlan(TridentPlanCreator planCreator) throws
Exception {
+ public void streamsPlan(StreamsPlanCreator planCreator) throws
Exception {
// SingleRel
RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
+ StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+ Stream<Values> inputStream = planCreator.pop();
Preconditions.checkArgument(isInsert(), "Only INSERT statement is
supported.");
- List<String> inputFields = this.input.getRowType().getFieldNames();
- List<String> outputFields = getRowType().getFieldNames();
+ // Calcite ensures that the value is structurized to the table
definition
+ // hence we can use PK index directly
- // FIXME: this should be really different...
String tableName =
Joiner.on('.').join(getTable().getQualifiedName());
- ISqlTridentDataSource.SqlTridentConsumer consumer =
planCreator.getSources().get(tableName).getConsumer();
+ IRichBolt consumer =
planCreator.getSources().get(tableName).getConsumer();
- // In fact this is normally the end of stream, but I'm still not
sure so I open new streams based on State values
- IAggregatableStream finalStream = inputStream
- .partitionPersist(consumer.getStateFactory(), new
Fields(inputFields), consumer.getStateUpdater(),
- new Fields(outputFields))
- .newValuesStream().name(stageName);
+ inputStream.mapToPair(new
StreamInsertMapToPairFunction(primaryKeyIndex)).to(consumer);
--- End diff --
To keep the logic simple, this assumes that every table has exactly one PK
(this should be extended later to support composite keys), and provides a
PairStream (KeyedStream) to the consumer bolt.
---