[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2443


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202684941
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

Nice finding. I'll fix it and also attach the draw.io source file, but I'm 
not sure where to add it.
I'll redraw the second diagram as well.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202683106
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

The second diagram on the page also mentions Trident. I don't know if it 
makes sense to update it, or just remove it.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202682665
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

Thanks, looks great. I think the dotted-line box is missing the "StormSQL" 
label. Also, maybe we should put the draw.io source file in the docs directory 
too, so we don't have to remake the diagram if we need to change it later.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202678116
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

Just finished it.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202674112
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

Yeah right. I didn't think about that. Drawing a new one.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202653817
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

It would probably be pretty quick to duplicate with e.g. 
https://www.draw.io/, since it's just some boxes. 


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202650814
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this 
for production.
--- End diff --

Okay. It's fine to keep it; I just wanted to make sure we didn't want to e.g. 
move it to /test.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202650588
  
--- Diff: 
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 ---
@@ -47,54 +45,60 @@
  * The properties are in JSON format which specifies the name of the 
MongoDB collection and etc.
  */
 public class MongoDataSourcesProvider implements DataSourcesProvider {
+public static final String SCHEME_NAME = "mongodb";
+public static final String VALUE_SERIALIZED_FIELD = "ser.field";
+public static final String TRIDENT_VALUE_SERIALIZED_FIELD = 
"trident.ser.field";
+public static final String DEFAULT_VALUE_SERIALIZED_FIELD = 
"tridentSerField";
+public static final String COLLECTION_NAME = "collection.name";
 
-private static class MongoTridentDataSource implements 
ISqlTridentDataSource {
+private static class MongoStreamsDataSource implements 
ISqlStreamsDataSource {
 private final String url;
 private final Properties props;
 private final IOutputSerializer serializer;
 
-private MongoTridentDataSource(String url, Properties props, 
IOutputSerializer serializer) {
+private MongoStreamsDataSource(String url, Properties props, 
IOutputSerializer serializer) {
 this.url = url;
 this.props = props;
 this.serializer = serializer;
 }
 
 @Override
-public ITridentDataSource getProducer() {
+public IRichSpout getProducer() {
 throw new 
UnsupportedOperationException(this.getClass().getName() + " doesn't provide 
Producer");
 }
 
 @Override
-public SqlTridentConsumer getConsumer() {
+public IRichBolt getConsumer() {
 Preconditions.checkArgument(!props.isEmpty(), "Writable 
MongoDB must contain collection config");
-String serField = props.getProperty("trident.ser.field", 
"tridentSerField");
-MongoMapper mapper = new TridentMongoMapper(serField, 
serializer);
-
-MongoState.Options options = new MongoState.Options()
-.withUrl(url)
-
.withCollectionName(props.getProperty("collection.name"))
-.withMapper(mapper);
-
-StateFactory stateFactory = new MongoStateFactory(options);
-StateUpdater stateUpdater = new MongoStateUpdater();
-
-return new SimpleSqlTridentConsumer(stateFactory, 
stateUpdater);
+String serField;
+if (props.contains(VALUE_SERIALIZED_FIELD)) {
+serField = props.getProperty(VALUE_SERIALIZED_FIELD);
+} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) {
+// backward compatibility
+serField = 
props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
--- End diff --

Sure, that makes sense.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202650461
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
 ---
@@ -72,13 +72,13 @@
 // merge and push unions rules
 UnionEliminatorRule.INSTANCE,
 
-TridentScanRule.INSTANCE,
-TridentFilterRule.INSTANCE,
-TridentProjectRule.INSTANCE,
-TridentAggregateRule.INSTANCE,
-TridentJoinRule.INSTANCE,
-TridentModifyRule.INSTANCE,
-TridentCalcRule.INSTANCE
+StreamsScanRule.INSTANCE,
+StreamsFilterRule.INSTANCE,
+StreamsProjectRule.INSTANCE,
+StreamsAggregateRule.INSTANCE,
--- End diff --

Makes sense.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202644352
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

We could even do some photo editing, but I'm not sure that's easy since I 
don't know which font was used for `storm-sql-internal-workflow.png`.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202639091
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +75,12 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
-
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
+IRichBolt consumer = ds.getConsumer();
 
-List<TridentTuple> tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List<TridentTuple> mockTupleList() {
-List<TridentTuple> tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

Thanks for the information! I'll think about applying it, but let me address 
the other comments first.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202635190
  
--- Diff: 
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 ---
@@ -47,54 +45,60 @@
  * The properties are in JSON format which specifies the name of the 
MongoDB collection and etc.
  */
 public class MongoDataSourcesProvider implements DataSourcesProvider {
+public static final String SCHEME_NAME = "mongodb";
+public static final String VALUE_SERIALIZED_FIELD = "ser.field";
+public static final String TRIDENT_VALUE_SERIALIZED_FIELD = 
"trident.ser.field";
+public static final String DEFAULT_VALUE_SERIALIZED_FIELD = 
"tridentSerField";
+public static final String COLLECTION_NAME = "collection.name";
 
-private static class MongoTridentDataSource implements 
ISqlTridentDataSource {
+private static class MongoStreamsDataSource implements 
ISqlStreamsDataSource {
 private final String url;
 private final Properties props;
 private final IOutputSerializer serializer;
 
-private MongoTridentDataSource(String url, Properties props, 
IOutputSerializer serializer) {
+private MongoStreamsDataSource(String url, Properties props, 
IOutputSerializer serializer) {
 this.url = url;
 this.props = props;
 this.serializer = serializer;
 }
 
 @Override
-public ITridentDataSource getProducer() {
+public IRichSpout getProducer() {
 throw new 
UnsupportedOperationException(this.getClass().getName() + " doesn't provide 
Producer");
 }
 
 @Override
-public SqlTridentConsumer getConsumer() {
+public IRichBolt getConsumer() {
 Preconditions.checkArgument(!props.isEmpty(), "Writable 
MongoDB must contain collection config");
-String serField = props.getProperty("trident.ser.field", 
"tridentSerField");
-MongoMapper mapper = new TridentMongoMapper(serField, 
serializer);
-
-MongoState.Options options = new MongoState.Options()
-.withUrl(url)
-
.withCollectionName(props.getProperty("collection.name"))
-.withMapper(mapper);
-
-StateFactory stateFactory = new MongoStateFactory(options);
-StateUpdater stateUpdater = new MongoStateUpdater();
-
-return new SimpleSqlTridentConsumer(stateFactory, 
stateUpdater);
+String serField;
+if (props.contains(VALUE_SERIALIZED_FIELD)) {
+serField = props.getProperty(VALUE_SERIALIZED_FIELD);
+} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) {
+// backward compatibility
+serField = 
props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
--- End diff --

Yeah right, but it's an existing property and supporting backward 
compatibility is easy, so it doesn't hurt to keep it. We didn't deprecate it 
either.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202631700
  
--- Diff: sql/README.md ---
@@ -1,187 +1,8 @@
 # Storm SQL
 
-Compile SQL queries to Storm topologies.
+Compile SQL queries to Storm topologies and run.
 
-## Usage
-
-Run the ``storm sql`` command to compile SQL statements into Trident 
topology, and submit it to the Storm cluster
-
-```
-$ bin/storm sql <sql-file> <topo-name>
-```
-
-In which `sql-file` contains a list of SQL statements to be executed, and 
`topo-name` is the name of the topology.
-
-StormSQL activates `explain mode` and shows query plan instead of 
submitting topology when user specifies `topo-name` as `--explain`.
-Detailed explanation is available from `Showing Query Plan (explain mode)` 
section.
-
-## Supported Features
-
-The following features are supported in the current repository:
-
-* Streaming from and to external data sources
-* Filtering tuples
-* Projections
-* Aggregations (Grouping)
-* User defined function (scalar and aggregate)
-* Join (Inner, Left outer, Right outer, Full outer)
-
-## Specifying External Data Sources
-
-In StormSQL data is represented by external tables. Users can specify data 
sources using the `CREATE EXTERNAL TABLE`
-statement. For example, the following statement specifies a Kafka spouts 
and sink:
-
-```
-CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 
'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES 
'{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-```
-
-The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
-[Hive Data Definition 
Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
-
-`PARALLELISM` is StormSQL's own keyword which describes parallelism hint 
for input data source. This is same as providing parallelism hint to Trident 
Spout.
-Downstream operators are executed with same parallelism before repartition 
(Aggregation triggers repartition).
-
-Default value is 1, and this option has no effect on output data source. 
(We might change if needed. Normally repartition is the thing to avoid.)
-
-## Plugging in External Data Sources
-
-Users plug in external data sources through implementing the 
`ISqlTridentDataSource` interface and registers them using
-the mechanisms of Java's service loader. The external data source will be 
chosen based on the scheme of the URI of the
-tables. Please refer to the implementation of `storm-sql-kafka` for more 
details.
-
-## Specifying User Defined Function (UDF)
-
-Users can define user defined function (scalar or aggregate) using `CREATE 
FUNCTION` statement.
-For example, the following statement defines `MYPLUS` function which uses 
`org.apache.storm.sql.TestUtils$MyPlus` class.
-
-```
-CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
-```
-
-Storm SQL determines whether the function as scalar or aggregate by 
checking which methods are defined.
-If the class defines `evaluate` method, Storm SQL treats the function as 
`scalar`,
-and if the class defines `add` method, Storm SQL treats the function as 
`aggregate`.
-
-Example of class for scalar function is here:
-
-```
-  public class MyPlus {
-public static Integer evaluate(Integer x, Integer y) {
-  return x + y;
-}
-  }
-
-```
-
-and class for aggregate function is here:
-
-```
-  public class MyConcat {
-public static String init() {
-  return "";
-}
-public static String add(String accumulator, String val) {
-  return accumulator + val;
-}
-public static String result(String accumulator) {
-  return accumulator;
-}
-  }
-```
-
-If users don't define `result` method, result is the last return value of 
`add` method.
-Users need to define `result` method only when we need to transform 
accumulated value.
-
-## Example: Filtering Kafka Stream
-
-Let's say there is a Kafka stream that represents the transactions of 
orders. Each message in the stream contains the id
-of the order, the unit price of the product and the quantity of the 
orders. The goal is to filter orders where the
-transactions are significant and to insert these orders into another Kafka 
stream for further analysis.
-
-The user can specify the following SQL statements in the SQL file:
-
-```
-CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY 
INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' 

[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202632910
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
 ---
@@ -72,13 +72,13 @@
 // merge and push unions rules
 UnionEliminatorRule.INSTANCE,
 
-TridentScanRule.INSTANCE,
-TridentFilterRule.INSTANCE,
-TridentProjectRule.INSTANCE,
-TridentAggregateRule.INSTANCE,
-TridentJoinRule.INSTANCE,
-TridentModifyRule.INSTANCE,
-TridentCalcRule.INSTANCE
+StreamsScanRule.INSTANCE,
+StreamsFilterRule.INSTANCE,
+StreamsProjectRule.INSTANCE,
+StreamsAggregateRule.INSTANCE,
--- End diff --

This is to keep control so we can warn end users when an aggregate or join is 
used in their query. I haven't tested what happens if we don't have our own 
rule and a user query triggers the feature. It's also a good way not to forget 
about the missing features.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202638476
  
--- Diff: 
sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---
@@ -41,7 +26,46 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.junit.rules.ExternalResource;
+
 public class TestUtils {
+  public static final ExternalResource mockInsertBoltValueResource = new 
ExternalResource() {
+@Override
+protected void before() throws Throwable {
+  MockInsertBolt.getCollectedValues().clear();
+}
+
+@Override
+protected void after() {
+  // no-op
--- End diff --

Will fix.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202638239
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this 
for production.
--- End diff --

You could do some experiments with your production query by replacing the 
input table and/or output table with a socket table.

We may also want to have a console output table (it can't be an input table, 
for sure), but we could address that in a follow-up issue.

For reference, Spark has a couple of input sources and output sinks for 
testing, whereas Flink doesn't seem to provide table sources and table sinks 
for testing. I prefer Spark's approach for this case.


http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks


https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
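
For illustration, replacing the output table with a socket table could look 
roughly like this. The `socket://host:port` LOCATION form is assumed from the 
socket data source, and the ORDERS table is the one from the README example; 
please check storm-sql.md for the exact syntax:

```
-- hypothetical debugging sink; socket tables are for testing only, not production
CREATE EXTERNAL TABLE DEBUG_OUT (ID INT PRIMARY KEY) LOCATION 'socket://localhost:8888'

-- route the production query's output to the socket table while experimenting
INSERT INTO DEBUG_OUT SELECT ID FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
```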


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202633872
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import 
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase 
implements StreamsRel {
+private final int primaryKeyIndex;
+
+public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+  RelNode child, Operation operation, 
List<String> updateColumnList, List<RexNode> sourceExpressionList,
+  boolean flattened, int primaryKeyIndex) {
+super(cluster, traits, table, catalogReader, child, operation, 
updateColumnList, sourceExpressionList, flattened);
+this.primaryKeyIndex = primaryKeyIndex;
+}
+
+@Override
+public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+return new StreamsStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
+sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened(), primaryKeyIndex);
+}
+
+@Override
+public void streamsPlan(StreamsPlanCreator planCreator) throws 
Exception {
+// SingleRel
+RelNode input = getInput();
+StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+Stream inputStream = planCreator.pop();
+
+Preconditions.checkArgument(isInsert(), "Only INSERT statement is 
supported.");
+
+// Calcite ensures that the value is structurized to the table 
definition
+// hence we can use PK index directly
+// To elaborate, if table BAR is defined as ID INTEGER PK, NAME 
VARCHAR, DEPTID INTEGER
+// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is 
executed,
+// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to 
the value before INSERT.
+
+// FIXME: this should be really different...
--- End diff --

Removing the line would be right for now... sadly, I forgot why I added the 
FIXME line here.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202631939
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -118,16 +138,32 @@ public RelDataType getRowType(
 @Override
 public Statistic getStatistic() {
 return stat != null ? stat : Statistics.of(rows.size(),
-   
ImmutableList.of());
+ImmutableList.of());
 }
 
 @Override
 public Schema.TableType getJdbcTableType() {
 return Schema.TableType.STREAM;
 }
+
+@Override
+public boolean isRolledUp(String s) {
--- End diff --

OK, I'll find the methods in the original interface and restore their 
original parameter names.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202638350
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this 
for production.
+ */
+public class SocketBolt implements IRichBolt {
--- End diff --

Yeah nice finding! Will fix.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202631502
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

@srdo 
Agreed. Regarding updating the doc, I think most of the content is still 
correct, but unfortunately I don't have the source for the diagram, so it 
would need to be redrawn. I'd rather explain it in the text to avoid redrawing 
(something like "The Trident topology in the diagram is now replaced by a 
normal Storm topology"). What do you think?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202345310
  
--- Diff: sql/README.md ---
@@ -1,187 +1,8 @@
 # Storm SQL
 
-Compile SQL queries to Storm topologies.
+Compile SQL queries to Storm topologies and run.
 
-## Usage
-
-Run the ``storm sql`` command to compile SQL statements into Trident 
topology, and submit it to the Storm cluster
-
-```
-$ bin/storm sql <sql-file> <topo-name>
-```
-
-In which `sql-file` contains a list of SQL statements to be executed, and 
`topo-name` is the name of the topology.
-
-StormSQL activates `explain mode` and shows query plan instead of 
submitting topology when user specifies `topo-name` as `--explain`.
-Detailed explanation is available from `Showing Query Plan (explain mode)` 
section.
-
-## Supported Features
-
-The following features are supported in the current repository:
-
-* Streaming from and to external data sources
-* Filtering tuples
-* Projections
-* Aggregations (Grouping)
-* User defined function (scalar and aggregate)
-* Join (Inner, Left outer, Right outer, Full outer)
-
-## Specifying External Data Sources
-
-In StormSQL data is represented by external tables. Users can specify data 
sources using the `CREATE EXTERNAL TABLE`
-statement. For example, the following statement specifies a Kafka spouts 
and sink:
-
-```
-CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 
'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES 
'{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-```
-
-The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
-[Hive Data Definition 
Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
-
-`PARALLELISM` is StormSQL's own keyword which describes parallelism hint 
for input data source. This is same as providing parallelism hint to Trident 
Spout.
-Downstream operators are executed with same parallelism before repartition 
(Aggregation triggers repartition).
-
-Default value is 1, and this option has no effect on output data source. 
(We might change if needed. Normally repartition is the thing to avoid.)
-
-## Plugging in External Data Sources
-
-Users plug in external data sources through implementing the 
`ISqlTridentDataSource` interface and registers them using
-the mechanisms of Java's service loader. The external data source will be 
chosen based on the scheme of the URI of the
-tables. Please refer to the implementation of `storm-sql-kafka` for more 
details.
-
-## Specifying User Defined Function (UDF)
-
-Users can define user defined function (scalar or aggregate) using `CREATE 
FUNCTION` statement.
-For example, the following statement defines `MYPLUS` function which uses 
`org.apache.storm.sql.TestUtils$MyPlus` class.
-
-```
-CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
-```
-
-Storm SQL determines whether the function as scalar or aggregate by 
checking which methods are defined.
-If the class defines `evaluate` method, Storm SQL treats the function as 
`scalar`,
-and if the class defines `add` method, Storm SQL treats the function as 
`aggregate`.
-
-Example of class for scalar function is here:
-
-```
-  public class MyPlus {
-public static Integer evaluate(Integer x, Integer y) {
-  return x + y;
-}
-  }
-
-```
-
-and class for aggregate function is here:
-
-```
-  public class MyConcat {
-public static String init() {
-  return "";
-}
-public static String add(String accumulator, String val) {
-  return accumulator + val;
-}
-public static String result(String accumulator) {
-  return accumulator;
-}
-  }
-```
-
-If users don't define `result` method, result is the last return value of 
`add` method.
-Users need to define `result` method only when we need to transform 
accumulated value.
-
-## Example: Filtering Kafka Stream
-
-Let's say there is a Kafka stream that represents the transactions of 
orders. Each message in the stream contains the id
-of the order, the unit price of the product and the quantity of the 
orders. The goal is to filter orders where the
-transactions are significant and to insert these orders into another Kafka 
stream for further analysis.
-
-The user can specify the following SQL statements in the SQL file:
-
-```
-CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY 
INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES 

[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202356815
  
--- Diff: 
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 ---
@@ -47,54 +45,60 @@
  * The properties are in JSON format which specifies the name of the 
MongoDB collection and etc.
  */
 public class MongoDataSourcesProvider implements DataSourcesProvider {
+public static final String SCHEME_NAME = "mongodb";
+public static final String VALUE_SERIALIZED_FIELD = "ser.field";
+public static final String TRIDENT_VALUE_SERIALIZED_FIELD = 
"trident.ser.field";
+public static final String DEFAULT_VALUE_SERIALIZED_FIELD = 
"tridentSerField";
+public static final String COLLECTION_NAME = "collection.name";
 
-private static class MongoTridentDataSource implements 
ISqlTridentDataSource {
+private static class MongoStreamsDataSource implements 
ISqlStreamsDataSource {
 private final String url;
 private final Properties props;
 private final IOutputSerializer serializer;
 
-private MongoTridentDataSource(String url, Properties props, 
IOutputSerializer serializer) {
+private MongoStreamsDataSource(String url, Properties props, 
IOutputSerializer serializer) {
 this.url = url;
 this.props = props;
 this.serializer = serializer;
 }
 
 @Override
-public ITridentDataSource getProducer() {
+public IRichSpout getProducer() {
 throw new 
UnsupportedOperationException(this.getClass().getName() + " doesn't provide 
Producer");
 }
 
 @Override
-public SqlTridentConsumer getConsumer() {
+public IRichBolt getConsumer() {
 Preconditions.checkArgument(!props.isEmpty(), "Writable 
MongoDB must contain collection config");
-String serField = props.getProperty("trident.ser.field", 
"tridentSerField");
-MongoMapper mapper = new TridentMongoMapper(serField, 
serializer);
-
-MongoState.Options options = new MongoState.Options()
-.withUrl(url)
-
.withCollectionName(props.getProperty("collection.name"))
-.withMapper(mapper);
-
-StateFactory stateFactory = new MongoStateFactory(options);
-StateUpdater stateUpdater = new MongoStateUpdater();
-
-return new SimpleSqlTridentConsumer(stateFactory, 
stateUpdater);
+String serField;
+if (props.contains(VALUE_SERIALIZED_FIELD)) {
+serField = props.getProperty(VALUE_SERIALIZED_FIELD);
+} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) {
+// backward compatibility
+serField = 
props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
--- End diff --

Nit: Since this is targeting 2.0.0, it might be okay not to provide 
backward compatibility.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202355372
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +75,12 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
-
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
+IRichBolt consumer = ds.getConsumer();
 
-List<TridentTuple> tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List<TridentTuple> mockTupleList() {
-List<TridentTuple> tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

Looking at this again, I think the way to do it would be to use a factory 
like 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
 to instantiate the HdfsBolt. You could pass the factory class name in through 
a property and instantiate it when the data source is created. The default 
factory would just call `new HdfsBolt()`.

I'm not sure if it's worth it. Up to you.
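
A rough sketch of that factory indirection is below; the interface name, 
method name, and the `hdfs.bolt.factory` property are purely illustrative and 
not defined anywhere in this PR:

```
import java.io.Serializable;
import java.util.Properties;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.topology.IRichBolt;

/** Hypothetical factory so tests can substitute the bolt the data source creates. */
public interface HdfsBoltFactory extends Serializable {
    IRichBolt create(Properties props);

    /** Default factory: just builds the real bolt, as suggested above. */
    class Default implements HdfsBoltFactory {
        @Override
        public IRichBolt create(Properties props) {
            return new HdfsBolt();
        }
    }

    /** The data source could pick the factory class from a table property. */
    static HdfsBoltFactory fromProperties(Properties props) throws ReflectiveOperationException {
        String className = props.getProperty("hdfs.bolt.factory", Default.class.getName());
        return (HdfsBoltFactory) Class.forName(className).getDeclaredConstructor().newInstance();
    }
}
```

A test could then register a factory that returns a mock bolt, while 
production keeps the default.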


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202341073
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

I'm wondering if it would be worth updating this rather than removing it, 
or maybe merging parts of it into one of the other storm-sql docs (e.g. the 
overview doc). I think the page provides a nice overview of what storm-sql is, 
and how it works.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202347946
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -118,16 +138,32 @@ public RelDataType getRowType(
 @Override
 public Statistic getStatistic() {
 return stat != null ? stat : Statistics.of(rows.size(),
-   
ImmutableList.of());
+ImmutableList.of());
 }
 
 @Override
 public Schema.TableType getJdbcTableType() {
 return Schema.TableType.STREAM;
 }
+
+@Override
+public boolean isRolledUp(String s) {
--- End diff --

Nit: `s` doesn't really say much as a variable name, can we replace it with 
one that says what this string is? Same for the other methods here.
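
Something along these lines, assuming (as in Calcite's `Table` interface) that 
the string is a column name; the method body here is just a placeholder:

```
@Override
public boolean isRolledUp(String column) {
    // 'column' tells the reader what the string represents, unlike 's'
    return false;
}
```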


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202359820
  
--- Diff: 
sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---
@@ -41,7 +26,46 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.junit.rules.ExternalResource;
+
 public class TestUtils {
+  public static final ExternalResource mockInsertBoltValueResource = new 
ExternalResource() {
+@Override
+protected void before() throws Throwable {
+  MockInsertBolt.getCollectedValues().clear();
+}
+
+@Override
+protected void after() {
+  // no-op
--- End diff --

Nit: I think you can leave out this method entirely
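
For reference, a trimmed version might look like the following, since JUnit's 
`ExternalResource#after()` already defaults to a no-op:

```
public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
    @Override
    protected void before() throws Throwable {
      // only before() needs overriding; after() keeps its default no-op
      MockInsertBolt.getCollectedValues().clear();
    }
};
```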


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202358383
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this 
for production.
--- End diff --

Does it make sense for us to include code like this if we know it isn't 
suitable for production use? 


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202349103
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
 ---
@@ -72,13 +72,13 @@
 // merge and push unions rules
 UnionEliminatorRule.INSTANCE,
 
-TridentScanRule.INSTANCE,
-TridentFilterRule.INSTANCE,
-TridentProjectRule.INSTANCE,
-TridentAggregateRule.INSTANCE,
-TridentJoinRule.INSTANCE,
-TridentModifyRule.INSTANCE,
-TridentCalcRule.INSTANCE
+StreamsScanRule.INSTANCE,
+StreamsFilterRule.INSTANCE,
+StreamsProjectRule.INSTANCE,
+StreamsAggregateRule.INSTANCE,
--- End diff --

Are we holding on to aggregate and join so they'll be easy to implement 
later, or why are the aggregate and join rules here?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202350280
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import 
org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase 
implements StreamsRel {
+private final int primaryKeyIndex;
+
+public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet 
traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+  RelNode child, Operation operation, 
List<String> updateColumnList, List<RexNode> sourceExpressionList,
+  boolean flattened, int primaryKeyIndex) {
+super(cluster, traits, table, catalogReader, child, operation, 
updateColumnList, sourceExpressionList, flattened);
+this.primaryKeyIndex = primaryKeyIndex;
+}
+
+@Override
+public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+return new StreamsStreamInsertRel(getCluster(), traitSet, 
getTable(), getCatalogReader(),
+sole(inputs), getOperation(), getUpdateColumnList(), 
getSourceExpressionList(), isFlattened(), primaryKeyIndex);
+}
+
+@Override
+public void streamsPlan(StreamsPlanCreator planCreator) throws 
Exception {
+// SingleRel
+RelNode input = getInput();
+StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+Stream inputStream = planCreator.pop();
+
+Preconditions.checkArgument(isInsert(), "Only INSERT statement is 
supported.");
+
+// Calcite ensures that the value is structurized to the table 
definition
+// hence we can use PK index directly
+// To elaborate, if table BAR is defined as ID INTEGER PK, NAME 
VARCHAR, DEPTID INTEGER
+// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is 
executed,
+// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to 
the value before INSERT.
+
+// FIXME: this should be really different...
--- End diff --

I think we should raise a follow-up issue immediately and remove the FIXME, 
or just fix it now. I can't really tell from the FIXME what should be changed.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202344785
  
--- Diff: docs/storm-sql.md ---
@@ -6,14 +6,14 @@ documentation: true
 
 The Storm SQL integration allows users to run SQL queries over streaming 
data in Storm. Not only the SQL interface allows faster development cycles on 
streaming analytics, but also opens up the opportunities to unify batch data 
processing like [Apache Hive](///hive.apache.org) and real-time streaming data 
analytics.
 
-At a very high level StormSQL compiles the SQL queries to 
[Trident](Trident-API-Overview.html) topologies and executes them in Storm 
clusters. This document provides information of how to use StormSQL as end 
users. For people that are interested in more details in the design and the 
implementation of StormSQL please refer to the [this](storm-sql-internal.html) 
page.
+At a very high level StormSQL compiles the SQL queries to Storm topologies 
leveraging Streams API and executes them in Storm clusters. This document 
provides information of how to use StormSQL as end users. For people that are 
interested in more details in the design and the implementation of StormSQL 
please refer to the [this](storm-sql-internal.html) page.
--- End diff --

There's a link to storm-sql-internal in this line


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202357967
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this 
for production.
+ */
+public class SocketBolt implements IRichBolt {
--- End diff --

Nit: Consider extending `BaseRichBolt` instead, so you don't have to 
implement the methods you don't want to customize
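
A minimal sketch of what that would look like for this bolt; the socket and 
serializer details are omitted, and the `prepare` signature assumes the Storm 
2.x API:

```
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class SocketBoltSketch extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // the real bolt would open the socket connection here
    }

    @Override
    public void execute(Tuple input) {
        // the real bolt would serialize the tuple and write it to the socket
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // sink bolt: nothing to declare
    }
}
```

`cleanup()` and `getComponentConfiguration()` come from the base class, so only 
the methods that actually do something need to be written.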


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202282820
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

I removed the document since it was written based on the Trident 
implementation.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155672497
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
 ---
@@ -26,59 +27,61 @@
 import java.io.InputStreamReader;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.storm.Config;
 import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Trident Spout for Socket data. Only available for Storm SQL, and only 
use for test purposes.
+ * Spout for Socket data. Only available for Storm SQL. This doesn't 
guarantee at-least-once.
--- End diff --

I feel dumb; I had already implemented replay of messages.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155661686
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int 
parallelismHint) {
   return this;
 }
 
+// FIXME: we may want to separate Stream and Table, and let output 
table be Table instead of Stream
--- End diff --

https://issues.apache.org/jira/browse/STORM-2848


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155656225
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +90,13 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
-
-List tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

Ok. Maybe the previous test isn't really relevant anymore anyway.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155655708
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +90,13 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
-
-List tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

Nope. The bolt instance is already initialized. That's why I couldn't use 
Mockito's spy feature here.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155654493
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +90,13 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
-
-List tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

You can put a spy 
(https://static.javadoc.io/org.mockito/mockito-core/2.13.0/org/mockito/Mockito.html#spy)
 around it, but that only works if the calls you're interested in happen after 
you get access to the bolt reference. I'm not sure if that's what you need?
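
For illustration, a minimal sketch of what a spy-based check could look like (hypothetical test class; `SinkBolt` is a stand-in for whatever `ds.getConsumer()` returns, since the real bolt is constructed inside the provider):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.junit.Test;

public class SpySketchTest {

    // Stand-in for the bolt the data source would return.
    static class SinkBolt extends BaseRichBolt {
        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { }

        @Override
        public void execute(Tuple input) { }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) { }
    }

    @Test
    public void callsThroughTheSpyCanBeVerified() {
        // The spy delegates to the real bolt but records interactions,
        // so anything invoked through this reference can be verified afterwards.
        IRichBolt consumer = spy(new SinkBolt());

        consumer.execute(mock(Tuple.class));

        verify(consumer).execute(any(Tuple.class));
    }
}
```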


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155653625
  
--- Diff: 
sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---
@@ -190,363 +191,213 @@ public void open(ChannelContext ctx) {
 }
   }
 
-  public static class MockState implements State {
-/**
- * Collect all values in a static variable as the instance will go 
through serialization and deserialization.
- * NOTE: This should be cleared before or after running each test.
- */
-private transient static final List VALUES = new 
ArrayList<>();
+  public static class MockSpout extends BaseRichSpout {
 
-public static List getCollectedValues() {
-  return VALUES;
+private final List records;
+private final Fields outputFields;
+private boolean emitted = false;
+private SpoutOutputCollector collector;
+
+public MockSpout(List records, Fields outputFields) {
+  this.records = records;
+  this.outputFields = outputFields;
 }
 
 @Override
-public void beginCommit(Long txid) {
-  // NOOP
+public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+  this.collector = collector;
 }
 
 @Override
-public void commit(Long txid) {
-  // NOOP
-}
+public void nextTuple() {
+  if (emitted) {
+return;
+  }
 
-public void updateState(List tuples, TridentCollector 
collector) {
-  for (TridentTuple tuple : tuples) {
-VALUES.add(tuple.getValues());
+  for (Values r : records) {
+collector.emit(r);
   }
-}
-  }
 
-  public static class MockStateFactory implements StateFactory {
+  emitted = true;
+}
 
 @Override
-public State makeState(Map conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-  return new MockState();
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  declarer.declare(outputFields);
 }
   }
 
-  public static class MockStateUpdater implements StateUpdater {
+  public static class MockBolt extends BaseRichBolt {
--- End diff --

Thanks for the information. I'll try to apply that.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155651046
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.calcite;
+
+public interface ParallelTable extends StormTable {
+
+/**
+ * Returns parallelism hint of this table. Returns null if don't know.
--- End diff --

Will fix.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155650878
  
--- Diff: 
sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
 ---
@@ -68,62 +70,13 @@
 @SuppressWarnings("unchecked")
 @Test
 public void testKafkaSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create("kafka://mock?topic=foo"), null, null, 
TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(TridentKafkaStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(TridentKafkaUpdater.class, 
consumer.getStateUpdater().getClass());
-
-TridentKafkaState state = (TridentKafkaState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-KafkaProducer producer = mock(KafkaProducer.class);
-
doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
-Field producerField = 
state.getClass().getDeclaredField("producer");
-producerField.setAccessible(true);
-producerField.set(state, producer);
-
-List tupleList = mockTupleList();
-for (TridentTuple t : tupleList) {
-state.updateState(Collections.singletonList(t), null);
-verify(producer).send(argThat(new KafkaMessageMatcher(t)));
-}
-verifyNoMoreInteractions(producer);
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
-}
-
-private static class KafkaMessageMatcher implements 
ArgumentMatcher> {
-
-private static final int PRIMARY_INDEX = 0;
-private final TridentTuple tuple;
-
-private KafkaMessageMatcher(TridentTuple tuple) {
-this.tuple = tuple;
-}
-
-@SuppressWarnings("unchecked")
-@Override
-public boolean matches(ProducerRecord record) {
-if (record.key() != tuple.get(PRIMARY_INDEX)) {
-return false;
-}
-ByteBuffer buf = record.value();
-ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
-return b.equals(buf);
-}
+Assert.assertEquals(KafkaBolt.class, consumer.getClass());
--- End diff --

Same answer.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155650836
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +90,13 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
-
-List tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

Unfortunately, external Bolt implementations are less standardized than 
Trident State and StateUpdater implementations. We can't mock all the 
internals of the bolt, so we can't get control of it; that was relatively 
easy with the Trident State implementations.

Maybe the ideal approach to testing the provider is to check the bolt's 
configuration and verify that the provider configuration was applied to it. 
I tried that but noticed that I can't get the current configuration from the bolt.
Do you have an idea for mocking the class so that method calls on its 
instances can be tracked, even though I don't control how the instances are created?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155647898
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int 
parallelismHint) {
   return this;
 }
 
+// FIXME: we may want to separate Stream and Table, and let output 
table be Table instead of Stream
--- End diff --

Thanks


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155647861
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
--- End diff --

Good. Thanks for the reminder. :)


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155647803
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
 ${calcite.version}
 
+
--- End diff --

Makes sense if we don't need them.

If you're trying to avoid version conflicts for transitive dependencies, I 
think adding a `<dependencyManagement>` section to set the versions is better. 
It lets you set which version of the dependency you want, and then all 
transitive includes of that dependency will be the specified version. I think 
we do this with e.g. the Jackson dependencies in the Storm parent pom.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155646360
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
--- End diff --

Sure. I didn't mean to say that you should do any extra checkstyle work in 
this PR; I just wanted to make sure that reducing the counter wasn't being 
forgotten by accident.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155646317
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
 ---
@@ -26,59 +27,61 @@
 import java.io.InputStreamReader;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.storm.Config;
 import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Trident Spout for Socket data. Only available for Storm SQL, and only 
use for test purposes.
+ * Spout for Socket data. Only available for Storm SQL. This doesn't 
guarantee at-least-once.
--- End diff --

Same thing for the spout. I think I need to add the same comment as on the bolt, 
and also mention that it doesn't support replaying, so it only gives at-most-once semantics.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155645948
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. The class is for test purpose and 
doesn't handle reconnection or so.
--- End diff --

I think removing "or so" is clearer. What I meant by test is not a unit 
test, but letting users test a query before attaching an external source. Maybe we 
can just drop the mention of the test purpose and instead state that it doesn't 
handle reconnection or support replaying, so it only gives at-most-once semantics.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155644582
  
--- Diff: 
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 ---
@@ -49,53 +46,56 @@
  */
 public class MongoDataSourcesProvider implements DataSourcesProvider {
 
-private static class MongoTridentDataSource implements 
ISqlTridentDataSource {
+private static class MongoStreamsDataSource implements 
ISqlStreamsDataSource {
 private final String url;
 private final Properties props;
 private final IOutputSerializer serializer;
 
-private MongoTridentDataSource(String url, Properties props, 
IOutputSerializer serializer) {
+private MongoStreamsDataSource(String url, Properties props, 
IOutputSerializer serializer) {
 this.url = url;
 this.props = props;
 this.serializer = serializer;
 }
 
 @Override
-public ITridentDataSource getProducer() {
+public IRichSpout getProducer() {
 throw new 
UnsupportedOperationException(this.getClass().getName() + " doesn't provide 
Producer");
 }
 
 @Override
-public SqlTridentConsumer getConsumer() {
+public IRichBolt getConsumer() {
 Preconditions.checkArgument(!props.isEmpty(), "Writable 
MongoDB must contain collection config");
-String serField = props.getProperty("trident.ser.field", 
"tridentSerField");
-MongoMapper mapper = new TridentMongoMapper(serField, 
serializer);
 
-MongoState.Options options = new MongoState.Options()
-.withUrl(url)
-
.withCollectionName(props.getProperty("collection.name"))
-.withMapper(mapper);
+String serField;
+if (props.contains("ser.field")) {
+serField = props.getProperty("ser.field");
+} else if (props.contains("trident.ser.field")) {
--- End diff --

I think I missed it. Nice find. Will address. Btw it is specific to the 
Mongo config, so the constants will be put in the same class.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155644012
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int 
parallelismHint) {
   return this;
 }
 
+// FIXME: we may want to separate Stream and Table, and let output 
table be Table instead of Stream
--- End diff --

OK I'll file an issue and remove the comment.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155643727
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -89,16 +95,31 @@ public TableBuilderInfo field(String name, RelDataType 
type) {
   return this;
 }
 
+public TableBuilderInfo field(String name, SqlTypeName type, 
ColumnConstraint constraint) {
+  interpretConstraint(constraint, fields.size());
+  return field(name, typeFactory.createSqlType(type));
+}
+
+public TableBuilderInfo field(String name, RelDataType type, 
ColumnConstraint constraint) {
+  interpretConstraint(constraint, fields.size());
+  fields.add(new FieldType(name, type));
+  return this;
+}
+
 public TableBuilderInfo field(String name, SqlDataTypeSpec type, 
ColumnConstraint constraint) {
   RelDataType dataType = type.deriveType(typeFactory);
+  interpretConstraint(constraint, fields.size());
+  fields.add(new FieldType(name, dataType));
+  return this;
+}
+
+private void interpretConstraint(ColumnConstraint constraint, int 
fieldIdx) {
   if (constraint instanceof ColumnConstraint.PrimaryKey) {
 ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) 
constraint;
 Preconditions.checkState(primaryKey == -1, "There are more than 
one primary key in the table");
--- End diff --

Will fix.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155643640
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
 ${calcite.version}
 
+
--- End diff --

They're for supporting other parts of Calcite and are unneeded dependencies 
for us. Some of them are dependencies we already have, so excluding them also 
helps avoid version conflicts.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155643000
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
--- End diff --

I'll see how the number changes. It might remain the same if I fixed a 
checkstyle issue in A but introduced a violation in B. 
Actually I'm focusing on planning and addressing window aggregation and 
join, so TBH I'd like to address the checkstyle issues later, all at once.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r154738635
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
 ${calcite.version}
 
+
--- End diff --

Why the exclusions?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155320443
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelTable.java ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.calcite;
+
+public interface ParallelTable extends StormTable {
+
+/**
+ * Returns parallelism hint of this table. Returns null if don't know.
--- End diff --

nit: Returns null if not known.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155515549
  
--- Diff: 
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 ---
@@ -49,53 +46,56 @@
  */
 public class MongoDataSourcesProvider implements DataSourcesProvider {
 
-private static class MongoTridentDataSource implements 
ISqlTridentDataSource {
+private static class MongoStreamsDataSource implements 
ISqlStreamsDataSource {
 private final String url;
 private final Properties props;
 private final IOutputSerializer serializer;
 
-private MongoTridentDataSource(String url, Properties props, 
IOutputSerializer serializer) {
+private MongoStreamsDataSource(String url, Properties props, 
IOutputSerializer serializer) {
 this.url = url;
 this.props = props;
 this.serializer = serializer;
 }
 
 @Override
-public ITridentDataSource getProducer() {
+public IRichSpout getProducer() {
 throw new 
UnsupportedOperationException(this.getClass().getName() + " doesn't provide 
Producer");
 }
 
 @Override
-public SqlTridentConsumer getConsumer() {
+public IRichBolt getConsumer() {
 Preconditions.checkArgument(!props.isEmpty(), "Writable 
MongoDB must contain collection config");
-String serField = props.getProperty("trident.ser.field", 
"tridentSerField");
-MongoMapper mapper = new TridentMongoMapper(serField, 
serializer);
 
-MongoState.Options options = new MongoState.Options()
-.withUrl(url)
-
.withCollectionName(props.getProperty("collection.name"))
-.withMapper(mapper);
+String serField;
+if (props.contains("ser.field")) {
+serField = props.getProperty("ser.field");
+} else if (props.contains("trident.ser.field")) {
--- End diff --

Nit: Are there constants somewhere that could be used instead of literals?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155320991
  
--- Diff: sql/storm-sql-core/pom.xml ---
@@ -66,10 +66,34 @@
 calcite-core
--- End diff --

You might want to reduce the allowed checkstyle errors in the poms if you 
reduced the violations.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155516637
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. The class is for test purpose and 
doesn't handle reconnection or so.
--- End diff --

What is meant by "or so"?

Also if this class is only for tests, can it be moved to test scope? And if 
not, can it be moved into a package for test classes so it's clear that it's 
only for tests?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155520270
  
--- Diff: 
sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---
@@ -190,363 +191,213 @@ public void open(ChannelContext ctx) {
 }
   }
 
-  public static class MockState implements State {
-/**
- * Collect all values in a static variable as the instance will go 
through serialization and deserialization.
- * NOTE: This should be cleared before or after running each test.
- */
-private transient static final List VALUES = new 
ArrayList<>();
+  public static class MockSpout extends BaseRichSpout {
 
-public static List getCollectedValues() {
-  return VALUES;
+private final List records;
+private final Fields outputFields;
+private boolean emitted = false;
+private SpoutOutputCollector collector;
+
+public MockSpout(List records, Fields outputFields) {
+  this.records = records;
+  this.outputFields = outputFields;
 }
 
 @Override
-public void beginCommit(Long txid) {
-  // NOOP
+public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+  this.collector = collector;
 }
 
 @Override
-public void commit(Long txid) {
-  // NOOP
-}
+public void nextTuple() {
+  if (emitted) {
+return;
+  }
 
-public void updateState(List tuples, TridentCollector 
collector) {
-  for (TridentTuple tuple : tuples) {
-VALUES.add(tuple.getValues());
+  for (Values r : records) {
+collector.emit(r);
   }
-}
-  }
 
-  public static class MockStateFactory implements StateFactory {
+  emitted = true;
+}
 
 @Override
-public State makeState(Map conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-  return new MockState();
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  declarer.declare(outputFields);
 }
   }
 
-  public static class MockStateUpdater implements StateUpdater {
+  public static class MockBolt extends BaseRichBolt {
--- End diff --

I think it could be helpful to use a JUnit TestRule (e.g. by subclassing 
http://junit.org/junit4/javadoc/4.12/org/junit/rules/ExternalResource.html) to 
help remember to clear VALUES after the test. It's easy to forget to clean up, 
and then the tests leak state.

Same for the other test utils that keep static state.
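
For illustration, a minimal sketch of such a rule (hypothetical names; the static `VALUES` list here is a stand-in for the state kept by the mock classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

public class ClearValuesRuleSketch {

    // Stand-in for the static state collected by the mock bolt.
    static final List<List<Object>> VALUES = new ArrayList<>();

    // ExternalResource runs before() ahead of each test method and after() once it
    // finishes, so the static list cannot leak state between tests.
    @Rule
    public final ExternalResource clearValues = new ExternalResource() {
        @Override
        protected void before() {
            VALUES.clear();
        }

        @Override
        protected void after() {
            VALUES.clear();
        }
    };

    @Test
    public void example() {
        VALUES.add(Arrays.<Object>asList(1, "2"));
        // assertions against VALUES would go here
    }
}
```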


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155321227
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -117,9 +138,21 @@ public TableBuilderInfo parallelismHint(int 
parallelismHint) {
   return this;
 }
 
+// FIXME: we may want to separate Stream and Table, and let output 
table be Table instead of Stream
--- End diff --

Can you put this in an issue instead? I think it will be forgotten if left 
as a TODO in the code.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155321576
  
--- Diff: 
sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -89,16 +95,31 @@ public TableBuilderInfo field(String name, RelDataType 
type) {
   return this;
 }
 
+public TableBuilderInfo field(String name, SqlTypeName type, 
ColumnConstraint constraint) {
+  interpretConstraint(constraint, fields.size());
+  return field(name, typeFactory.createSqlType(type));
+}
+
+public TableBuilderInfo field(String name, RelDataType type, 
ColumnConstraint constraint) {
+  interpretConstraint(constraint, fields.size());
+  fields.add(new FieldType(name, type));
+  return this;
+}
+
 public TableBuilderInfo field(String name, SqlDataTypeSpec type, 
ColumnConstraint constraint) {
   RelDataType dataType = type.deriveType(typeFactory);
+  interpretConstraint(constraint, fields.size());
+  fields.add(new FieldType(name, dataType));
+  return this;
+}
+
+private void interpretConstraint(ColumnConstraint constraint, int 
fieldIdx) {
   if (constraint instanceof ColumnConstraint.PrimaryKey) {
 ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) 
constraint;
 Preconditions.checkState(primaryKey == -1, "There are more than 
one primary key in the table");
--- End diff --

Nit: There are -> There is


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155514626
  
--- Diff: 
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
 ---
@@ -88,46 +90,13 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(HdfsStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, 
consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
-
-List tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), 
null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

This seems to be testing much less than before. Is the test no longer 
relevant?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155517029
  
--- Diff: 
sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
 ---
@@ -26,59 +27,61 @@
 import java.io.InputStreamReader;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.storm.Config;
 import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Trident Spout for Socket data. Only available for Storm SQL, and only 
use for test purposes.
+ * Spout for Socket data. Only available for Storm SQL. This doesn't 
guarantee at-least-once.
--- End diff --

Is it now intended for non-test use?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2017-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r155515253
  
--- Diff: 
sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
 ---
@@ -68,62 +70,13 @@
 @SuppressWarnings("unchecked")
 @Test
 public void testKafkaSink() throws Exception {
-ISqlTridentDataSource ds = 
DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = 
DataSourcesRegistry.constructStreamsDataSource(
 URI.create("kafka://mock?topic=foo"), null, null, 
TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = 
ds.getConsumer();
+IRichBolt consumer = ds.getConsumer();
 
-Assert.assertEquals(TridentKafkaStateFactory.class, 
consumer.getStateFactory().getClass());
-Assert.assertEquals(TridentKafkaUpdater.class, 
consumer.getStateUpdater().getClass());
-
-TridentKafkaState state = (TridentKafkaState) 
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-KafkaProducer producer = mock(KafkaProducer.class);
-
doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
-Field producerField = 
state.getClass().getDeclaredField("producer");
-producerField.setAccessible(true);
-producerField.set(state, producer);
-
-List tupleList = mockTupleList();
-for (TridentTuple t : tupleList) {
-state.updateState(Collections.singletonList(t), null);
-verify(producer).send(argThat(new KafkaMessageMatcher(t)));
-}
-verifyNoMoreInteractions(producer);
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
-}
-
-private static class KafkaMessageMatcher implements 
ArgumentMatcher> {
-
-private static final int PRIMARY_INDEX = 0;
-private final TridentTuple tuple;
-
-private KafkaMessageMatcher(TridentTuple tuple) {
-this.tuple = tuple;
-}
-
-@SuppressWarnings("unchecked")
-@Override
-public boolean matches(ProducerRecord record) {
-if (record.key() != tuple.get(PRIMARY_INDEX)) {
-return false;
-}
-ByteBuffer buf = record.value();
-ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
-return b.equals(buf);
-}
+Assert.assertEquals(KafkaBolt.class, consumer.getClass());
--- End diff --

Same question as for the HDFS test


---