[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java

2018-07-13 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2752#discussion_r202504021
  
--- Diff: storm-core/src/clj/org/apache/storm/ui/core.clj ---
@@ -1613,24 +1613,24 @@
   https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
   https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
   (StormMetricsRegistry/startMetricsReporters conf)
-  (UIHelpers/stormRunJetty  (int (conf UI-PORT))
-(conf UI-HOST)
-https-port
-header-buffer-size
-(reify IConfigurator
+  (UIHelpers/stormRunJetty (int (conf UI-PORT))
+   (conf UI-HOST)
+   https-port
+   header-buffer-size
+   (reify IConfigurator
   (execute [this server]
 (UIHelpers/configSsl server
-  https-port
-  https-ks-path
-  https-ks-password
-  https-ks-type
-  https-key-password
-  https-ts-path
-  https-ts-password
-  https-ts-type
-  https-need-client-auth
-  https-want-client-auth
-  header-buffer-size)
+ https-port
+ https-ks-path
+ https-ks-password
+ https-ks-type
+ https-key-password
+ https-ts-path
+ https-ts-password
+ https-ts-type
+ https-need-client-auth
+ https-want-client-auth
+ header-buffer-size)
--- End diff --

This file has many whitespace nits.


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2762


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary to StormMetricsReg...

2018-07-13 Thread zd-project
GitHub user zd-project opened a pull request:

https://github.com/apache/storm/pull/2764

 STORM-3147: Port ClusterSummary to StormMetricsRegistry

This PR depends on #2763 
The implementation is kind of ugly right now due to the caching and 
synchronization in the metrics update. I hope you can give some advice on 
improving it. @revans2 @HeartSaVioR @srdo 

@kishorvpatil I've listed all the metrics from ClusterSummary that I think are 
worth implementing in the Measured enum. I'm not sure if there's value in 
SUPERVISOR_TOTAL_RESOURCE, TOPOLOGY_STATUS, and TOPOLOGY_SCHED_STATUS. I'd like 
to hear what you think.
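
For illustration, a minimal sketch of the caching pattern under discussion, assuming StormMetricsRegistry wraps a Dropwizard `MetricRegistry` (the class and metric names here are hypothetical, not from the PR): gauges read a cached value that a single refresher thread updates, so reporters never block on cluster calls.

```
import java.util.concurrent.atomic.AtomicInteger;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

public class ClusterSummaryGauges {
    // Snapshot refreshed periodically by one thread; gauges read it lock-free.
    private final AtomicInteger supervisorCount = new AtomicInteger();

    public void register(MetricRegistry registry) {
        // A Gauge is only a read callback; the registry polls it at report time.
        registry.register("cluster:num-supervisors",
            (Gauge<Integer>) supervisorCount::get);
    }

    public void refresh(int latestSupervisorCount) {
        supervisorCount.set(latestSupervisorCount);
    }
}
```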

I have also discovered a potential scheduler bug while working on this: 
[STORM-3151](https://issues.apache.org/jira/browse/STORM-3151). Can you help 
confirm it?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zd-project/storm STORM-3147

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2764.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2764


commit 802014c88a67ae5ef044c1f87889d976063eee71
Author: Zhengdai Hu 
Date:   2018-07-12T16:31:00Z

STORM-3150: Improve Gauge registration methods and refactored code

commit 4a8f63066dcede70a8df483383d6b2bcd616d299
Author: Zhengdai Hu 
Date:   2018-07-13T19:22:20Z

STORM-3147: Port ClusterSummary to StormMetricsRegistry




---


[GitHub] storm pull request #2763: STORM-3150: Improve Gauge registration methods and...

2018-07-13 Thread zd-project
GitHub user zd-project opened a pull request:

https://github.com/apache/storm/pull/2763

STORM-3150: Improve Gauge registration methods and refactored code

STORM-3150

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zd-project/storm STORM-3150

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2763


commit 802014c88a67ae5ef044c1f87889d976063eee71
Author: Zhengdai Hu 
Date:   2018-07-12T16:31:00Z

STORM-3150: Improve Gauge registration methods and refactored code




---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202431877
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
--- End diff --

There is no way to disable backpressure right now. It didn't crash because 
this is running under Netty. Netty will catch the exception and pass it off 
to an exception handler/the future. I didn't trace down the exact path or why 
we were not handling this exception in a way that would bring the worker down, 
but I will file a follow-on JIRA to examine it.
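
For readers following along: Netty routes exceptions thrown out of inbound handler methods such as `channelRead` to `exceptionCaught` on the pipeline, so the event-loop thread (and the worker) survives. A minimal sketch of that mechanism, not the actual Storm handler:

```
import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingExceptionHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingExceptionHandler.class);

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Netty delivers the exception here instead of unwinding the
        // event-loop thread, so nothing propagates up to crash the JVM.
        LOG.error("Uncaught exception in pipeline", cause);
    }
}
```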


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202431655
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
+//Just in case we get something we are confused about
+// we can continue processing the rest of the tasks
+LOG.error("BP index out of bounds {}", e);
--- End diff --

Yes, because when this was happening it was happening everywhere, and there 
is only one line under the try/catch block, so there isn't a lot of confusion 
about what happened.


---


[GitHub] storm issue #2744: [STORM-3132] Avoid NPE in the Values Constructor

2018-07-13 Thread kishorvpatil
Github user kishorvpatil commented on the issue:

https://github.com/apache/storm/pull/2744
  
@HeartSaVioR Removed unwanted condition.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202345310
  
--- Diff: sql/README.md ---
@@ -1,187 +1,8 @@
 # Storm SQL
 
-Compile SQL queries to Storm topologies.
+Compile SQL queries to Storm topologies and run.
 
-## Usage
-
-Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster
-
-```
-$ bin/storm sql <sql-file> <topo-name>
-```
-
-In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology.
-
-StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`.
-Detailed explanation is available from `Showing Query Plan (explain mode)` section.
-
-## Supported Features
-
-The following features are supported in the current repository:
-
-* Streaming from and to external data sources
-* Filtering tuples
-* Projections
-* Aggregations (Grouping)
-* User defined function (scalar and aggregate)
-* Join (Inner, Left outer, Right outer, Full outer)
-
-## Specifying External Data Sources
-
-In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE`
-statement. For example, the following statement specifies a Kafka spouts and sink:
-
-```
-CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-
-The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
-[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
-
-`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
-Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
-
-Default value is 1, and this option has no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.)
-
-## Plugging in External Data Sources
-
-Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using
-the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the
-tables. Please refer to the implementation of `storm-sql-kafka` for more details.
-
-## Specifying User Defined Function (UDF)
-
-Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement.
-For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class.
-
-```
-CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
-```
-
-Storm SQL determines whether the function as scalar or aggregate by checking which methods are defined.
-If the class defines `evaluate` method, Storm SQL treats the function as `scalar`,
-and if the class defines `add` method, Storm SQL treats the function as `aggregate`.
-
-Example of class for scalar function is here:
-
-```
-  public class MyPlus {
-public static Integer evaluate(Integer x, Integer y) {
-  return x + y;
-}
-  }
-
-```
-
-and class for aggregate function is here:
-
-```
-  public class MyConcat {
-public static String init() {
-  return "";
-}
-public static String add(String accumulator, String val) {
-  return accumulator + val;
-}
-public static String result(String accumulator) {
-  return accumulator;
-}
-  }
-```
-
-If users don't define `result` method, result is the last return value of `add` method.
-Users need to define `result` method only when we need to transform accumulated value.
-
-## Example: Filtering Kafka Stream
-
-Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id
-of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the
-transactions are significant and to insert these orders into another Kafka stream for further analysis.
-
-The user can specify the following SQL statements in the SQL file:
-
-```
-CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES 

[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202356815
  
--- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java ---
@@ -47,54 +45,60 @@
 * The properties are in JSON format which specifies the name of the MongoDB collection and etc.
  */
 public class MongoDataSourcesProvider implements DataSourcesProvider {
+public static final String SCHEME_NAME = "mongodb";
+public static final String VALUE_SERIALIZED_FIELD = "ser.field";
+public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field";
+public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField";
+public static final String COLLECTION_NAME = "collection.name";
 
-private static class MongoTridentDataSource implements ISqlTridentDataSource {
+private static class MongoStreamsDataSource implements ISqlStreamsDataSource {
 private final String url;
 private final Properties props;
 private final IOutputSerializer serializer;
 
-private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) {
 this.url = url;
 this.props = props;
 this.serializer = serializer;
 }
 
 @Override
-public ITridentDataSource getProducer() {
+public IRichSpout getProducer() {
throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
 }
 
 @Override
-public SqlTridentConsumer getConsumer() {
+public IRichBolt getConsumer() {
 Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config");
-String serField = props.getProperty("trident.ser.field", "tridentSerField");
-MongoMapper mapper = new TridentMongoMapper(serField, serializer);
-
-MongoState.Options options = new MongoState.Options()
-.withUrl(url)
-.withCollectionName(props.getProperty("collection.name"))
-.withMapper(mapper);
-
-StateFactory stateFactory = new MongoStateFactory(options);
-StateUpdater stateUpdater = new MongoStateUpdater();
-
-return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+String serField;
+if (props.contains(VALUE_SERIALIZED_FIELD)) {
+serField = props.getProperty(VALUE_SERIALIZED_FIELD);
+} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) {
+// backward compatibility
+serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
--- End diff --

Nit: Since this is targeting 2.0.0, it might be okay not to provide 
backward compatibility.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202355372
  
--- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java ---
@@ -88,46 +75,12 @@ public void shutDown() throws IOException {
 @SuppressWarnings("unchecked")
 @Test
 public void testHdfsSink() throws Exception {
-ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(
 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
 Assert.assertNotNull(ds);
 
-ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
-
-Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
-Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
-
-HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
-StateUpdater stateUpdater = consumer.getStateUpdater();
-
-HdfsFileOptions options = mock(HdfsFileOptions.class);
-Field optionsField = state.getClass().getDeclaredField("options");
-optionsField.setAccessible(true);
-optionsField.set(state, options);
+IRichBolt consumer = ds.getConsumer();
 
-List tupleList = mockTupleList();
-
-for (TridentTuple t : tupleList) {
-stateUpdater.updateState(state, Collections.singletonList(t), null);
-try {
-verify(options).execute(Collections.singletonList(t));
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-}
-}
-
-private static List mockTupleList() {
-List tupleList = new ArrayList<>();
-TridentTuple t0 = mock(TridentTuple.class);
-TridentTuple t1 = mock(TridentTuple.class);
-doReturn(1).when(t0).get(0);
-doReturn(2).when(t1).get(0);
-doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
-doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
-tupleList.add(t0);
-tupleList.add(t1);
-return tupleList;
+Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --

Looking at this again, I think the way to do it would be to use a factory 
like 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
 to instantiate the HdfsBolt. You could pass the factory class name in through 
a property and instantiate it when the data source is created. The default 
factory would just call `new HdfsBolt()`.

I'm not sure if it's worth it. Up to you.
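
A rough sketch of that suggestion, with hypothetical names (`BoltFactory`, the `"bolt.factory"` table property) that are not part of the PR; the default factory simply news up the real bolt, while an override class name can be looked up via reflection when the data source is created:

```
import java.io.Serializable;
import java.util.Properties;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.topology.IRichBolt;

public interface BoltFactory extends Serializable {
    IRichBolt create(Properties props);

    // Default: just instantiate the real bolt (a production impl would configure it).
    class DefaultHdfsBoltFactory implements BoltFactory {
        @Override
        public IRichBolt create(Properties props) {
            return new HdfsBolt();
        }
    }

    // Resolve an override from a (hypothetical) table property, else the default.
    static BoltFactory fromProperty(Properties props) throws ReflectiveOperationException {
        String cls = props.getProperty("bolt.factory", DefaultHdfsBoltFactory.class.getName());
        return (BoltFactory) Class.forName(cls).getDeclaredConstructor().newInstance();
    }
}
```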


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202341073
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

I'm wondering if it would be worth updating this rather than removing it, 
or maybe merging parts of it into one of the other storm-sql docs (e.g. the 
overview doc). I think the page provides a nice overview of what storm-sql is, 
and how it works.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202347946
  
--- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java ---
@@ -118,16 +138,32 @@ public RelDataType getRowType(
 @Override
 public Statistic getStatistic() {
 return stat != null ? stat : Statistics.of(rows.size(),
-   ImmutableList.of());
+ImmutableList.of());
 }
 
 @Override
 public Schema.TableType getJdbcTableType() {
 return Schema.TableType.STREAM;
 }
+
+@Override
+public boolean isRolledUp(String s) {
--- End diff --

Nit: `s` doesn't really say much as a variable name, can we replace it with 
one that says what this string is? Same for the other methods here.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202359820
  
--- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---
@@ -41,7 +26,46 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.junit.rules.ExternalResource;
+
 public class TestUtils {
 public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
+@Override
+protected void before() throws Throwable {
+  MockInsertBolt.getCollectedValues().clear();
+}
+
+@Override
+protected void after() {
+  // no-op
--- End diff --

Nit: I think you can leave out this method entirely


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202358383
  
--- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this for production.
--- End diff --

Does it make sense for us to include code like this if we know it isn't 
suitable for production use? 


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202349103
  
--- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java ---
@@ -72,13 +72,13 @@
 // merge and push unions rules
 UnionEliminatorRule.INSTANCE,
 
-TridentScanRule.INSTANCE,
-TridentFilterRule.INSTANCE,
-TridentProjectRule.INSTANCE,
-TridentAggregateRule.INSTANCE,
-TridentJoinRule.INSTANCE,
-TridentModifyRule.INSTANCE,
-TridentCalcRule.INSTANCE
+StreamsScanRule.INSTANCE,
+StreamsFilterRule.INSTANCE,
+StreamsProjectRule.INSTANCE,
+StreamsAggregateRule.INSTANCE,
--- End diff --

Are we holding on to aggregate and join so they'll be easy to implement 
later, or why are the aggregate and join rules here?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202350280
  
--- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel {
+private final int primaryKeyIndex;
+
+public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+  RelNode child, Operation operation, List updateColumnList, List sourceExpressionList,
+  boolean flattened, int primaryKeyIndex) {
+super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
+this.primaryKeyIndex = primaryKeyIndex;
+}
+
+@Override
+public RelNode copy(RelTraitSet traitSet, List inputs) {
+return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
+sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex);
+}
+
+@Override
+public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+// SingleRel
+RelNode input = getInput();
+StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+Stream inputStream = planCreator.pop();
+
+Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
+
+// Calcite ensures that the value is structurized to the table definition
+// hence we can use PK index directly
+// To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER
+// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed,
+// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT.
+
+// FIXME: this should be really different...
--- End diff --

I think we should raise a follow-up issue immediately and remove the FIXME, 
or just fix it now. I don't really know what the FIXME is saying should be 
changed.


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202344785
  
--- Diff: docs/storm-sql.md ---
@@ -6,14 +6,14 @@ documentation: true
 
 The Storm SQL integration allows users to run SQL queries over streaming data in Storm. Not only the SQL interface allows faster development cycles on streaming analytics, but also opens up the opportunities to unify batch data processing like [Apache Hive](///hive.apache.org) and real-time streaming data analytics.
 
-At a very high level StormSQL compiles the SQL queries to [Trident](Trident-API-Overview.html) topologies and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page.
+At a very high level StormSQL compiles the SQL queries to Storm topologies leveraging Streams API and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page.
--- End diff --

There's a link to storm-sql-internal in this line


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202357967
  
--- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.datasource.socket.bolt;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.Socket;
+import java.util.Map;
+
+import org.apache.storm.shade.org.apache.commons.io.IOUtils;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Bolt implementation for Socket. Only available for Storm SQL.
+ * The class doesn't handle reconnection so you may not want to use this for production.
+ */
+public class SocketBolt implements IRichBolt {
--- End diff --

Nit: Consider extending `BaseRichBolt` instead, so you don't have to 
implement the methods you don't want to customize
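
A minimal sketch of what that buys, assuming the Storm 2.x `prepare` signature: `BaseRichBolt` supplies the no-op `cleanup()` and the default `getComponentConfiguration()`, so a sink bolt like this one only has to implement the three methods below.

```
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class SocketLikeBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;  // the real bolt would open its socket here
    }

    @Override
    public void execute(Tuple input) {
        collector.ack(input);  // the real bolt would serialize and write first
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // sink bolt: declares no output streams
    }
}
```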


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202346634
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
--- End diff --

I'm a little surprised that an uncaught exception here doesn't crash the 
worker though? I'd expect the worker to crash, and for the worker to come back 
up with backpressure disabled?


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202345723
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
+//Just in case we get something we are confused about
+// we can continue processing the rest of the tasks
+LOG.error("BP index out of bounds {}", e);
--- End diff --

Ah! I realized `e` is bound to the `{}` placeholder, so my suggestion (showing 
the index within the same line) is already applied, but there's no stack trace.
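
For anyone puzzled by the distinction, a small self-contained example of the SLF4J behavior under discussion (the class name is made up): a trailing Throwable only gets its stack trace logged when it is not consumed by a `{}` placeholder.

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jThrowableDemo {
    private static final Logger LOG = LoggerFactory.getLogger(Slf4jThrowableDemo.class);

    public static void main(String[] args) {
        Exception e = new ArrayIndexOutOfBoundsException("-1");

        // The throwable is consumed by the {} placeholder: one line, no stack trace.
        LOG.error("BP index out of bounds {}", e);

        // No placeholder consumes the throwable: full stack trace is logged.
        LOG.error("BP index out of bounds", e);

        // Index inline for grepping, plus the stack trace.
        LOG.error("BP index {} out of bounds", -1, e);
    }
}
```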


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202344217
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
+//Just in case we get something we are confused about
+// we can continue processing the rest of the tasks
+LOG.error("BP index out of bounds {}", e);
--- End diff --

Is it intentional that the stack trace is stripped here?


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202344011
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
+LOG.error("BP index out of bounds {}", e);
--- End diff --

Nit: Is it intentional that the stack trace is stripped here?


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202343846
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
--- End diff --

Makes sense, thanks.


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202343054
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
--- End diff --

OK, makes sense. I know the exception's message will contain the index and 
will be included in the log message, but explicitly writing the index into the 
log message would help users find it while grepping, since it can then be 
found within the same line.


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202342969
  
--- Diff: storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java ---
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  
The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance 
with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class BackPressureStatusTest {
+
+@Test
+void bufferTest() throws IOException {
--- End diff --

Actually I'll just delete it because there is no reason to keep it, 
especially because we don't verify the result in any way.


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202342773
  
--- Diff: storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java ---
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  
The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance 
with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class BackPressureStatusTest {
+
+@Test
+void bufferTest() throws IOException {
--- End diff --

The original problem was that kryo was throwing an exception trying to 
serialize a BackPressureStatus message.  I wrote this test initially to try to 
debug what was happening, and I ended up serializing millions of 
BackPressureStatus objects in loops to try to verify that it was working 
properly.  When that failed, I remembered that kryo is not thread safe and 
started looking around there.  If you want me to delete it I can.
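
As background, a common way to avoid sharing a Kryo instance across threads, since Kryo itself is not thread safe, is to confine one instance per thread. A sketch only, not necessarily how this PR fixes the issue:

```
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public class ThreadConfinedKryo {
    // Each thread lazily gets its own Kryo; no instance is ever shared.
    private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(Kryo::new);

    public static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (Output out = new Output(bos)) {
            KRYO.get().writeClassAndObject(out, obj);
        }
        return bos.toByteArray();
    }
}
```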


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202341617
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
--- End diff --

So there are two issues happening in the JIRA.  Previously, when under heavy 
load, a backpressure message would be sent.  Under the old code it might 
contain a -1, which would cause an ArrayIndexOutOfBoundsException.  
When it did, some backpressure status updates in the message might be lost 
(specifically the ones that turn off backpressure) and the topology would stop 
processing.  I fixed the cause of the -1 indexes, but I thought it would be 
best to also guard against future problems, so that if one occurs we can still 
process the rest of the status updates.
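
An equivalent defensive guard that avoids using the exception for control flow, in case that reads more clearly; illustrative only, not what the patch does:

```
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BpStatusApplier {
    private static final Logger LOG = LoggerFactory.getLogger(BpStatusApplier.class);

    /** Sets the flag for each in-range task index and logs (but skips) the rest. */
    public static void applyOn(List<Integer> bpTasks, AtomicBoolean[] remoteBpStatus) {
        for (Integer bpTask : bpTasks) {
            if (bpTask != null && bpTask >= 0 && bpTask < remoteBpStatus.length) {
                remoteBpStatus[bpTask].set(true);
            } else {
                // Continue with the remaining status updates instead of aborting.
                LOG.error("Ignoring out-of-range BP task index {}", bpTask);
            }
        }
    }
}
```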


---


[GitHub] storm pull request #2590: STORM-2974: Add transactional non-opaque spout to ...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2590


---


[GitHub] storm issue #2590: STORM-2974: Add transactional non-opaque spout to storm-k...

2018-07-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2590
  
I have found that a couple of public classes are renamed, but they're in the 
`internal` package, which signals to others that they're non-public, so I 
think we are OK.

---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202300189
  
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---
@@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
 BackPressureStatus status = (BackPressureStatus) message;
 if (status.bpTasks != null) {
 for (Integer bpTask : status.bpTasks) {
-remoteBpStatus[bpTask].set(true);
+try {
+remoteBpStatus[bpTask].set(true);
+} catch (ArrayIndexOutOfBoundsException e) {
--- End diff --

I'm also not sure why this can happen?


---


[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo

2018-07-13 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2762#discussion_r202299829
  
--- Diff: storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java ---
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  
The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance 
with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class BackPressureStatusTest {
+
+@Test
+void bufferTest() throws IOException {
--- End diff --

What is this test demonstrating?


---


[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...

2018-07-13 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2443#discussion_r202282820
  
--- Diff: docs/storm-sql-internal.md ---
@@ -1,59 +0,0 @@

--- End diff --

I removed the document since it is documented based on Trident 
implementation.


---


[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...

2018-07-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2443
  
@srdo 
Rebased. I'm seeing intermittent test failure like below, but not 
consistent failure. Will try to take a look at once I have time to, but let's 
move it out of this PR.

```
17:32:06.845 [SLOT_1027] INFO  o.a.s.m.StormMetricRegistry - Starting 
metrics reporters...
17:32:06.845 [SLOT_1027] INFO  o.a.s.s.a.ClientAuthUtils - Got AutoCreds []
17:32:06.846 [SLOT_1027] INFO  o.a.s.d.w.WorkerState - Reading assignments
17:32:06.846 [SLOT_1027] ERROR o.a.s.d.s.Slot - Error when processing event
java.io.IOException: java.lang.NullPointerException
at 
org.apache.storm.daemon.supervisor.LocalContainer.launch(LocalContainer.java:54)
 ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.apache.storm.daemon.supervisor.LocalContainerLauncher.launchContainer(LocalContainerLauncher.java:42)
 ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.apache.storm.daemon.supervisor.Slot.handleWaitingForBlobUpdate(Slot.java:528)
 ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:232) 
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:902) 
[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
Caused by: java.lang.NullPointerException
at 
org.apache.storm.daemon.worker.WorkerState.readWorkerExecutors(WorkerState.java:630)
 ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.apache.storm.daemon.worker.WorkerState.<init>(WorkerState.java:153) 
~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.daemon.worker.Worker.loadWorker(Worker.java:172) 
~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.apache.storm.daemon.worker.Worker.lambda$start$39(Worker.java:164) 
~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_66]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_66]
at org.apache.storm.daemon.worker.Worker.start(Worker.java:163) 
~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at 
org.apache.storm.daemon.supervisor.LocalContainer.launch(LocalContainer.java:52)
 ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 4 more
17:32:06.847 [SLOT_1027] ERROR o.a.s.u.Utils - Halting process: Error when 
processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
at org.apache.storm.utils.Utils.exitProcess(Utils.java:473) 
[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:949) 
[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
```


---