[GEARPUMP-217] Add SQL support Author: Buddhi Ayesha <[email protected]> Author: manuzhang <[email protected]>
Closes #221 from manuzhang/merge_sql_into_master. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/995c8cc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/995c8cc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/995c8cc0 Branch: refs/heads/master Commit: 995c8cc0cc0b18dd30cbbe218972b0bded297ff0 Parents: f75fb19 Author: Buddhi Ayesha <[email protected]> Authored: Wed Aug 30 22:00:41 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Aug 30 22:01:00 2017 +0800 ---------------------------------------------------------------------- .../examples/wordcountjava/WordCountSpec.scala | 13 +- experiments/sql/README.md | 8 + .../apache/gearpump/sql/planner/Connection.java | 292 ++++++++++++++ .../sql/planner/GearRelDataTypeSystem.java | 41 ++ .../gearpump/sql/planner/GearRuleSets.java | 60 +++ .../gearpump/sql/planner/LogicalPlan.java | 43 ++ .../org/apache/gearpump/sql/planner/Query.java | 80 ++++ .../sql/planner/StreamQueryPlanner.java | 96 +++++ .../gearpump/sql/rel/GearAggregationRel.java | 120 ++++++ .../apache/gearpump/sql/rel/GearFilterRel.java | 47 +++ .../apache/gearpump/sql/rel/GearFlatMapRel.java | 112 ++++++ .../apache/gearpump/sql/rel/GearIOSinkRel.java | 52 +++ .../gearpump/sql/rel/GearIOSourceRel.java | 39 ++ .../gearpump/sql/rel/GearIntersectRel.java | 54 +++ .../apache/gearpump/sql/rel/GearJoinRel.java | 94 +++++ .../gearpump/sql/rel/GearLogicalConvention.java | 65 +++ .../apache/gearpump/sql/rel/GearMinusRel.java | 51 +++ .../apache/gearpump/sql/rel/GearProjectRel.java | 50 +++ .../apache/gearpump/sql/rel/GearRelNode.java | 30 ++ .../sql/rel/GearSetOperatorRelBase.java | 47 +++ .../apache/gearpump/sql/rel/GearSortRel.java | 95 +++++ .../gearpump/sql/rel/GearSqlRelUtils.java | 71 ++++ .../apache/gearpump/sql/rel/GearUnionRel.java | 55 +++ .../apache/gearpump/sql/rel/GearValuesRel.java | 42 ++ .../gearpump/sql/rule/GearAggregationRule.java | 147 +++++++ .../gearpump/sql/rule/GearFilterRule.java | 48 +++ .../gearpump/sql/rule/GearFlatMapRule.java | 52 +++ .../gearpump/sql/rule/GearIOSinkRule.java | 79 ++++ .../gearpump/sql/rule/GearIOSourceRule.java | 46 +++ .../gearpump/sql/rule/GearIntersectRule.java | 51 +++ .../apache/gearpump/sql/rule/GearJoinRule.java | 53 +++ .../apache/gearpump/sql/rule/GearMinusRule.java | 51 +++ .../gearpump/sql/rule/GearProjectRule.java | 48 +++ .../apache/gearpump/sql/rule/GearSortRule.java | 51 +++ .../apache/gearpump/sql/rule/GearUnionRule.java | 49 +++ .../gearpump/sql/rule/GearValuesRule.java | 48 +++ .../apache/gearpump/sql/table/SampleString.java | 45 +++ .../gearpump/sql/table/SampleTransactions.java | 60 +++ .../sql/table/TransactionsTableFactory.java | 88 ++++ .../utils/CalciteFrameworkConfiguration.java | 58 +++ .../gearpump/sql/utils/GearConfiguration.java | 49 +++ .../sql/validator/CalciteSqlValidator.java | 50 +++ .../sql/src/main/resources/log4j.properties | 28 ++ .../gearpump/sql/example/SqlWordCountTest.java | 125 ++++++ .../gearpump/sql/planner/CalciteTest.java | 397 +++++++++++++++++++ .../apache/gearpump/sql/planner/QueryTest.java | 65 +++ experiments/sql/src/test/resources/model.json | 39 ++ .../src/test/resources/sales/Transactions.csv | 6 + project/BuildExperiments.scala | 12 + project/Dependencies.scala | 1 + 50 files changed, 3396 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala index 3736c86..7df8651 100644 --- a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala +++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala @@ -18,16 +18,15 @@ package org.apache.gearpump.streaming.examples.wordcountjava -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult import org.apache.gearpump.cluster.{MasterHarness, TestUtil} -import org.apache.gearpump.streaming.examples.wordcountjava.WordCount +import org.apache.gearpump.streaming.examples.wordcountjava.dsl.WordCount +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import scala.concurrent.Future +import scala.util.Success class WordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/README.md ---------------------------------------------------------------------- diff --git a/experiments/sql/README.md b/experiments/sql/README.md new file mode 100644 index 0000000..28b6d2c --- /dev/null +++ b/experiments/sql/README.md @@ -0,0 +1,8 @@ +# SQL Support +This project is about building a SQL layer with [Apache Calcite](https://calcite.apache.org/) to help those who are unfamiliar with Scala/Java to use [Apache Gearpump](http://gearpump.apache.org/). + +## Build +- Build [GearPump SQL](/experiments/sql) + +## Example +- Run [SQL WordCount example](/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java new file mode 100644 index 0000000..e5954f0 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.Frameworks; + +import java.lang.reflect.Type; +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +public class Connection implements CalciteConnection { + + private final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + private String schema = null; + + public SchemaPlus getRootSchema() { + return rootSchema; + } + + public JavaTypeFactory getTypeFactory() { + return null; + } + + public Properties getProperties() { + return null; + } + + public Statement createStatement() throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql) throws SQLException { + return null; + } + + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + public String nativeSQL(String sql) throws SQLException { + return null; + } + + public void setAutoCommit(boolean autoCommit) throws SQLException { + + } + + public boolean getAutoCommit() throws SQLException { + return false; + } + + public void commit() throws SQLException { + + } + + public void rollback() throws SQLException { + + } + + public void close() throws SQLException { + + } + + public boolean isClosed() throws SQLException { + return false; + } + + public DatabaseMetaData getMetaData() throws SQLException { + return null; + } + + public void setReadOnly(boolean readOnly) throws SQLException { + + } + + public boolean isReadOnly() throws SQLException { + return false; + } + + public void setCatalog(String catalog) throws SQLException { + + } + + public String getCatalog() throws SQLException { + return null; + } + + public void setTransactionIsolation(int level) throws SQLException { + + } + + public int getTransactionIsolation() throws SQLException { + return 0; + } + + public SQLWarning getWarnings() throws SQLException { + return null; + } + + public void clearWarnings() throws SQLException { + + } + + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + public Map<String, Class<?>> getTypeMap() throws SQLException { + return null; + } + + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + + } + + public void setHoldability(int holdability) throws SQLException { + + } + + public int getHoldability() throws SQLException { + return 0; + } + + public Savepoint setSavepoint() throws SQLException { + return null; + } + + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + public void rollback(Savepoint savepoint) throws SQLException { + + } + + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + + } + + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return null; + } + + public Clob createClob() throws SQLException { + return null; + } + + public Blob createBlob() throws SQLException { + return null; + } + + public NClob createNClob() throws SQLException { + return null; + } + + public SQLXML createSQLXML() throws SQLException { + return null; + } + + public boolean isValid(int timeout) throws SQLException { + return false; + } + + public void setClientInfo(String name, String value) throws SQLClientInfoException { + + } + + public void setClientInfo(Properties properties) throws SQLClientInfoException { + + } + + public String getClientInfo(String name) throws SQLException { + return null; + } + + public Properties getClientInfo() throws SQLException { + return null; + } + + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + public void setSchema(String s) throws SQLException { + schema = s; + } + + public String getSchema() throws SQLException { + return schema; + } + + public void abort(Executor executor) throws SQLException { + + } + + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + + } + + public int getNetworkTimeout() throws SQLException { + return 0; + } + + public CalciteConnectionConfig config() { + return null; + } + + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } + + public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) { + return null; + } + + public <T> Queryable<T> createQuery(Expression expression, Type type) { + return null; + } + + public <T> T execute(Expression expression, Class<T> aClass) { + return null; + } + + public <T> T execute(Expression expression, Type type) { + return null; + } + + public <T> Enumerator<T> executeQuery(Queryable<T> queryable) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java new file mode 100644 index 0000000..a640e12 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; + +/** + * customized data types. + */ +public class GearRelDataTypeSystem extends RelDataTypeSystemImpl { + + public static final RelDataTypeSystem GEAR_REL_DATATYPE_SYSTEM = new GearRelDataTypeSystem(); + + @Override + public int getMaxNumericScale() { + return 38; + } + + @Override + public int getMaxNumericPrecision() { + return 38; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java new file mode 100644 index 0000000..a962ff1 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.tools.RuleSet; +import org.apache.gearpump.sql.rule.*; + +import java.util.Iterator; + +public class GearRuleSets { + private static final ImmutableSet<RelOptRule> calciteToGearConversionRules = ImmutableSet + .<RelOptRule>builder().add(GearIOSourceRule.INSTANCE, GearProjectRule.INSTANCE, + GearFilterRule.INSTANCE, GearIOSinkRule.INSTANCE, + GearAggregationRule.INSTANCE, GearSortRule.INSTANCE, GearValuesRule.INSTANCE, + GearIntersectRule.INSTANCE, GearMinusRule.INSTANCE, GearUnionRule.INSTANCE, + GearJoinRule.INSTANCE) + .build(); + + public static RuleSet[] getRuleSets() { + return new RuleSet[]{new GearRuleSet( + ImmutableSet.<RelOptRule>builder().addAll(calciteToGearConversionRules).build())}; + } + + private static class GearRuleSet implements RuleSet { + final ImmutableSet<RelOptRule> rules; + + public GearRuleSet(ImmutableSet<RelOptRule> rules) { + this.rules = rules; + } + + public GearRuleSet(ImmutableList<RelOptRule> rules) { + this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); + } + + @Override + public Iterator<RelOptRule> iterator() { + return rules.iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java new file mode 100644 index 0000000..1448a71 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; + +public class LogicalPlan { + + public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException, + RelConversionException { + SqlNode sqlNode; + + try { + sqlNode = planner.parse(query); + } catch (SqlParseException e) { + throw new RuntimeException("SQL query parsing error", e); + } + SqlNode validatedSqlNode = planner.validate(sqlNode); + + return planner.rel(validatedSqlNode).project(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java new file mode 100644 index 0000000..c18b8b5 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * This Class is intended to test functions of Apache Calcite + */ +public class Query { + + private static final Logger LOG = LoggerFactory.getLogger(Query.class); + private final Planner queryPlanner; + + public Query(SchemaPlus schema) { + + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + .setLex(Lex.MYSQL) + .build()) + .defaultSchema(schema) + .traitDefs(traitDefs) + .context(Contexts.EMPTY_CONTEXT) + .ruleSets(RuleSets.ofList()) + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig); + } + + public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException { + SqlNode sqlNode = null; + try { + sqlNode = queryPlanner.parse(query); + } catch (SqlParseException e) { + LOG.error(e.getMessage()); + } + + SqlNode validatedSqlNode = queryPlanner.validate(sqlNode); + return queryPlanner.rel(validatedSqlNode).project(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java new file mode 100644 index 0000000..10f9cbd --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.adapter.java.ReflectiveSchema; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.*; +import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class StreamQueryPlanner { + private static final Logger LOG = LoggerFactory.getLogger(StreamQueryPlanner.class); + + public static void main(String[] args) throws ClassNotFoundException, + SQLException, ValidationException, RelConversionException { + + Class.forName("org.apache.calcite.jdbc.Driver"); + Connection connection = DriverManager.getConnection("jdbc:calcite:"); + CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + SchemaPlus rootSchema = calciteConnection.getRootSchema(); + rootSchema.add("t", new ReflectiveSchema(new Transactions())); + + FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema); + Planner planner = Frameworks.getPlanner(frameworkConfig); + + String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products " + + "where t.orders.id = t.products.id group by t.orders.id, name " + + "having sum(quantity) > 5 order by sum(quantity) "; + + RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner); + LOG.info("Relational Expression:- \n\n" + RelOptUtil.toString(logicalPlan)); + + } + + public static class Transactions { + + public final Order[] orders = { + new Order("001", 3), + new Order("002", 5), + new Order("003", 8), + new Order("004", 15), + }; + + public final Product[] products = { + new Product("001", "Book"), + new Product("002", "Pen"), + new Product("003", "Pencil"), + new Product("004", "Ruler"), + }; + } + + public static class Order { + public final String id; + public final int quantity; + + public Order(String id, int quantity) { + this.id = id; + this.quantity = quantity; + } + } + + public static class Product { + public final String id; + public final String name; + + public Product(String id, String name) { + this.id = id; + this.name = name; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java new file mode 100644 index 0000000..46dc15b --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.apache.gearpump.sql.table.SampleString; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; +import org.apache.gearpump.streaming.dsl.window.api.Trigger; +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.time.Duration; +import java.util.List; + +public class GearAggregationRel extends Aggregate implements GearRelNode { + + private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRel.class); + private int windowFieldIdx = -1; + private WindowFunction windowFn; + private Trigger trigger; + private Duration allowedLatence = Duration.ZERO; + + public GearAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + } + + @Override + public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, + List<AggregateCall> aggCalls) { + return null; + } + + public RelWriter explainTerms(RelWriter pw) { + pw.item("group", groupSet) + .itemIf("window", windowFn, windowFn != null) + .itemIf("trigger", trigger, trigger != null) + .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1) + .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE) + .itemIf("indicator", indicator, indicator) + .itemIf("aggs", aggCalls, pw.nest()); + if (!pw.nest()) { + for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) { + pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e); + } + } + return pw; + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, + JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + LOG.debug("Adding Map"); + JavaStream<Tuple2<String, Integer>> ones = SampleString.WORDS.map(new Ones(), "map"); + + LOG.debug("Adding GroupBy"); + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), + 1, "groupBy"); +// groupedOnes.log(); + LOG.debug("Adding Reduce"); + JavaStream<Tuple2<String, Integer>> wordCount = groupedOnes.reduce(new Count(), "reduce"); + wordCount.log(); + + return wordCount; + } + + private static class Ones extends MapFunction<String, Tuple2<String, Integer>> { + @Override + public Tuple2<String, Integer> map(String s) { + return new Tuple2<>(s, 1); + } + } + + private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> { + @Override + public String groupBy(Tuple2<String, Integer> tuple) { + return tuple._1(); + } + } + + private static class Count extends ReduceFunction<Tuple2<String, Integer>> { + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { + return new Tuple2<>(t1._1(), t1._2() + t2._2()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java new file mode 100644 index 0000000..53a07d9 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public class GearFilterRel extends Filter implements GearRelNode { + + public GearFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new GearFilterRel(getCluster(), traitSet, input, condition); + } + + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java new file mode 100644 index 0000000..d4c55fb --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.DefaultMessage; +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.sql.table.SampleString; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.source.Watermark; +import org.apache.gearpump.streaming.task.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Iterator; + +public class GearFlatMapRel extends Filter implements GearRelNode { + + private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRel.class); + + public GearFlatMapRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { + super(cluster, traits, child, condition); + } + + public GearFlatMapRel() { + super(null, null, null, null); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new GearFlatMapRel(getCluster(), traitSet, input, condition); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, + JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + LOG.debug("Adding Source"); + JavaStream<String> sentence = app.source(new StringSource(SampleString.Stream.getKV()), + 1, UserConfig.empty(), "source"); + LOG.debug("Adding flatMap"); + SampleString.WORDS = sentence.flatMap(new Split(), "flatMap"); + return null; + } + + private static class StringSource implements DataSource { + private final String str; + private boolean hasNext = true; + + StringSource(String str) { + this.str = str; + } + + @Override + public void open(TaskContext context, Instant startTime) { + } + + @Override + public Message read() { + Message msg = new DefaultMessage(str, Instant.now()); + hasNext = false; + return msg; + } + + @Override + public void close() { + } + + @Override + public Instant getWatermark() { + if (hasNext) { + return Instant.now(); + } else { + return Watermark.MAX(); + } + } + } + + private static class Split extends FlatMapFunction<String, String> { + @Override + public Iterator<String> flatMap(String s) { + return Arrays.asList(s.split("\\s+")).iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java new file mode 100644 index 0000000..1d61baf --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearIOSinkRel extends TableModify implements GearRelNode { + public GearIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, + Prepare.CatalogReader catalogReader, RelNode child, Operation operation, + List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, + sourceExpressionList, flattened); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new GearIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), + getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java new file mode 100644 index 0000000..6641c35 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.TableScan; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public class GearIOSourceRel extends TableScan implements GearRelNode { + + public GearIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java new file mode 100644 index 0000000..6888a26 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearIntersectRel extends Intersect implements GearRelNode { + private GearSetOperatorRelBase delegate; + + public GearIntersectRel( + RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new GearSetOperatorRelBase(this, + GearSetOperatorRelBase.OpType.INTERSECT, inputs, all); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new GearIntersectRel(getCluster(), traitSet, inputs, all); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java new file mode 100644 index 0000000..1b168fb --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class GearJoinRel extends Join implements GearRelNode { + public GearJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + super(cluster, traits, left, right, condition, variablesSet, joinType); + } + + @Override + public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, + RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new GearJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, + joinType); + } + + private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { + // it's a CROSS JOIN because: condition == true + if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { + throw new UnsupportedOperationException("CROSS JOIN is not supported!"); + } + + RexCall call = (RexCall) condition; + List<Pair<Integer, Integer>> pairs = new ArrayList<>(); + if ("AND".equals(call.getOperator().getName())) { + List<RexNode> operands = call.getOperands(); + for (RexNode rexNode : operands) { + Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); + pairs.add(pair); + } + } else if ("=".equals(call.getOperator().getName())) { + pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); + } else { + throw new UnsupportedOperationException( + "Operator " + call.getOperator().getName() + " is not supported in join condition"); + } + + return pairs; + } + + private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition, + int leftRowColumnCount) { + List<RexNode> operands = oneCondition.getOperands(); + final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + + final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + final int rightIndex = rightIndex1 - leftRowColumnCount; + + return new Pair<>(leftIndex, rightIndex); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java new file mode 100644 index 0000000..ced38c3 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.*; + +public enum GearLogicalConvention implements Convention { + INSTANCE; + + @Override + public Class getInterface() { + return GearRelNode.class; + } + + @Override + public String getName() { + return "GEAR_LOGICAL"; + } + + @Override + public RelTraitDef getTraitDef() { + return ConventionTraitDef.INSTANCE; + } + + @Override + public boolean satisfies(RelTrait trait) { + return this == trait; + } + + @Override + public void register(RelOptPlanner planner) { + } + + @Override + public String toString() { + return getName(); + } + + @Override + public boolean canConvertConvention(Convention toConvention) { + return false; + } + + @Override + public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java new file mode 100644 index 0000000..1ca972a --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.SetOp; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearMinusRel extends Minus implements GearRelNode { + + private GearSetOperatorRelBase delegate; + + public GearMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.MINUS, inputs, all); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new GearMinusRel(getCluster(), traitSet, inputs, all); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java new file mode 100644 index 0000000..f09dc8c --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearProjectRel extends Project implements GearRelNode { + + public GearProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, + List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, + RelDataType rowType) { + return new GearProjectRel(getCluster(), traitSet, input, projects, rowType); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java new file mode 100644 index 0000000..042c767 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.rel.RelNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public interface GearRelNode extends RelNode { + + JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java new file mode 100644 index 0000000..ee59753 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.rel.RelNode; + +import java.io.Serializable; +import java.util.List; + +public class GearSetOperatorRelBase { + + public enum OpType implements Serializable { + UNION, + INTERSECT, + MINUS + } + + private GearRelNode gearRelNode; + private List<RelNode> inputs; + private boolean all; + private OpType opType; + + public GearSetOperatorRelBase(GearRelNode gearRelNode, OpType opType, + List<RelNode> inputs, boolean all) { + this.gearRelNode = gearRelNode; + this.opType = opType; + this.inputs = inputs; + this.all = all; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java new file mode 100644 index 0000000..f70481b --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationImpl; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +public class GearSortRel extends Sort implements GearRelNode { + + private List<Integer> fieldIndices = new ArrayList<>(); + private List<Boolean> orientation = new ArrayList<>(); + private List<Boolean> nullsFirst = new ArrayList<>(); + + private int startIndex = 0; + private int count; + + public GearSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, + RexNode offset, RexNode fetch) { + super(cluster, traits, child, collation, offset, fetch); + + List<RexNode> fieldExps = getChildExps(); + RelCollationImpl collationImpl = (RelCollationImpl) collation; + List<RelFieldCollation> collations = collationImpl.getFieldCollations(); + for (int i = 0; i < fieldExps.size(); i++) { + RexNode fieldExp = fieldExps.get(i); + RexInputRef inputRef = (RexInputRef) fieldExp; + fieldIndices.add(inputRef.getIndex()); + orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING); + + RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection; + if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) { + rawNullDirection = collations.get(i).getDirection().defaultNullDirection(); + } + nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST); + } + + if (fetch == null) { + throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!"); + } + + RexLiteral fetchLiteral = (RexLiteral) fetch; + count = ((BigDecimal) fetchLiteral.getValue()).intValue(); + + if (offset != null) { + RexLiteral offsetLiteral = (RexLiteral) offset; + startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue(); + } + } + + @Override + public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, + RexNode offset, RexNode fetch) { + return new GearSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch); + } + + public static <T extends Number & Comparable> int numberCompare(T a, T b) { + return a.compareTo(b); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java new file mode 100644 index 0000000..54a6bbb --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlExplainLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +class GearSqlRelUtils { + private static final Logger LOG = LoggerFactory.getLogger(GearSqlRelUtils.class); + + private static final AtomicInteger sequence = new AtomicInteger(0); + private static final AtomicInteger classSequence = new AtomicInteger(0); + + public static String getStageName(GearRelNode relNode) { + return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + + sequence.getAndIncrement(); + } + + public static String getClassName(GearRelNode relNode) { + return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + + "_" + classSequence.getAndIncrement(); + } + + public static GearRelNode getGearRelInput(RelNode input) { + if (input instanceof RelSubset) { + // go with known best input + input = ((RelSubset) input).getBest(); + } + return (GearRelNode) input; + } + + public static String explain(final RelNode rel) { + return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); + } + + public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { + String explain = ""; + try { + explain = RelOptUtil.toString(rel); + } catch (StackOverflowError e) { + LOG.error("StackOverflowError occurred while extracting plan. " + + "Please report it to the dev@ mailing list."); + LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); + LOG.error("Forcing plan to empty string and continue... " + + "SQL Runner may not working properly after."); + } + return explain; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java new file mode 100644 index 0000000..431368d --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Union; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearUnionRel extends Union implements GearRelNode { + + private GearSetOperatorRelBase delegate; + + public GearUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + this.delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.UNION, inputs, all); + } + + public GearUnionRel(RelInput input) { + super(input); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new GearUnionRel(getCluster(), traitSet, inputs, all); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java new file mode 100644 index 0000000..6bd9403 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public class GearValuesRel extends Values implements GearRelNode { + + public GearValuesRel(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, + RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java new file mode 100644 index 0000000..c1b1602 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.*; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.gearpump.sql.rel.GearAggregationRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.utils.GearConfiguration; +import org.apache.gearpump.streaming.dsl.window.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.GregorianCalendar; +import java.util.List; + +public class GearAggregationRule extends RelOptRule { + + private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRule.class); + public static final GearAggregationRule INSTANCE = + new GearAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER); + + public GearAggregationRule(Class<? extends Aggregate> aggregateClass, + Class<? extends Project> projectClass, + RelBuilderFactory relBuilderFactory) { + super(operand(aggregateClass, operand(projectClass, any())), relBuilderFactory, null); + } + + public GearAggregationRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Aggregate aggregate = call.rel(0); + final Project project = call.rel(1); + updateWindowTrigger(call, aggregate, project); + } + + private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, Project project) { + ImmutableBitSet groupByFields = aggregate.getGroupSet(); + List<RexNode> projectMapping = project.getProjects(); + + WindowFunction windowFn = new GlobalWindowFunction(); + Trigger triggerFn; + int windowFieldIdx = -1; + Duration allowedLatence = Duration.ZERO; + + for (int groupField : groupByFields.asList()) { + RexNode projNode = projectMapping.get(groupField); + if (projNode instanceof RexCall) { + SqlOperator op = ((RexCall) projNode).op; + ImmutableList<RexNode> parameters = ((RexCall) projNode).operands; + String functionName = op.getName(); + switch (functionName) { + case "TUMBLE": + windowFieldIdx = groupField; + windowFn = (WindowFunction) FixedWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis())); + } + break; + case "HOP": + windowFieldIdx = groupField; + windowFn = (WindowFunction) SlidingWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))), Duration.ofMillis(getWindowParameterAsMillis(parameters.get(2)))); + + if (parameters.size() == 4) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis())); + } + break; + case "SESSION": + windowFieldIdx = groupField; + windowFn = (WindowFunction) SessionWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1)))); + if (parameters.size() == 3) { + GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2)) + .getValue(); + triggerFn = createTriggerWithDelay(delayTime); + allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis())); + } + break; + default: + break; + } + } + } + + try { + GearAggregationRel gearRel = new GearAggregationRel(aggregate.getCluster(), + aggregate.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(aggregate.getInput(), + aggregate.getInput().getTraitSet().replace(GearLogicalConvention.INSTANCE)), + aggregate.indicator, + aggregate.getGroupSet(), + aggregate.getGroupSets(), + aggregate.getAggCallList()); + gearRel.buildGearPipeline(GearConfiguration.app, null); + GearConfiguration.app.submit().waitUntilFinish(); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + + } + + private Trigger createTriggerWithDelay(GregorianCalendar delayTime) { + return null; + } + + private long getWindowParameterAsMillis(RexNode parameterNode) { + if (parameterNode instanceof RexLiteral) { + return RexLiteral.intValue(parameterNode); + } else { + throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java new file mode 100644 index 0000000..4817ee8 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.gearpump.sql.rel.GearFilterRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearFilterRule extends ConverterRule { + + public static final GearFilterRule INSTANCE = new GearFilterRule(); + + private GearFilterRule() { + super(LogicalFilter.class, Convention.NONE, GearLogicalConvention.INSTANCE, "GearFilterRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Filter filter = (Filter) rel; + final RelNode input = filter.getInput(); + + GearFilterRel gearRel = new GearFilterRel(filter.getCluster(), + filter.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)), + filter.getCondition()); + return gearRel; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java new file mode 100644 index 0000000..e81f948 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearFlatMapRel; +import org.apache.gearpump.sql.utils.GearConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GearFlatMapRule extends ConverterRule { + + private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRule.class); + public static final GearFlatMapRule INSTANCE = new GearFlatMapRule(Aggregate.class, Convention.NONE); + + public GearFlatMapRule(Class<? extends Aggregate> aggregateClass, RelTrait projectIn) { + super(aggregateClass, projectIn, GearLogicalConvention.INSTANCE, "GearFlatMapRule"); + } + + @Override + public RelNode convert(RelNode rel) { + try { + GearFlatMapRel flatRel = new GearFlatMapRel(); + flatRel.buildGearPipeline(GearConfiguration.app, null); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + return null; + } + +}
