Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2443#discussion_r202633872
  
    --- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
 ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.storm.sql.planner.streams.rel;
    +
    +import java.util.List;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.base.Preconditions;
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptTable;
    +import org.apache.calcite.plan.RelTraitSet;
    +import org.apache.calcite.prepare.Prepare;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.storm.sql.planner.StormRelUtils;
    +import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
    +import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
    +import 
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
    +import org.apache.storm.streams.Stream;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.tuple.Values;
    +
    +public class StreamsStreamInsertRel extends StormStreamInsertRelBase 
implements StreamsRel {
    +    private final int primaryKeyIndex;
    +
    +    public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
    +                                  RelNode child, Operation operation, 
List<String> updateColumnList, List<RexNode> sourceExpressionList,
    +                                  boolean flattened, int primaryKeyIndex) {
    +        super(cluster, traits, table, catalogReader, child, operation, 
updateColumnList, sourceExpressionList, flattened);
    +        this.primaryKeyIndex = primaryKeyIndex;
    +    }
    +
    +    @Override
    +    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
    +        return new StreamsStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
    +                sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened(), primaryKeyIndex);
    +    }
    +
    +    @Override
    +    public void streamsPlan(StreamsPlanCreator planCreator) throws 
Exception {
    +        // SingleRel
    +        RelNode input = getInput();
    +        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
    +        Stream<Values> inputStream = planCreator.pop();
    +
    +        Preconditions.checkArgument(isInsert(), "Only INSERT statement is 
supported.");
    +
    +        // Calcite ensures that the value is structurized to the table 
definition
    +        // hence we can use PK index directly
    +        // To elaborate, if table BAR is defined as ID INTEGER PK, NAME 
VARCHAR, DEPTID INTEGER
    +        // and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is 
executed,
    +        // Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to 
the value before INSERT.
    +
    +        // FIXME: this should be really different...
    --- End diff --
    
    Removing the line would be right for now... since sadly I forgot why I add 
FIXME line here.


---

Reply via email to