Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r154631668 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java --- @@ -27,50 +30,46 @@ import org.apache.calcite.rex.RexNode; import org.apache.storm.sql.planner.StormRelUtils; import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase; -import org.apache.storm.sql.planner.trident.TridentPlanCreator; -import org.apache.storm.sql.runtime.ISqlTridentDataSource; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.fluent.IAggregatableStream; -import org.apache.storm.tuple.Fields; +import org.apache.storm.sql.planner.streams.StreamsPlanCreator; +import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction; +import org.apache.storm.streams.Stream; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Values; -import java.util.List; +public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel { + private final int primaryKeyIndex; -public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel { - public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, + RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, + boolean flattened, int primaryKeyIndex) { super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); + this.primaryKeyIndex = primaryKeyIndex; } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new 
TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), - sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), + sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), + primaryKeyIndex); } @Override - public void tridentPlan(TridentPlanCreator planCreator) throws Exception { + public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { // SingleRel RelNode input = getInput(); - StormRelUtils.getStormRelInput(input).tridentPlan(planCreator); - Stream inputStream = planCreator.pop().toStream(); - - String stageName = StormRelUtils.getStageName(this); + StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); + Stream<Values> inputStream = planCreator.pop(); Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported."); - List<String> inputFields = this.input.getRowType().getFieldNames(); - List<String> outputFields = getRowType().getFieldNames(); + // Calcite ensures that the value is structurized to the table definition + // hence we can use PK index directly - // FIXME: this should be really different... 
String tableName = Joiner.on('.').join(getTable().getQualifiedName()); - ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer(); + IRichBolt consumer = planCreator.getSources().get(tableName).getConsumer(); - // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values - IAggregatableStream finalStream = inputStream - .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(), - new Fields(outputFields)) - .newValuesStream().name(stageName); + inputStream.mapToPair(new StreamInsertMapToPairFunction(primaryKeyIndex)).to(consumer); --- End diff -- To keep the logic simple, it assumes that every table has exactly one PK (this should be extended to support composite keys), and provides a PairStream (KeyedStream) to the consumer bolt.
---