Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2443#discussion_r154631668
  
    --- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
 ---
    @@ -27,50 +30,46 @@
     import org.apache.calcite.rex.RexNode;
     import org.apache.storm.sql.planner.StormRelUtils;
     import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
    -import org.apache.storm.sql.planner.trident.TridentPlanCreator;
    -import org.apache.storm.sql.runtime.ISqlTridentDataSource;
    -import org.apache.storm.trident.Stream;
    -import org.apache.storm.trident.fluent.IAggregatableStream;
    -import org.apache.storm.tuple.Fields;
    +import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
    +import 
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
    +import org.apache.storm.streams.Stream;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.tuple.Values;
     
    -import java.util.List;
    +public class StreamsStreamInsertRel extends StormStreamInsertRelBase 
implements StreamsRel {
    +    private final int primaryKeyIndex;
     
    -public class TridentStreamInsertRel extends StormStreamInsertRelBase 
implements TridentRel {
    -    public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, 
Operation operation, List<String> updateColumnList, List<RexNode> 
sourceExpressionList, boolean flattened) {
    +    public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
    +                                  RelNode child, Operation operation, 
List<String> updateColumnList, List<RexNode> sourceExpressionList,
    +                                  boolean flattened, int primaryKeyIndex) {
             super(cluster, traits, table, catalogReader, child, operation, 
updateColumnList, sourceExpressionList, flattened);
    +        this.primaryKeyIndex = primaryKeyIndex;
         }
     
         @Override
         public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
    -        return new TridentStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
    -                sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened());
    +        return new StreamsStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
    +                sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened(),
    +                primaryKeyIndex);
         }
     
         @Override
    -    public void tridentPlan(TridentPlanCreator planCreator) throws 
Exception {
    +    public void streamsPlan(StreamsPlanCreator planCreator) throws 
Exception {
             // SingleRel
             RelNode input = getInput();
    -        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
    -        Stream inputStream = planCreator.pop().toStream();
    -
    -        String stageName = StormRelUtils.getStageName(this);
    +        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
    +        Stream<Values> inputStream = planCreator.pop();
     
             Preconditions.checkArgument(isInsert(), "Only INSERT statement is 
supported.");
     
    -        List<String> inputFields = this.input.getRowType().getFieldNames();
    -        List<String> outputFields = getRowType().getFieldNames();
    +        // Calcite ensures that the value is structurized to the table 
definition
    +        // hence we can use PK index directly
     
    -        // FIXME: this should be really different...
             String tableName = 
Joiner.on('.').join(getTable().getQualifiedName());
    -        ISqlTridentDataSource.SqlTridentConsumer consumer = 
planCreator.getSources().get(tableName).getConsumer();
    +        IRichBolt consumer = 
planCreator.getSources().get(tableName).getConsumer();
     
    -        // In fact this is normally the end of stream, but I'm still not 
sure so I open new streams based on State values
    -        IAggregatableStream finalStream = inputStream
    -                .partitionPersist(consumer.getStateFactory(), new 
Fields(inputFields), consumer.getStateUpdater(),
    -                        new Fields(outputFields))
    -                .newValuesStream().name(stageName);
    +        inputStream.mapToPair(new 
StreamInsertMapToPairFunction(primaryKeyIndex)).to(consumer);
    --- End diff --
    
    To keep the logic simple, this assumes that every table has exactly one 
PK (this should later be extended to support composite keys), and provides a 
PairStream (KeyedStream) to the consumer bolt.


---

Reply via email to