Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2443#discussion_r202633872
--- Diff:
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase
implements StreamsRel {
+ private final int primaryKeyIndex;
+
+ public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+ RelNode child, Operation operation,
List<String> updateColumnList, List<RexNode> sourceExpressionList,
+ boolean flattened, int primaryKeyIndex) {
+ super(cluster, traits, table, catalogReader, child, operation,
updateColumnList, sourceExpressionList, flattened);
+ this.primaryKeyIndex = primaryKeyIndex;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new StreamsStreamInsertRel(getCluster(), traitSet,
getTable(), getCatalogReader(),
+ sole(inputs), getOperation(), getUpdateColumnList(),
getSourceExpressionList(), isFlattened(), primaryKeyIndex);
+ }
+
+ @Override
+ public void streamsPlan(StreamsPlanCreator planCreator) throws
Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+ Stream<Values> inputStream = planCreator.pop();
+
+ Preconditions.checkArgument(isInsert(), "Only INSERT statement is
supported.");
+
+ // Calcite ensures that the value is structurized to the table
definition
+ // hence we can use PK index directly
+ // To elaborate, if table BAR is defined as ID INTEGER PK, NAME
VARCHAR, DEPTID INTEGER
+ // and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is
executed,
+ // Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to
the value before INSERT.
+
+ // FIXME: this should be really different...
--- End diff --
Removing the line would be right for now, since sadly I forgot why I added
the FIXME line here.
---