[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2443 ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202684941 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Nice finding. I'll fix it and also attach the draw.io source file, though I'm not sure where to add it. I'll draw the second diagram as well. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202683106 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- The second diagram on the page also mentions Trident. I don't know whether it makes sense to update it or just remove it. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202682665 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Thanks, looks great. I think the dotted line box is missing the "StormSQL" label. Also maybe we should put the draw.io source file in the docs directory too, so we don't have to remake the diagram again if we need to change it later. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202678116 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Just finished it. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202674112 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Yeah right. I didn't think about that. Drawing a new one. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202653817 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- It would probably be pretty quick to duplicate with e.g. https://www.draw.io/, since it's just some boxes. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202650814 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. --- End diff -- Okay. It's fine to keep it, just wanted to make sure we didn't want to e.g. 
move it to /test ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202650588 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -47,54 +45,60 @@ * The properties are in JSON format which specifies the name of the MongoDB collection and etc. */ public class MongoDataSourcesProvider implements DataSourcesProvider { +public static final String SCHEME_NAME = "mongodb"; +public static final String VALUE_SERIALIZED_FIELD = "ser.field"; +public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field"; +public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField"; +public static final String COLLECTION_NAME = "collection.name"; -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); - -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); - -StateFactory stateFactory 
= new MongoStateFactory(options); -StateUpdater stateUpdater = new MongoStateUpdater(); - -return new SimpleSqlTridentConsumer(stateFactory, stateUpdater); +String serField; +if (props.contains(VALUE_SERIALIZED_FIELD)) { +serField = props.getProperty(VALUE_SERIALIZED_FIELD); +} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) { +// backward compatibility +serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD); --- End diff -- Sure, that makes sense. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202650461 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java --- @@ -72,13 +72,13 @@ // merge and push unions rules UnionEliminatorRule.INSTANCE, -TridentScanRule.INSTANCE, -TridentFilterRule.INSTANCE, -TridentProjectRule.INSTANCE, -TridentAggregateRule.INSTANCE, -TridentJoinRule.INSTANCE, -TridentModifyRule.INSTANCE, -TridentCalcRule.INSTANCE +StreamsScanRule.INSTANCE, +StreamsFilterRule.INSTANCE, +StreamsProjectRule.INSTANCE, +StreamsAggregateRule.INSTANCE, --- End diff -- Makes sense. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202644352 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- We could even edit the image directly, but I'm not sure that would be easy, since I don't know which font is used in `storm-sql-internal-workflow.png`. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202639091 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +75,12 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); - -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); +IRichBolt consumer = ds.getConsumer(); -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Thanks for the information! I'll think about applying it but let me address other comments first. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202635190 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -47,54 +45,60 @@ * The properties are in JSON format which specifies the name of the MongoDB collection and etc. */ public class MongoDataSourcesProvider implements DataSourcesProvider { +public static final String SCHEME_NAME = "mongodb"; +public static final String VALUE_SERIALIZED_FIELD = "ser.field"; +public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field"; +public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField"; +public static final String COLLECTION_NAME = "collection.name"; -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); - -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); - -StateFactory 
stateFactory = new MongoStateFactory(options); -StateUpdater stateUpdater = new MongoStateUpdater(); - -return new SimpleSqlTridentConsumer(stateFactory, stateUpdater); +String serField; +if (props.contains(VALUE_SERIALIZED_FIELD)) { +serField = props.getProperty(VALUE_SERIALIZED_FIELD); +} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) { +// backward compatibility +serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD); --- End diff -- Yeah, right, but it's an existing property and backward compatibility is easy to support, so it doesn't hurt to keep supporting it. We didn't deprecate it either. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202631700 --- Diff: sql/README.md --- @@ -1,187 +1,8 @@ # Storm SQL -Compile SQL queries to Storm topologies. +Compile SQL queries to Storm topologies and run. -## Usage - -Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster - -``` -$ bin/storm sql -``` - -In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology. - -StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`. -Detailed explanation is available from `Showing Query Plan (explain mode)` section. - -## Supported Features - -The following features are supported in the current repository: - -* Streaming from and to external data sources -* Filtering tuples -* Projections -* Aggregations (Grouping) -* User defined function (scalar and aggregate) -* Join (Inner, Left outer, Right outer, Full outer) - -## Specifying External Data Sources - -In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE` -statement. For example, the following statement specifies a Kafka spouts and sink: - -``` -CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' -``` - -The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in -[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). - -`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout. 
-Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition). - -Default value is 1, and this option has no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.) - -## Plugging in External Data Sources - -Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using -the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the -tables. Please refer to the implementation of `storm-sql-kafka` for more details. - -## Specifying User Defined Function (UDF) - -Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement. -For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class. - -``` -CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' -``` - -Storm SQL determines whether the function as scalar or aggregate by checking which methods are defined. -If the class defines `evaluate` method, Storm SQL treats the function as `scalar`, -and if the class defines `add` method, Storm SQL treats the function as `aggregate`. - -Example of class for scalar function is here: - -``` - public class MyPlus { -public static Integer evaluate(Integer x, Integer y) { - return x + y; -} - } - -``` - -and class for aggregate function is here: - -``` - public class MyConcat { -public static String init() { - return ""; -} -public static String add(String accumulator, String val) { - return accumulator + val; -} -public static String result(String accumulator) { - return accumulator; -} - } -``` - -If users don't define `result` method, result is the last return value of `add` method. -Users need to define `result` method only when we need to transform accumulated value. 
- -## Example: Filtering Kafka Stream - -Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id -of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the -transactions are significant and to insert these orders into another Kafka stream for further analysis. - -The user can specify the following SQL statements in the SQL file: - -``` -CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders'
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202632910 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java --- @@ -72,13 +72,13 @@ // merge and push unions rules UnionEliminatorRule.INSTANCE, -TridentScanRule.INSTANCE, -TridentFilterRule.INSTANCE, -TridentProjectRule.INSTANCE, -TridentAggregateRule.INSTANCE, -TridentJoinRule.INSTANCE, -TridentModifyRule.INSTANCE, -TridentCalcRule.INSTANCE +StreamsScanRule.INSTANCE, +StreamsFilterRule.INSTANCE, +StreamsProjectRule.INSTANCE, +StreamsAggregateRule.INSTANCE, --- End diff -- This gives us control to warn end users when aggregate or join is used in their query. I didn't test what happens if we don't have our own rule and a user query triggers the feature. Yeah, it also helps us not forget about the missing features. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202638476 --- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java --- @@ -41,7 +26,46 @@ import java.util.Map; import java.util.PriorityQueue; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.sql.runtime.ISqlStreamsDataSource; +import org.apache.storm.streams.Pair; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.junit.rules.ExternalResource; + public class TestUtils { + public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() { +@Override +protected void before() throws Throwable { + MockInsertBolt.getCollectedValues().clear(); +} + +@Override +protected void after() { + // no-op --- End diff -- Will fix. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202638239 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. 
--- End diff -- You could experiment with your production query by replacing the input table and/or output table with a socket. We may also want a console output table (it certainly can't be an input table), but we could address that in a follow-up issue. For reference, see how other projects handle this: Spark provides a couple of input sources and output sinks for testing, whereas Flink doesn't appear to provide table sources and table sinks for testing. I prefer Spark's approach in this case. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202633872 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.storm.sql.planner.streams.rel; + +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.storm.sql.planner.StormRelUtils; +import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase; +import org.apache.storm.sql.planner.streams.StreamsPlanCreator; +import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction; +import org.apache.storm.streams.Stream; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Values; + +public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel { +private final int primaryKeyIndex; + +public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, + RelNode child, Operation operation, List updateColumnList, List sourceExpressionList, + boolean flattened, int primaryKeyIndex) { +super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); +this.primaryKeyIndex = primaryKeyIndex; +} + +@Override +public RelNode copy(RelTraitSet traitSet, List inputs) { +return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), +sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex); +} + +@Override +public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { +// SingleRel +RelNode input = getInput(); +StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); +Stream inputStream = planCreator.pop(); + +Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported."); + +// 
Calcite ensures that the value is structurized to the table definition +// hence we can use PK index directly +// To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER +// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed, +// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT. + +// FIXME: this should be really different... --- End diff -- Removing the line seems right for now, since unfortunately I've forgotten why I added the FIXME line here. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202631939 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -118,16 +138,32 @@ public RelDataType getRowType( @Override public Statistic getStatistic() { return stat != null ? stat : Statistics.of(rows.size(), - ImmutableList.of()); +ImmutableList.of()); } @Override public Schema.TableType getJdbcTableType() { return Schema.TableType.STREAM; } + +@Override +public boolean isRolledUp(String s) { --- End diff -- OK, I'll find the original methods and restore their original names. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202638350 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. + */ +public class SocketBolt implements IRichBolt { --- End diff -- Yeah nice finding! 
Will fix. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202631502 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- @srdo Agreed. Regarding updating the doc, I think most of the content is still correct, but unfortunately I don't have the source file for the diagram, so it would have to be redrawn. I'd rather explain the change in the text to avoid redrawing it (e.g. "The Trident topology in the diagram has been replaced by a normal Storm topology"). What do you think? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202345310 --- Diff: sql/README.md --- @@ -1,187 +1,8 @@ # Storm SQL -Compile SQL queries to Storm topologies. +Compile SQL queries to Storm topologies and run. -## Usage - -Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster - -``` -$ bin/storm sql -``` - -In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology. - -StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`. -Detailed explanation is available from `Showing Query Plan (explain mode)` section. - -## Supported Features - -The following features are supported in the current repository: - -* Streaming from and to external data sources -* Filtering tuples -* Projections -* Aggregations (Grouping) -* User defined function (scalar and aggregate) -* Join (Inner, Left outer, Right outer, Full outer) - -## Specifying External Data Sources - -In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE` -statement. For example, the following statement specifies a Kafka spouts and sink: - -``` -CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' -``` - -The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in -[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). - -`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout. 
-Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition). - -Default value is 1, and this option has no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.) - -## Plugging in External Data Sources - -Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using -the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the -tables. Please refer to the implementation of `storm-sql-kafka` for more details. - -## Specifying User Defined Function (UDF) - -Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement. -For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class. - -``` -CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' -``` - -Storm SQL determines whether the function as scalar or aggregate by checking which methods are defined. -If the class defines `evaluate` method, Storm SQL treats the function as `scalar`, -and if the class defines `add` method, Storm SQL treats the function as `aggregate`. - -Example of class for scalar function is here: - -``` - public class MyPlus { -public static Integer evaluate(Integer x, Integer y) { - return x + y; -} - } - -``` - -and class for aggregate function is here: - -``` - public class MyConcat { -public static String init() { - return ""; -} -public static String add(String accumulator, String val) { - return accumulator + val; -} -public static String result(String accumulator) { - return accumulator; -} - } -``` - -If users don't define `result` method, result is the last return value of `add` method. -Users need to define `result` method only when we need to transform accumulated value. 
- -## Example: Filtering Kafka Stream - -Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id -of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the -transactions are significant and to insert these orders into another Kafka stream for further analysis. - -The user can specify the following SQL statements in the SQL file: - -``` -CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202356815 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -47,54 +45,60 @@ * The properties are in JSON format which specifies the name of the MongoDB collection and etc. */ public class MongoDataSourcesProvider implements DataSourcesProvider { +public static final String SCHEME_NAME = "mongodb"; +public static final String VALUE_SERIALIZED_FIELD = "ser.field"; +public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field"; +public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField"; +public static final String COLLECTION_NAME = "collection.name"; -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); - -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); - -StateFactory stateFactory 
= new MongoStateFactory(options); -StateUpdater stateUpdater = new MongoStateUpdater(); - -return new SimpleSqlTridentConsumer(stateFactory, stateUpdater); +String serField; +if (props.contains(VALUE_SERIALIZED_FIELD)) { +serField = props.getProperty(VALUE_SERIALIZED_FIELD); +} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) { +// backward compatibility +serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD); --- End diff -- Nit: Since this is targeting 2.0.0, it might be okay not to provide backward compatibility. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202355372 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +75,12 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); - -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); +IRichBolt consumer = ds.getConsumer(); -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Looking at this again, I think the way to make this testable would be to use a factory like https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java to instantiate the HdfsBolt. You could pass the factory class name in through a property and instantiate it when the data source is created. The default factory would just call `new HdfsBolt()`, while a test could supply a factory that returns a mock. I'm not sure if it's worth it. Up to you. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202341073 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- I'm wondering if it would be worth updating this rather than removing it, or maybe merging parts of it into one of the other storm-sql docs (e.g. the overview doc). I think the page provides a nice overview of what storm-sql is, and how it works. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202347946 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -118,16 +138,32 @@ public RelDataType getRowType( @Override public Statistic getStatistic() { return stat != null ? stat : Statistics.of(rows.size(), - ImmutableList.of()); +ImmutableList.of()); } @Override public Schema.TableType getJdbcTableType() { return Schema.TableType.STREAM; } + +@Override +public boolean isRolledUp(String s) { --- End diff -- Nit: `s` doesn't really say much as a variable name, can we replace it with one that says what this string is? Same for the other methods here. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202359820 --- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java --- @@ -41,7 +26,46 @@ import java.util.Map; import java.util.PriorityQueue; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.sql.runtime.ISqlStreamsDataSource; +import org.apache.storm.streams.Pair; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.junit.rules.ExternalResource; + public class TestUtils { + public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() { +@Override +protected void before() throws Throwable { + MockInsertBolt.getCollectedValues().clear(); +} + +@Override +protected void after() { + // no-op --- End diff -- Nit: I think you can leave out this method entirely ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202358383 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. 
--- End diff -- Does it make sense for us to include code like this if we know it isn't suitable for production use? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202349103 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java --- @@ -72,13 +72,13 @@ // merge and push unions rules UnionEliminatorRule.INSTANCE, -TridentScanRule.INSTANCE, -TridentFilterRule.INSTANCE, -TridentProjectRule.INSTANCE, -TridentAggregateRule.INSTANCE, -TridentJoinRule.INSTANCE, -TridentModifyRule.INSTANCE, -TridentCalcRule.INSTANCE +StreamsScanRule.INSTANCE, +StreamsFilterRule.INSTANCE, +StreamsProjectRule.INSTANCE, +StreamsAggregateRule.INSTANCE, --- End diff -- Are we keeping the aggregate and join rules so they'll be easy to implement later, or is there another reason they're here? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202350280 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.storm.sql.planner.streams.rel; + +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.storm.sql.planner.StormRelUtils; +import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase; +import org.apache.storm.sql.planner.streams.StreamsPlanCreator; +import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction; +import org.apache.storm.streams.Stream; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Values; + +public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel { +private final int primaryKeyIndex; + +public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, + RelNode child, Operation operation, List updateColumnList, List sourceExpressionList, + boolean flattened, int primaryKeyIndex) { +super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); +this.primaryKeyIndex = primaryKeyIndex; +} + +@Override +public RelNode copy(RelTraitSet traitSet, List inputs) { +return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), +sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex); +} + +@Override +public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { +// SingleRel +RelNode input = getInput(); +StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); +Stream inputStream = planCreator.pop(); + +Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported."); + +// 
Calcite ensures that the value is structurized to the table definition +// hence we can use PK index directly +// To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER +// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed, +// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT. + +// FIXME: this should be really different... --- End diff -- I think we should raise a follow-up issue immediately and remove the FIXME, or just fix it now. I'm not sure what change the FIXME is asking for. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202344785 --- Diff: docs/storm-sql.md --- @@ -6,14 +6,14 @@ documentation: true The Storm SQL integration allows users to run SQL queries over streaming data in Storm. Not only the SQL interface allows faster development cycles on streaming analytics, but also opens up the opportunities to unify batch data processing like [Apache Hive](///hive.apache.org) and real-time streaming data analytics. -At a very high level StormSQL compiles the SQL queries to [Trident](Trident-API-Overview.html) topologies and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page. +At a very high level StormSQL compiles the SQL queries to Storm topologies leveraging Streams API and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page. --- End diff -- This line still links to storm-sql-internal, which this PR removes, so the link would be broken. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202357967 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. 
+ */ +public class SocketBolt implements IRichBolt { --- End diff -- Nit: Consider extending `BaseRichBolt` instead, so you don't have to implement the methods you don't want to customize ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202282820 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- I removed the document since it describes the Trident-based implementation. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155672497 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java --- @@ -26,59 +27,61 @@ import java.io.InputStreamReader; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import org.apache.storm.Config; import org.apache.storm.spout.Scheme; +import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; -import org.apache.storm.tuple.Fields; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes. + * Spout for Socket data. Only available for Storm SQL. This doesn't guarantee at-least-once. --- End diff -- My mistake: I had already implemented replay of messages. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155661686 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int parallelismHint) { return this; } +// FIXME: we may want to separate Stream and Table, and let output table be Table instead of Stream --- End diff -- https://issues.apache.org/jira/browse/STORM-2848 ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155656225 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +90,13 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); - -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Ok. Maybe the previous test isn't really relevant anymore anyway. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155655708 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +90,13 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); - -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Nope. The bolt instance is already initialized, which is why I couldn't use Mockito's mocking features. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155654493 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +90,13 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); - -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- You can put a spy (https://static.javadoc.io/org.mockito/mockito-core/2.13.0/org/mockito/Mockito.html#spy) around it, but that only works if the calls you're interested in happen after you get access to the bolt reference. I'm not sure if that's what you need? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155653625 --- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java --- @@ -190,363 +191,213 @@ public void open(ChannelContext ctx) { } } - public static class MockState implements State { -/** - * Collect all values in a static variable as the instance will go through serialization and deserialization. - * NOTE: This should be cleared before or after running each test. - */ -private transient static final ListVALUES = new ArrayList<>(); + public static class MockSpout extends BaseRichSpout { -public static List
getCollectedValues() { - return VALUES; +private final List records; +private final Fields outputFields; +private boolean emitted = false; +private SpoutOutputCollector collector; + +public MockSpout(List records, Fields outputFields) { + this.records = records; + this.outputFields = outputFields; } @Override -public void beginCommit(Long txid) { - // NOOP +public void open(Map
conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; } @Override -public void commit(Long txid) { - // NOOP -} +public void nextTuple() { + if (emitted) { +return; + } -public void updateState(List tuples, TridentCollector collector) { - for (TridentTuple tuple : tuples) { -VALUES.add(tuple.getValues()); + for (Values r : records) { +collector.emit(r); } -} - } - public static class MockStateFactory implements StateFactory { + emitted = true; +} @Override -public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - return new MockState(); +public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(outputFields); } } - public static class MockStateUpdater implements StateUpdater { + public static class MockBolt extends BaseRichBolt { --- End diff -- Thanks for the information. I'll try applying that. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155651046 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.calcite; + +public interface ParallelTable extends StormTable { + +/** + * Returns parallelism hint of this table. Returns null if don't know. --- End diff -- Will fix. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155650878 --- Diff: sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java --- @@ -68,62 +70,13 @@ @SuppressWarnings("unchecked") @Test public void testKafkaSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass()); - -TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -KafkaProducer producer = mock(KafkaProducer.class); - doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class)); -Field producerField = state.getClass().getDeclaredField("producer"); -producerField.setAccessible(true); -producerField.set(state, producer); - -List tupleList = mockTupleList(); -for (TridentTuple t : tupleList) { -state.updateState(Collections.singletonList(t), null); -verify(producer).send(argThat(new KafkaMessageMatcher(t))); -} -verifyNoMoreInteractions(producer); -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; -} - -private static class KafkaMessageMatcher implements 
ArgumentMatcher> { - -private static final int PRIMARY_INDEX = 0; -private final TridentTuple tuple; - -private KafkaMessageMatcher(TridentTuple tuple) { -this.tuple = tuple; -} - -@SuppressWarnings("unchecked") -@Override -public boolean matches(ProducerRecord
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155650836 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +90,13 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); - -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Unfortunately, external Bolt implementations are less standardized than Trident State and State Updater implementations. We can't mock all the internals of a bolt, so we can't get control of it. That was relatively easy for Trident State implementations. Maybe the ideal approach to testing the provider is to check the bolt's configuration and verify that the provider configuration has been applied to the bolt. I tried that, but noticed that I can't read the current configuration from the bolt. Do you have an idea how to mock the class so that I can track method calls on its instances, even though I don't control how the instance is created? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155647898 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int parallelismHint) { return this; } +// FIXME: we may want to separate Stream and Table, and let output table be Table instead of Stream --- End diff -- Thanks ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155647861 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core --- End diff -- Good. Thanks for the reminder. :) ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155647803 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core ${calcite.version} + --- End diff -- Makes sense if we don't need them. If you're trying to avoid version conflicts for transitive dependencies, I think adding a `<dependencyManagement>` section to set the versions is better. It lets you set which version of the dependency you want, and then all transitive includes of that dependency will use the specified version. I think we do this with e.g. the Jackson dependencies in the Storm parent pom. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155646360 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core --- End diff -- Sure. I didn't mean to say that you should do any extra work on checkstyle in this, just wanted to make sure reducing the counter wasn't being forgotten by accident. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155646317 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java --- @@ -26,59 +27,61 @@ import java.io.InputStreamReader; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import org.apache.storm.Config; import org.apache.storm.spout.Scheme; +import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; -import org.apache.storm.tuple.Fields; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes. + * Spout for Socket data. Only available for Storm SQL. This doesn't guarantee at-least-once. --- End diff -- Same thing for spout. I think I need to add same comment from bolt and also mentioning that it doesn't support replaying so at-most-once semantic. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155645948 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. The class is for test purpose and doesn't handle reconnection or so. --- End diff -- I think removing "or so" is clearer. What I meant for test is not unit test, just for users to test query before attaching external source. 
Maybe we can just remove the mention of test purposes and instead state that it doesn't handle reconnection or support replaying, so it provides only at-most-once semantics. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155644582 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -49,53 +46,56 @@ */ public class MongoDataSourcesProvider implements DataSourcesProvider { -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); +String serField; +if (props.contains("ser.field")) { +serField = props.getProperty("ser.field"); +} else if (props.contains("trident.ser.field")) { --- End diff -- I think I missed it. Nice finding. Will address. Btw it is specific to mongo config, so constants will be put in same class. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155644012 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int parallelismHint) { return this; } +// FIXME: we may want to separate Stream and Table, and let output table be Table instead of Stream --- End diff -- OK I'll file an issue and remove the comment. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155643727 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -89,16 +95,31 @@ public TableBuilderInfo field(String name, RelDataType type) { return this; } +public TableBuilderInfo field(String name, SqlTypeName type, ColumnConstraint constraint) { + interpretConstraint(constraint, fields.size()); + return field(name, typeFactory.createSqlType(type)); +} + +public TableBuilderInfo field(String name, RelDataType type, ColumnConstraint constraint) { + interpretConstraint(constraint, fields.size()); + fields.add(new FieldType(name, type)); + return this; +} + public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) { RelDataType dataType = type.deriveType(typeFactory); + interpretConstraint(constraint, fields.size()); + fields.add(new FieldType(name, dataType)); + return this; +} + +private void interpretConstraint(ColumnConstraint constraint, int fieldIdx) { if (constraint instanceof ColumnConstraint.PrimaryKey) { ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint; Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table"); --- End diff -- Will fix. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155643640 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core ${calcite.version} + --- End diff -- They support other parts of Calcite and are unneeded dependencies for us. We already have some of these dependencies ourselves, so excluding them also helps avoid version conflicts. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155643000 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core --- End diff -- I'll see how the number changes. It might remain the same if I fixed a checkstyle issue in A but introduced a violation in B. Actually, I'm focusing on planning and addressing window aggregation and join, so TBH I'd like to address the checkstyle issues later, all together. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r154738635 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core ${calcite.version} + --- End diff -- Why the exclusions? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155320443 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.calcite; + +public interface ParallelTable extends StormTable { + +/** + * Returns parallelism hint of this table. Returns null if don't know. --- End diff -- nit: Returns null if not known. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155515549 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -49,53 +46,56 @@ */ public class MongoDataSourcesProvider implements DataSourcesProvider { -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); +String serField; +if (props.contains("ser.field")) { +serField = props.getProperty("ser.field"); +} else if (props.contains("trident.ser.field")) { --- End diff -- Nit: Are there constants somewhere that could be used instead of literals? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155320991 --- Diff: sql/storm-sql-core/pom.xml --- @@ -66,10 +66,34 @@ calcite-core --- End diff -- You might want to reduce the allowed checkstyle errors in the poms if you reduced the violations. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155516637 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. The class is for test purpose and doesn't handle reconnection or so. --- End diff -- What is meant by "or so"? Also if this class is only for tests, can it be moved to test scope? 
And if not, can it be moved into a package for test classes so it's clear that it's only for tests? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155520270 --- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java --- @@ -190,363 +191,213 @@ public void open(ChannelContext ctx) { } } - public static class MockState implements State { -/** - * Collect all values in a static variable as the instance will go through serialization and deserialization. - * NOTE: This should be cleared before or after running each test. - */ -private transient static final ListVALUES = new ArrayList<>(); + public static class MockSpout extends BaseRichSpout { -public static List
getCollectedValues() { - return VALUES; +private final List records; +private final Fields outputFields; +private boolean emitted = false; +private SpoutOutputCollector collector; + +public MockSpout(List records, Fields outputFields) { + this.records = records; + this.outputFields = outputFields; } @Override -public void beginCommit(Long txid) { - // NOOP +public void open(Map
conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; } @Override -public void commit(Long txid) { - // NOOP -} +public void nextTuple() { + if (emitted) { +return; + } -public void updateState(List tuples, TridentCollector collector) { - for (TridentTuple tuple : tuples) { -VALUES.add(tuple.getValues()); + for (Values r : records) { +collector.emit(r); } -} - } - public static class MockStateFactory implements StateFactory { + emitted = true; +} @Override -public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - return new MockState(); +public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(outputFields); } } - public static class MockStateUpdater implements StateUpdater { + public static class MockBolt extends BaseRichBolt { --- End diff -- I think it could be helpful to use a JUnit TestRule (e.g. by subclassing http://junit.org/junit4/javadoc/4.12/org/junit/rules/ExternalResource.html) to help remember to clear VALUES after the test. It's easy to forget to clean up, and then the tests leak state. Same for the other test utils that keep static state. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155321227 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int parallelismHint) { return this; } +// FIXME: we may want to separate Stream and Table, and let output table be Table instead of Stream --- End diff -- Can you put this in an issue instead? I think it will be forgotten if left as a TODO in the code. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155321576 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -89,16 +95,31 @@ public TableBuilderInfo field(String name, RelDataType type) { return this; } +public TableBuilderInfo field(String name, SqlTypeName type, ColumnConstraint constraint) { + interpretConstraint(constraint, fields.size()); + return field(name, typeFactory.createSqlType(type)); +} + +public TableBuilderInfo field(String name, RelDataType type, ColumnConstraint constraint) { + interpretConstraint(constraint, fields.size()); + fields.add(new FieldType(name, type)); + return this; +} + public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) { RelDataType dataType = type.deriveType(typeFactory); + interpretConstraint(constraint, fields.size()); + fields.add(new FieldType(name, dataType)); + return this; +} + +private void interpretConstraint(ColumnConstraint constraint, int fieldIdx) { if (constraint instanceof ColumnConstraint.PrimaryKey) { ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint; Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table"); --- End diff -- Nit: There are -> There is ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155514626 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +90,13 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); - -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- This seems to be testing much less than before. Is the test no longer relevant? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155517029 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java --- @@ -26,59 +27,61 @@ import java.io.InputStreamReader; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import org.apache.storm.Config; import org.apache.storm.spout.Scheme; +import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; -import org.apache.storm.tuple.Fields; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Trident Spout for Socket data. Only available for Storm SQL, and only use for test purposes. + * Spout for Socket data. Only available for Storm SQL. This doesn't guarantee at-least-once. --- End diff -- Is it now intended for non-test use? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155515253 --- Diff: sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java --- @@ -68,62 +70,13 @@ @SuppressWarnings("unchecked") @Test public void testKafkaSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); +IRichBolt consumer = ds.getConsumer(); -Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass()); - -TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -KafkaProducer producer = mock(KafkaProducer.class); - doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class)); -Field producerField = state.getClass().getDeclaredField("producer"); -producerField.setAccessible(true); -producerField.set(state, producer); - -List tupleList = mockTupleList(); -for (TridentTuple t : tupleList) { -state.updateState(Collections.singletonList(t), null); -verify(producer).send(argThat(new KafkaMessageMatcher(t))); -} -verifyNoMoreInteractions(producer); -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; -} - -private static class KafkaMessageMatcher implements 
ArgumentMatcher> { - -private static final int PRIMARY_INDEX = 0; -private final TridentTuple tuple; - -private KafkaMessageMatcher(TridentTuple tuple) { -this.tuple = tuple; -} - -@SuppressWarnings("unchecked") -@Override -public boolean matches(ProducerRecord