[GEARPUMP-217] Add Gearpump rel, rule and examples. Author: Buddhi Ayesha <[email protected]> Author: Buddhi Rathnayaka <[email protected]>
Closes #217 from buddhiayesha2015/upstream_sql. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/54686e0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/54686e0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/54686e0e Branch: refs/heads/sql Commit: 54686e0e28efcca4901fe06589d2736daad2e606 Parents: e04df0d Author: Buddhi Ayesha <[email protected]> Authored: Sat Aug 19 08:24:49 2017 +0800 Committer: manuzhang <[email protected]> Committed: Sat Aug 19 08:25:49 2017 +0800 ---------------------------------------------------------------------- .../examples/wordcountjava/WordCountSpec.scala | 13 +- experiments/sql/README.md | 10 +- .../main/java/org/apache/calcite/SQLNode.java | 112 ------ .../org/apache/calcite/planner/Connection.java | 294 -------------- .../org/apache/calcite/planner/LogicalPlan.java | 43 -- .../java/org/apache/calcite/planner/Query.java | 79 ---- .../calcite/planner/StreamQueryPlanner.java | 91 ----- .../calcite/table/TransactionsTableFactory.java | 88 ---- .../utils/CalciteFrameworkConfiguration.java | 58 --- .../calcite/validator/CalciteSqlValidator.java | 50 --- .../java/org/apache/gearpump/sql/SQLNode.java | 127 ++++++ .../apache/gearpump/sql/planner/Connection.java | 292 ++++++++++++++ .../sql/planner/GearRelDataTypeSystem.java | 41 ++ .../gearpump/sql/planner/GearRuleSets.java | 60 +++ .../gearpump/sql/planner/LogicalPlan.java | 43 ++ .../org/apache/gearpump/sql/planner/Query.java | 80 ++++ .../sql/planner/StreamQueryPlanner.java | 96 +++++ .../gearpump/sql/rel/GearAggregationRel.java | 120 ++++++ .../apache/gearpump/sql/rel/GearFilterRel.java | 47 +++ .../apache/gearpump/sql/rel/GearFlatMapRel.java | 112 ++++++ .../apache/gearpump/sql/rel/GearIOSinkRel.java | 52 +++ .../gearpump/sql/rel/GearIOSourceRel.java | 39 ++ .../gearpump/sql/rel/GearIntersectRel.java | 54 +++ .../apache/gearpump/sql/rel/GearJoinRel.java | 94 +++++ .../gearpump/sql/rel/GearLogicalConvention.java | 65 +++ .../apache/gearpump/sql/rel/GearMinusRel.java | 51 +++ .../apache/gearpump/sql/rel/GearProjectRel.java | 50 +++ .../apache/gearpump/sql/rel/GearRelNode.java | 30 ++ .../sql/rel/GearSetOperatorRelBase.java | 47 +++ .../apache/gearpump/sql/rel/GearSortRel.java | 95 +++++ .../gearpump/sql/rel/GearSqlRelUtils.java | 71 ++++ .../apache/gearpump/sql/rel/GearUnionRel.java | 55 +++ .../apache/gearpump/sql/rel/GearValuesRel.java | 42 ++ .../gearpump/sql/rule/GearAggregationRule.java | 147 +++++++ .../gearpump/sql/rule/GearFilterRule.java | 48 +++ .../gearpump/sql/rule/GearFlatMapRule.java | 52 +++ .../gearpump/sql/rule/GearIOSinkRule.java | 79 ++++ .../gearpump/sql/rule/GearIOSourceRule.java | 46 +++ .../gearpump/sql/rule/GearIntersectRule.java | 51 +++ .../apache/gearpump/sql/rule/GearJoinRule.java | 53 +++ .../apache/gearpump/sql/rule/GearMinusRule.java | 51 +++ .../gearpump/sql/rule/GearProjectRule.java | 48 +++ .../apache/gearpump/sql/rule/GearSortRule.java | 51 +++ .../apache/gearpump/sql/rule/GearUnionRule.java | 49 +++ .../gearpump/sql/rule/GearValuesRule.java | 48 +++ .../apache/gearpump/sql/table/SampleString.java | 45 +++ .../gearpump/sql/table/SampleTransactions.java | 60 +++ .../sql/table/TransactionsTableFactory.java | 88 ++++ .../utils/CalciteFrameworkConfiguration.java | 58 +++ .../gearpump/sql/utils/GearConfiguration.java | 49 +++ .../sql/validator/CalciteSqlValidator.java | 50 +++ .../gearpump/experiments/sql/Connection.scala | 3 - .../org/apache/calcite/planner/CalciteTest.java | 323 --------------- .../org/apache/calcite/planner/QueryTest.java | 64 --- .../gearpump/sql/example/SqlWordCountTest.java | 125 ++++++ .../gearpump/sql/planner/CalciteTest.java | 397 +++++++++++++++++++ .../apache/gearpump/sql/planner/QueryTest.java | 65 +++ experiments/sql/src/test/resources/model.json | 2 +- 58 files changed, 3438 insertions(+), 1215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala index 3736c86..7df8651 100644 --- a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala +++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala @@ -18,16 +18,15 @@ package org.apache.gearpump.streaming.examples.wordcountjava -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult import org.apache.gearpump.cluster.{MasterHarness, TestUtil} -import org.apache.gearpump.streaming.examples.wordcountjava.WordCount +import org.apache.gearpump.streaming.examples.wordcountjava.dsl.WordCount +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import scala.concurrent.Future +import scala.util.Success class WordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/README.md ---------------------------------------------------------------------- diff --git a/experiments/sql/README.md b/experiments/sql/README.md index 6e9d490..9880dff 100644 --- a/experiments/sql/README.md +++ b/experiments/sql/README.md @@ -1,2 +1,8 @@ -SQL Support -=========== \ No newline at end of file +# SQL Support +This project is about building a SQL layer with Apache Calcite to help those who are unfamiliar with Scala/Java to use Gearpump. + +## Build +- Build [GearPump SQL](https://github.com/buddhiayesha2015/incubator-gearpump/tree/sql/experiments/sql) + +## Test +- Run [SQL WordCount example](https://github.com/buddhiayesha2015/incubator-gearpump/blob/sql/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java b/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java deleted file mode 100644 index c092bf0..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java +++ /dev/null @@ -1,112 +0,0 @@ -package org.apache.calcite; - -import org.apache.calcite.avatica.util.Casing; -import org.apache.calcite.avatica.util.Quoting; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.fun.SqlCase; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.parser.impl.SqlParserImpl; -import org.apache.calcite.sql.validate.SqlConformanceEnum; -import org.apache.calcite.sql.validate.SqlValidator; -import org.apache.calcite.sql.validate.SqlValidatorScope; -import org.apache.calcite.util.Util; -import org.junit.ComparisonFailure; - -import java.util.regex.Pattern; - -/** - * Created by Buddhi on 6/8/2017. - */ -public class SQLNode { - - private static final Pattern LINE_BREAK_PATTERN = Pattern.compile("\r\n|\r|\n"); - - private static final Pattern TAB_PATTERN = Pattern.compile("\t"); - - private static final String LINE_BREAK = "\\\\n\"" + Util.LINE_SEPARATOR + " + \""; - - private static final ThreadLocal<boolean[]> LINUXIFY = new ThreadLocal<boolean[]>() { - @Override - protected boolean[] initialValue() { - return new boolean[]{true}; - } - }; - - - protected SqlParser getSqlParser(String sql) { - return SqlParser.create(sql, - SqlParser.configBuilder() - .setParserFactory(SqlParserImpl.FACTORY) - .setQuoting(Quoting.DOUBLE_QUOTE) - .setUnquotedCasing(Casing.TO_UPPER) - .setQuotedCasing(Casing.UNCHANGED) - .setConformance(SqlConformanceEnum.DEFAULT) - .build()); - } - - public static String toJavaString(String s) { - s = Util.replace(s, "\"", "\\\""); - s = LINE_BREAK_PATTERN.matcher(s).replaceAll(LINE_BREAK); - s = TAB_PATTERN.matcher(s).replaceAll("\\\\t"); - s = "\"" + s + "\""; - String spurious = "\n \\+ \"\""; - if (s.endsWith(spurious)) { - s = s.substring(0, s.length() - spurious.length()); - } - return s; - } - - public static void assertEqualsVerbose(String expected, String actual) { - if (actual == null) { - if (expected == null) { - return; - } else { - String message = "Expected:\n" + expected + "\nActual: null"; - throw new ComparisonFailure(message, expected, null); - } - } - if ((expected != null) && expected.equals(actual)) { - return; - } - String s = toJavaString(actual); - String message = "Expected:\n" + expected + "\nActual:\n" + - actual + "\nActual java:\n" + s + '\n'; - - throw new ComparisonFailure(message, expected, actual); - } - - public void check(String sql, String expected) { - final SqlNode sqlNode; - try { - sqlNode = getSqlParser(sql).parseStmt(); - } catch (SqlParseException e) { - throw new RuntimeException("Error while parsing SQL: " + sql, e); - } - - String actual = sqlNode.toSqlString(null, true).getSql(); - if (LINUXIFY.get()[0]) { - actual = Util.toLinux(actual); - } - assertEqualsVerbose(expected, actual); - } - - public void validateCall(SqlCall call, SqlValidator validator, SqlValidatorScope operandScope) { - SqlCase sqlCase = (SqlCase) call; - SqlNodeList whenOperands = sqlCase.getWhenOperands(); - SqlNodeList thenOperands = sqlCase.getThenOperands(); - SqlNode elseOperand = sqlCase.getElseOperand(); - for (SqlNode operand : whenOperands) { - operand.validateExpr(validator, operandScope); - } - for (SqlNode operand : thenOperands) { - operand.validateExpr(validator, operandScope); - } - if (elseOperand != null) { - elseOperand.validateExpr(validator, operandScope); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java b/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java deleted file mode 100644 index 1b8648d..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.planner; - -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.linq4j.Queryable; -import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.tools.Frameworks; -import org.apache.log4j.Logger; - -import java.lang.reflect.Type; -import java.sql.*; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Executor; - -public class Connection implements CalciteConnection { - - private final static Logger logger = Logger.getLogger(Connection.class); - private final SchemaPlus rootSchema = Frameworks.createRootSchema(true); - private String schema = null; - - public SchemaPlus getRootSchema() { - return rootSchema; - } - - public JavaTypeFactory getTypeFactory() { - return null; - } - - public Properties getProperties() { - return null; - } - - public Statement createStatement() throws SQLException { - return null; - } - - public PreparedStatement prepareStatement(String sql) throws SQLException { - return null; - } - - public CallableStatement prepareCall(String sql) throws SQLException { - return null; - } - - public String nativeSQL(String sql) throws SQLException { - return null; - } - - public void setAutoCommit(boolean autoCommit) throws SQLException { - - } - - public boolean getAutoCommit() throws SQLException { - return false; - } - - public void commit() throws SQLException { - - } - - public void rollback() throws SQLException { - - } - - public void close() throws SQLException { - - } - - public boolean isClosed() throws SQLException { - return false; - } - - public DatabaseMetaData getMetaData() throws SQLException { - return null; - } - - public void setReadOnly(boolean readOnly) throws SQLException { - - } - - public boolean isReadOnly() throws SQLException { - return false; - } - - public void setCatalog(String catalog) throws SQLException { - - } - - public String getCatalog() throws SQLException { - return null; - } - - public void setTransactionIsolation(int level) throws SQLException { - - } - - public int getTransactionIsolation() throws SQLException { - return 0; - } - - public SQLWarning getWarnings() throws SQLException { - return null; - } - - public void clearWarnings() throws SQLException { - - } - - public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { - return null; - } - - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return null; - } - - public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return null; - } - - public Map<String, Class<?>> getTypeMap() throws SQLException { - return null; - } - - public void setTypeMap(Map<String, Class<?>> map) throws SQLException { - - } - - public void setHoldability(int holdability) throws SQLException { - - } - - public int getHoldability() throws SQLException { - return 0; - } - - public Savepoint setSavepoint() throws SQLException { - return null; - } - - public Savepoint setSavepoint(String name) throws SQLException { - return null; - } - - public void rollback(Savepoint savepoint) throws SQLException { - - } - - public void releaseSavepoint(Savepoint savepoint) throws SQLException { - - } - - public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - return null; - } - - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - return null; - } - - public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - return null; - } - - public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { - return null; - } - - public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { - return null; - } - - public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { - return null; - } - - public Clob createClob() throws SQLException { - return null; - } - - public Blob createBlob() throws SQLException { - return null; - } - - public NClob createNClob() throws SQLException { - return null; - } - - public SQLXML createSQLXML() throws SQLException { - return null; - } - - public boolean isValid(int timeout) throws SQLException { - return false; - } - - public void setClientInfo(String name, String value) throws SQLClientInfoException { - - } - - public void setClientInfo(Properties properties) throws SQLClientInfoException { - - } - - public String getClientInfo(String name) throws SQLException { - return null; - } - - public Properties getClientInfo() throws SQLException { - return null; - } - - public Array createArrayOf(String typeName, Object[] elements) throws SQLException { - return null; - } - - public Struct createStruct(String typeName, Object[] attributes) throws SQLException { - return null; - } - - public void setSchema(String s) throws SQLException { - schema = s; - } - - public String getSchema() throws SQLException { - return schema; - } - - public void abort(Executor executor) throws SQLException { - - } - - public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { - - } - - public int getNetworkTimeout() throws SQLException { - return 0; - } - - public CalciteConnectionConfig config() { - return null; - } - - public <T> T unwrap(Class<T> iface) throws SQLException { - return null; - } - - public boolean isWrapperFor(Class<?> iface) throws SQLException { - return false; - } - - public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) { - return null; - } - - public <T> Queryable<T> createQuery(Expression expression, Type type) { - return null; - } - - public <T> T execute(Expression expression, Class<T> aClass) { - return null; - } - - public <T> T execute(Expression expression, Type type) { - return null; - } - - public <T> Enumerator<T> executeQuery(Queryable<T> queryable) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java deleted file mode 100644 index 516ebe1..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.planner; - -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.Planner; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; - -public class LogicalPlan { - - public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException, - RelConversionException { - SqlNode sqlNode; - - try { - sqlNode = planner.parse(query); - } catch (SqlParseException e) { - throw new RuntimeException("SQL query parsing error", e); - } - SqlNode validatedSqlNode = planner.validate(sqlNode); - - return planner.rel(validatedSqlNode).project(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java b/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java deleted file mode 100644 index 9008a16..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.planner; - -import org.apache.calcite.config.Lex; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.tools.*; -import org.apache.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; - -/** - * This Class is intended to test functions of Apache Calcite - */ -public class Query { - - private final static Logger logger = Logger.getLogger(Query.class); - private final Planner queryPlanner; - - public Query(SchemaPlus schema) { - - final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); - - traitDefs.add(ConventionTraitDef.INSTANCE); - traitDefs.add(RelCollationTraitDef.INSTANCE); - - FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder() - .parserConfig(SqlParser.configBuilder() - .setLex(Lex.MYSQL) - .build()) - .defaultSchema(schema) - .traitDefs(traitDefs) - .context(Contexts.EMPTY_CONTEXT) - .ruleSets(RuleSets.ofList()) - .costFactory(null) - .typeSystem(RelDataTypeSystem.DEFAULT) - .build(); - this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig); - } - - public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException { - SqlNode sqlNode = null; - try { - sqlNode = queryPlanner.parse(query); - } catch (SqlParseException e) { - logger.error("SQL Parse Exception", e); - } - - SqlNode validatedSqlNode = queryPlanner.validate(sqlNode); - return queryPlanner.rel(validatedSqlNode).project(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java deleted file mode 100644 index 0603827..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.planner; - -import org.apache.calcite.adapter.java.ReflectiveSchema; -import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.tools.*; -import org.apache.calcite.utils.CalciteFrameworkConfiguration; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -public class StreamQueryPlanner { - - public static void main(String[] args) throws ClassNotFoundException, SQLException, ValidationException, RelConversionException { - - Class.forName("org.apache.calcite.jdbc.Driver"); - Connection connection = DriverManager.getConnection("jdbc:calcite:"); - CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); - SchemaPlus rootSchema = calciteConnection.getRootSchema(); - rootSchema.add("t", new ReflectiveSchema(new Transactions())); - - FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema); - Planner planner = Frameworks.getPlanner(frameworkConfig); - - String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products " - + "where t.orders.id = t.products.id group by t.orders.id, name " - + "having sum(quantity) > 5 order by sum(quantity) "; - - RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner); - System.out.println(RelOptUtil.toString(logicalPlan)); - } - - public static class Transactions { - - public final Order[] orders = { - new Order("001", 3), - new Order("002", 5), - new Order("003", 8), - new Order("004", 15), - }; - - public final Product[] products = { - new Product("001", "Book"), - new Product("002", "Pen"), - new Product("003", "Pencil"), - new Product("004", "Ruler"), - }; - } - - public static class Order { - public final String id; - public final int quantity; - - public Order(String id, int quantity) { - this.id = id; - this.quantity = quantity; - } - } - - public static class Product { - public final String id; - public final String name; - - public Product(String id, String name) { - this.id = id; - this.name = name; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java deleted file mode 100644 index 42c2f92..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.table; - -import com.google.common.collect.ImmutableList; -import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Linq4j; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.schema.*; -import org.apache.calcite.sql.type.SqlTypeName; - -import java.util.Map; - -public class TransactionsTableFactory implements TableFactory<Table> { - - @Override - public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { - final Object[][] rows = { - {100, "I001", "item1", 3}, - {101, "I002", "item2", 5}, - {102, "I003", "item3", 8}, - {103, "I004", "item4", 33}, - {104, "I005", "item5", 23} - }; - - return new TransactionsTable(ImmutableList.copyOf(rows)); - } - - public static class TransactionsTable implements ScannableTable { - - protected final RelProtoDataType protoRowType = new RelProtoDataType() { - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder() - .add("timeStamp", SqlTypeName.TIMESTAMP) - .add("id", SqlTypeName.VARCHAR, 10) - .add("item", SqlTypeName.VARCHAR, 50) - .add("quantity", SqlTypeName.INTEGER) - .build(); - } - }; - - private final ImmutableList<Object[]> rows; - - public TransactionsTable(ImmutableList<Object[]> rows) { - this.rows = rows; - } - - public Enumerable<Object[]> scan(DataContext root) { - return Linq4j.asEnumerable(rows); - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return protoRowType.apply(typeFactory); - } - - @Override - public Statistic getStatistic() { - return Statistics.UNKNOWN; - } - - @Override - public Schema.TableType getJdbcTableType() { - return Schema.TableType.TABLE; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java deleted file mode 100644 index e6be58a..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.utils; - -import org.apache.calcite.config.Lex; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.RuleSets; - -import java.util.ArrayList; -import java.util.List; - -public class CalciteFrameworkConfiguration { - - public static FrameworkConfig getDefaultconfig(SchemaPlus schema) { - final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); - - traitDefs.add(ConventionTraitDef.INSTANCE); - traitDefs.add(RelCollationTraitDef.INSTANCE); - - FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder() - .parserConfig(SqlParser.configBuilder() - .setLex(Lex.JAVA) - .build()) - .defaultSchema(schema) - .traitDefs(traitDefs) - .context(Contexts.EMPTY_CONTEXT) - .ruleSets(RuleSets.ofList()) - .costFactory(null) - .typeSystem(RelDataTypeSystem.DEFAULT) - .build(); - - return frameworkConfiguration; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java deleted file mode 100644 index fb132e2..0000000 --- a/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.calcite.validator; - -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.SqlInsert; -import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.validate.SqlConformance; -import org.apache.calcite.sql.validate.SqlValidatorImpl; - -public class CalciteSqlValidator extends SqlValidatorImpl { - public CalciteSqlValidator(SqlOperatorTable opTab, - CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, - SqlConformance conformance) { - super(opTab, catalogReader, typeFactory, conformance); - } - - @Override - protected RelDataType getLogicalSourceRowType( - RelDataType sourceRowType, SqlInsert insert) { - final RelDataType superType = - super.getLogicalSourceRowType(sourceRowType, insert); - return ((JavaTypeFactory) typeFactory).toSql(superType); - } - - @Override - protected RelDataType getLogicalTargetRowType( - RelDataType targetRowType, SqlInsert insert) { - final RelDataType superType = - super.getLogicalTargetRowType(targetRowType, insert); - return ((JavaTypeFactory) typeFactory).toSql(superType); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java new file mode 100644 index 0000000..7fcc3a1 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql; + +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.fun.SqlCase; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.impl.SqlParserImpl; +import org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.apache.calcite.util.Util; +import org.junit.ComparisonFailure; + +import java.util.regex.Pattern; + +public class SQLNode { + + private static final Pattern LINE_BREAK_PATTERN = Pattern.compile("\r\n|\r|\n"); + + private static final Pattern TAB_PATTERN = Pattern.compile("\t"); + + private static final String LINE_BREAK = "\\\\n\"" + Util.LINE_SEPARATOR + " + \""; + + private static final ThreadLocal<boolean[]> LINUXIFY = new ThreadLocal<boolean[]>() { + @Override + protected boolean[] initialValue() { + return new boolean[]{true}; + } + }; + + + protected SqlParser getSqlParser(String sql) { + return SqlParser.create(sql, + SqlParser.configBuilder() + .setParserFactory(SqlParserImpl.FACTORY) + .setQuoting(Quoting.DOUBLE_QUOTE) + .setUnquotedCasing(Casing.TO_UPPER) + .setQuotedCasing(Casing.UNCHANGED) + .setConformance(SqlConformanceEnum.DEFAULT) + .build()); + } + + public static String toJavaString(String s) { + s = Util.replace(s, "\"", "\\\""); + s = LINE_BREAK_PATTERN.matcher(s).replaceAll(LINE_BREAK); + s = TAB_PATTERN.matcher(s).replaceAll("\\\\t"); + s = "\"" + s + "\""; + String spurious = "\n \\+ \"\""; + if (s.endsWith(spurious)) { + s = s.substring(0, s.length() - spurious.length()); + } + return s; + } + + public static void assertEqualsVerbose(String expected, String actual) { + if (actual == null) { + if (expected == null) { + return; + } else { + String message = "Expected:\n" + expected + "\nActual: null"; + throw new ComparisonFailure(message, expected, null); + } + } + if ((expected != null) && expected.equals(actual)) { + return; + } + String s = toJavaString(actual); + String message = "Expected:\n" + expected + "\nActual:\n" + + actual + "\nActual java:\n" + s + '\n'; + + throw new ComparisonFailure(message, expected, actual); + } + + public void check(String sql, String expected) { + final SqlNode sqlNode; + try { + sqlNode = getSqlParser(sql).parseStmt(); + } catch (SqlParseException e) { + throw new RuntimeException("Error while parsing SQL: " + sql, e); + } + + String actual = sqlNode.toSqlString(null, true).getSql(); + if (LINUXIFY.get()[0]) { + actual = Util.toLinux(actual); + } + assertEqualsVerbose(expected, actual); + } + + public void validateCall(SqlCall call, SqlValidator validator, SqlValidatorScope operandScope) { + SqlCase sqlCase = (SqlCase) call; + SqlNodeList whenOperands = sqlCase.getWhenOperands(); + SqlNodeList thenOperands = sqlCase.getThenOperands(); + SqlNode elseOperand = sqlCase.getElseOperand(); + for (SqlNode operand : whenOperands) { + operand.validateExpr(validator, operandScope); + } + for (SqlNode operand : thenOperands) { + operand.validateExpr(validator, operandScope); + } + if (elseOperand != null) { + elseOperand.validateExpr(validator, operandScope); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java new file mode 100644 index 0000000..e5954f0 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.Frameworks; + +import java.lang.reflect.Type; +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +public class Connection implements CalciteConnection { + + private final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + private String schema = null; + + public SchemaPlus getRootSchema() { + return rootSchema; + } + + public JavaTypeFactory getTypeFactory() { + return null; + } + + public Properties getProperties() { + return null; + } + + public Statement createStatement() throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql) throws SQLException { + return null; + } + + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + public String nativeSQL(String sql) throws SQLException { + return null; + } + + public void setAutoCommit(boolean autoCommit) throws SQLException { + + } + + public boolean getAutoCommit() throws SQLException { + return false; + } + + public void commit() throws SQLException { + + } + + public void rollback() throws SQLException { + + } + + public void close() throws SQLException { + + } + + public boolean isClosed() throws SQLException { + return false; + } + + public DatabaseMetaData getMetaData() throws SQLException { + return null; + } + + public void setReadOnly(boolean readOnly) throws SQLException { + + } + + public boolean isReadOnly() throws SQLException { + return false; + } + + public void setCatalog(String catalog) throws SQLException { + + } + + public String getCatalog() throws SQLException { + return null; + } + + public void setTransactionIsolation(int level) throws SQLException { + + } + + public int getTransactionIsolation() throws SQLException { + return 0; + } + + public SQLWarning getWarnings() throws SQLException { + return null; + } + + public void clearWarnings() throws SQLException { + + } + + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + public Map<String, Class<?>> getTypeMap() throws SQLException { + return null; + } + + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + + } + + public void setHoldability(int holdability) throws SQLException { + + } + + public int getHoldability() throws SQLException { + return 0; + } + + public Savepoint setSavepoint() throws SQLException { + return null; + } + + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + public void rollback(Savepoint savepoint) throws SQLException { + + } + + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + + } + + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return null; + } + + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return null; + } + + public Clob createClob() throws SQLException { + return null; + } + + public Blob createBlob() throws SQLException { + return null; + } + + public NClob createNClob() throws SQLException { + return null; + } + + public SQLXML createSQLXML() throws SQLException { + return null; + } + + public boolean isValid(int timeout) throws SQLException { + return false; + } + + public void setClientInfo(String name, String value) throws SQLClientInfoException { + + } + + public void setClientInfo(Properties properties) throws SQLClientInfoException { + + } + + public String getClientInfo(String name) throws SQLException { + return null; + } + + public Properties getClientInfo() throws SQLException { + return null; + } + + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + public void setSchema(String s) throws SQLException { + schema = s; + } + + public String getSchema() throws SQLException { + return schema; + } + + public void abort(Executor executor) throws SQLException { + + } + + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + + } + + public int getNetworkTimeout() throws SQLException { + return 0; + } + + public CalciteConnectionConfig config() { + return null; + } + + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } + + public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) { + return null; + } + + public <T> Queryable<T> createQuery(Expression expression, Type type) { + return null; + } + + public <T> T execute(Expression expression, Class<T> aClass) { + return null; + } + + public <T> T execute(Expression expression, Type type) { + return null; + } + + public <T> Enumerator<T> executeQuery(Queryable<T> queryable) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java new file mode 100644 index 0000000..a640e12 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; + +/** + * customized data types. + */ +public class GearRelDataTypeSystem extends RelDataTypeSystemImpl { + + public static final RelDataTypeSystem GEAR_REL_DATATYPE_SYSTEM = new GearRelDataTypeSystem(); + + @Override + public int getMaxNumericScale() { + return 38; + } + + @Override + public int getMaxNumericPrecision() { + return 38; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java new file mode 100644 index 0000000..a962ff1 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.tools.RuleSet; +import org.apache.gearpump.sql.rule.*; + +import java.util.Iterator; + +public class GearRuleSets { + private static final ImmutableSet<RelOptRule> calciteToGearConversionRules = ImmutableSet + .<RelOptRule>builder().add(GearIOSourceRule.INSTANCE, GearProjectRule.INSTANCE, + GearFilterRule.INSTANCE, GearIOSinkRule.INSTANCE, + GearAggregationRule.INSTANCE, GearSortRule.INSTANCE, GearValuesRule.INSTANCE, + GearIntersectRule.INSTANCE, GearMinusRule.INSTANCE, GearUnionRule.INSTANCE, + GearJoinRule.INSTANCE) + .build(); + + public static RuleSet[] getRuleSets() { + return new RuleSet[]{new GearRuleSet( + ImmutableSet.<RelOptRule>builder().addAll(calciteToGearConversionRules).build())}; + } + + private static class GearRuleSet implements RuleSet { + final ImmutableSet<RelOptRule> rules; + + public GearRuleSet(ImmutableSet<RelOptRule> rules) { + this.rules = rules; + } + + public GearRuleSet(ImmutableList<RelOptRule> rules) { + this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); + } + + @Override + public Iterator<RelOptRule> iterator() { + return rules.iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java new file mode 100644 index 0000000..1448a71 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; + +public class LogicalPlan { + + public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException, + RelConversionException { + SqlNode sqlNode; + + try { + sqlNode = planner.parse(query); + } catch (SqlParseException e) { + throw new RuntimeException("SQL query parsing error", e); + } + SqlNode validatedSqlNode = planner.validate(sqlNode); + + return planner.rel(validatedSqlNode).project(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java new file mode 100644 index 0000000..c18b8b5 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * This Class is intended to test functions of Apache Calcite + */ +public class Query { + + private static final Logger LOG = LoggerFactory.getLogger(Query.class); + private final Planner queryPlanner; + + public Query(SchemaPlus schema) { + + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + .setLex(Lex.MYSQL) + .build()) + .defaultSchema(schema) + .traitDefs(traitDefs) + .context(Contexts.EMPTY_CONTEXT) + .ruleSets(RuleSets.ofList()) + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig); + } + + public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException { + SqlNode sqlNode = null; + try { + sqlNode = queryPlanner.parse(query); + } catch (SqlParseException e) { + LOG.error(e.getMessage()); + } + + SqlNode validatedSqlNode = queryPlanner.validate(sqlNode); + return queryPlanner.rel(validatedSqlNode).project(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java new file mode 100644 index 0000000..10f9cbd --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import org.apache.calcite.adapter.java.ReflectiveSchema; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.*; +import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class StreamQueryPlanner { + private static final Logger LOG = LoggerFactory.getLogger(StreamQueryPlanner.class); + + public static void main(String[] args) throws ClassNotFoundException, + SQLException, ValidationException, RelConversionException { + + Class.forName("org.apache.calcite.jdbc.Driver"); + Connection connection = DriverManager.getConnection("jdbc:calcite:"); + CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + SchemaPlus rootSchema = calciteConnection.getRootSchema(); + rootSchema.add("t", new ReflectiveSchema(new Transactions())); + + FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema); + Planner planner = Frameworks.getPlanner(frameworkConfig); + + String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products " + + "where t.orders.id = t.products.id group by t.orders.id, name " + + "having sum(quantity) > 5 order by sum(quantity) "; + + RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner); + LOG.info("Relational Expression:- \n\n" + RelOptUtil.toString(logicalPlan)); + + } + + public static class Transactions { + + public final Order[] orders = { + new Order("001", 3), + new Order("002", 5), + new Order("003", 8), + new Order("004", 15), + }; + + public final Product[] products = { + new Product("001", "Book"), + new Product("002", "Pen"), + new Product("003", "Pencil"), + new Product("004", "Ruler"), + }; + } + + public static class Order { + public final String id; + public final int quantity; + + public Order(String id, int quantity) { + this.id = id; + this.quantity = quantity; + } + } + + public static class Product { + public final String id; + public final String name; + + public Product(String id, String name) { + this.id = id; + this.name = name; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java new file mode 100644 index 0000000..46dc15b --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.apache.gearpump.sql.table.SampleString; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; +import org.apache.gearpump.streaming.dsl.window.api.Trigger; +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.time.Duration; +import java.util.List; + +public class GearAggregationRel extends Aggregate implements GearRelNode { + + private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRel.class); + private int windowFieldIdx = -1; + private WindowFunction windowFn; + private Trigger trigger; + private Duration allowedLatence = Duration.ZERO; + + public GearAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + } + + @Override + public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, + List<AggregateCall> aggCalls) { + return null; + } + + public RelWriter explainTerms(RelWriter pw) { + pw.item("group", groupSet) + .itemIf("window", windowFn, windowFn != null) + .itemIf("trigger", trigger, trigger != null) + .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1) + .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE) + .itemIf("indicator", indicator, indicator) + .itemIf("aggs", aggCalls, pw.nest()); + if (!pw.nest()) { + for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) { + pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e); + } + } + return pw; + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, + JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + LOG.debug("Adding Map"); + JavaStream<Tuple2<String, Integer>> ones = SampleString.WORDS.map(new Ones(), "map"); + + LOG.debug("Adding GroupBy"); + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), + 1, "groupBy"); +// groupedOnes.log(); + LOG.debug("Adding Reduce"); + JavaStream<Tuple2<String, Integer>> wordCount = groupedOnes.reduce(new Count(), "reduce"); + wordCount.log(); + + return wordCount; + } + + private static class Ones extends MapFunction<String, Tuple2<String, Integer>> { + @Override + public Tuple2<String, Integer> map(String s) { + return new Tuple2<>(s, 1); + } + } + + private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> { + @Override + public String groupBy(Tuple2<String, Integer> tuple) { + return tuple._1(); + } + } + + private static class Count extends ReduceFunction<Tuple2<String, Integer>> { + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { + return new Tuple2<>(t1._1(), t1._2() + t2._2()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java new file mode 100644 index 0000000..53a07d9 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public class GearFilterRel extends Filter implements GearRelNode { + + public GearFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new GearFilterRel(getCluster(), traitSet, input, condition); + } + + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java new file mode 100644 index 0000000..d4c55fb --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.DefaultMessage; +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.sql.table.SampleString; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.source.Watermark; +import org.apache.gearpump.streaming.task.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Iterator; + +public class GearFlatMapRel extends Filter implements GearRelNode { + + private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRel.class); + + public GearFlatMapRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { + super(cluster, traits, child, condition); + } + + public GearFlatMapRel() { + super(null, null, null, null); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new GearFlatMapRel(getCluster(), traitSet, input, condition); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, + JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + LOG.debug("Adding Source"); + JavaStream<String> sentence = app.source(new StringSource(SampleString.Stream.getKV()), + 1, UserConfig.empty(), "source"); + LOG.debug("Adding flatMap"); + SampleString.WORDS = sentence.flatMap(new Split(), "flatMap"); + return null; + } + + private static class StringSource implements DataSource { + private final String str; + private boolean hasNext = true; + + StringSource(String str) { + this.str = str; + } + + @Override + public void open(TaskContext context, Instant startTime) { + } + + @Override + public Message read() { + Message msg = new DefaultMessage(str, Instant.now()); + hasNext = false; + return msg; + } + + @Override + public void close() { + } + + @Override + public Instant getWatermark() { + if (hasNext) { + return Instant.now(); + } else { + return Watermark.MAX(); + } + } + } + + private static class Split extends FlatMapFunction<String, String> { + @Override + public Iterator<String> flatMap(String s) { + return Arrays.asList(s.split("\\s+")).iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java new file mode 100644 index 0000000..1d61baf --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearIOSinkRel extends TableModify implements GearRelNode { + public GearIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, + Prepare.CatalogReader catalogReader, RelNode child, Operation operation, + List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, + sourceExpressionList, flattened); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new GearIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), + getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java new file mode 100644 index 0000000..6641c35 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.TableScan; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +public class GearIOSourceRel extends TableScan implements GearRelNode { + + public GearIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java new file mode 100644 index 0000000..6888a26 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rel; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import scala.Tuple2; + +import java.util.List; + +public class GearIntersectRel extends Intersect implements GearRelNode { + private GearSetOperatorRelBase delegate; + + public GearIntersectRel( + RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new GearSetOperatorRelBase(this, + GearSetOperatorRelBase.OpType.INTERSECT, inputs, all); + } + + @Override + public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new GearIntersectRel(getCluster(), traitSet, inputs, all); + } + + @Override + public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception { + return null; + } +}
