[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r202504021 --- Diff: storm-core/src/clj/org/apache/storm/ui/core.clj --- @@ -1613,24 +1613,24 @@ https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH) https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)] (StormMetricsRegistry/startMetricsReporters conf) - (UIHelpers/stormRunJetty (int (conf UI-PORT)) -(conf UI-HOST) -https-port -header-buffer-size -(reify IConfigurator + (UIHelpers/stormRunJetty (int (conf UI-PORT)) + (conf UI-HOST) + https-port + header-buffer-size + (reify IConfigurator (execute [this server] (UIHelpers/configSsl server - https-port - https-ks-path - https-ks-password - https-ks-type - https-key-password - https-ts-path - https-ts-password - https-ts-type - https-need-client-auth - https-want-client-auth - header-buffer-size) + https-port + https-ks-path + https-ks-password + https-ks-type + https-key-password + https-ts-path + https-ts-password + https-ts-type + https-need-client-auth + https-want-client-auth + header-buffer-size) --- End diff -- This file has many whitespace nits. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2762 ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary to StormMetricsReg...
GitHub user zd-project opened a pull request: https://github.com/apache/storm/pull/2764 STORM-3147: Port ClusterSummary to StormMetricsRegistry This PR depends on #2763. The implementation is kind of ugly right now due to caching and synchronization in the metrics update. I hope you can offer some advice on improving it. @revans2 @HeartSaVioR @srdo @kishorvpatil I've listed all the metrics I think are worth implementing from ClusterSummary in the Measured enum. I'm not sure if there's value in SUPERVISOR_TOTAL_RESOURCE, TOPOLOGY_STATUS, and TOPOLOGY_SCHED_STATUS. I'd like to hear what you think. I also discovered a potential scheduler bug while working on this, [STORM-3151](https://issues.apache.org/jira/browse/STORM-3151); can you help confirm it? You can merge this pull request into a Git repository by running: $ git pull https://github.com/zd-project/storm STORM-3147 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2764.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2764 commit 802014c88a67ae5ef044c1f87889d976063eee71 Author: Zhengdai Hu Date: 2018-07-12T16:31:00Z STORM-3150: Improve Gauge registration methods and refactored code commit 4a8f63066dcede70a8df483383d6b2bcd616d299 Author: Zhengdai Hu Date: 2018-07-13T19:22:20Z STORM-3147: Port ClusterSummary to StormMetricsRegistry ---
[GitHub] storm pull request #2763: STORM-3150: Improve Gauge registration methods and...
GitHub user zd-project opened a pull request: https://github.com/apache/storm/pull/2763 STORM-3150: Improve Gauge registration methods and refactored code STORM-3150 You can merge this pull request into a Git repository by running: $ git pull https://github.com/zd-project/storm STORM-3150 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2763.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2763 commit 802014c88a67ae5ef044c1f87889d976063eee71 Author: Zhengdai Hu Date: 2018-07-12T16:31:00Z STORM-3150: Improve Gauge registration methods and refactored code ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202431877 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- There is no way to disable backpressure right now. It didn't crash because this code runs under Netty, which catches the exception and passes it off to an exception handler or the future. I didn't trace the exact path, or why we were not handling this exception in a way that brings the worker down, but I will file a follow-on JIRA to examine it. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202431655 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { +//Just in case we get something we are confused about +// we can continue processing the rest of the tasks +LOG.error("BP index out of bounds {}", e); --- End diff -- Yes, because when this was happening it was happening everywhere, and there is only one line under the try catch block so there isn't a lot of confusion about what happened. ---
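The per-task try/catch discussed in this thread can be illustrated with a minimal, self-contained sketch. It is not the actual `StormClientHandler` code: the class name `BackPressureSketch`, the method `markBackPressure`, and the returned list of rejected ids are assumptions made for illustration, and the real change logs the error rather than collecting it. The sketch only shows that catching inside the loop lets the remaining tasks be processed when one index is out of range.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the handler logic; not the actual StormClientHandler.
final class BackPressureSketch {

    /**
     * Flags every reported task as back-pressured. Task ids that fall outside
     * the status array are collected and returned instead of aborting the loop.
     */
    static List<Integer> markBackPressure(AtomicBoolean[] remoteBpStatus, List<Integer> bpTasks) {
        List<Integer> rejected = new ArrayList<>();
        for (Integer bpTask : bpTasks) {
            try {
                remoteBpStatus[bpTask].set(true);
            } catch (ArrayIndexOutOfBoundsException e) {
                // The change under review logs here; this sketch records the id
                // so the caller can see which index was out of bounds.
                rejected.add(bpTask);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        AtomicBoolean[] status = new AtomicBoolean[3];
        for (int i = 0; i < status.length; i++) {
            status[i] = new AtomicBoolean(false);
        }
        // Task 7 does not exist; tasks 0 and 2 are still flagged.
        List<Integer> rejected = markBackPressure(status, Arrays.asList(0, 7, 2));
        System.out.println("rejected=" + rejected + " status=" + Arrays.toString(status));
    }
}
```

Without the try/catch, the first bad index would end the loop and task 2 would never be flagged, which is the behaviour the comment in the diff ("we can continue processing the rest of the tasks") is guarding against.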
[GitHub] storm issue #2744: [STORM-3132] Avoid NPE in the Values Constructor
Github user kishorvpatil commented on the issue: https://github.com/apache/storm/pull/2744 @HeartSaVioR Removed unwanted condition. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202345310 --- Diff: sql/README.md --- @@ -1,187 +1,8 @@ # Storm SQL -Compile SQL queries to Storm topologies. +Compile SQL queries to Storm topologies and run. -## Usage - -Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster - -``` -$ bin/storm sql -``` - -In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology. - -StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`. -Detailed explanation is available from `Showing Query Plan (explain mode)` section. - -## Supported Features - -The following features are supported in the current repository: - -* Streaming from and to external data sources -* Filtering tuples -* Projections -* Aggregations (Grouping) -* User defined function (scalar and aggregate) -* Join (Inner, Left outer, Right outer, Full outer) - -## Specifying External Data Sources - -In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE` -statement. For example, the following statement specifies a Kafka spouts and sink: - -``` -CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' -``` - -The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in -[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). - -`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout. 
-Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition). - -Default value is 1, and this option has no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.) - -## Plugging in External Data Sources - -Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using -the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the -tables. Please refer to the implementation of `storm-sql-kafka` for more details. - -## Specifying User Defined Function (UDF) - -Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement. -For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class. - -``` -CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' -``` - -Storm SQL determines whether the function as scalar or aggregate by checking which methods are defined. -If the class defines `evaluate` method, Storm SQL treats the function as `scalar`, -and if the class defines `add` method, Storm SQL treats the function as `aggregate`. - -Example of class for scalar function is here: - -``` - public class MyPlus { -public static Integer evaluate(Integer x, Integer y) { - return x + y; -} - } - -``` - -and class for aggregate function is here: - -``` - public class MyConcat { -public static String init() { - return ""; -} -public static String add(String accumulator, String val) { - return accumulator + val; -} -public static String result(String accumulator) { - return accumulator; -} - } -``` - -If users don't define `result` method, result is the last return value of `add` method. -Users need to define `result` method only when we need to transform accumulated value. 
- -## Example: Filtering Kafka Stream - -Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id -of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the -transactions are significant and to insert these orders into another Kafka stream for further analysis. - -The user can specify the following SQL statements in the SQL file: - -``` -CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202356815 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -47,54 +45,60 @@ * The properties are in JSON format which specifies the name of the MongoDB collection and etc. */ public class MongoDataSourcesProvider implements DataSourcesProvider { +public static final String SCHEME_NAME = "mongodb"; +public static final String VALUE_SERIALIZED_FIELD = "ser.field"; +public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field"; +public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField"; +public static final String COLLECTION_NAME = "collection.name"; -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); - -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); - -StateFactory stateFactory 
= new MongoStateFactory(options); -StateUpdater stateUpdater = new MongoStateUpdater(); - -return new SimpleSqlTridentConsumer(stateFactory, stateUpdater); +String serField; +if (props.contains(VALUE_SERIALIZED_FIELD)) { +serField = props.getProperty(VALUE_SERIALIZED_FIELD); +} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) { +// backward compatibility +serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD); --- End diff -- Nit: Since this is targeting 2.0.0, it might be okay not to provide backward compatibility. ---
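One detail in the quoted diff is worth a sketch: `java.util.Properties` inherits `contains(Object)` from `Hashtable`, and that method tests whether a *value* is present, not a key. A key-based fallback can be written with `containsKey` or with the two-argument `getProperty`. The sketch below reuses the constant names and values from the diff; the class name `SerFieldLookupSketch`, the method `resolveSerField`, and the assumption that the final fallback is `DEFAULT_VALUE_SERIALIZED_FIELD` (the diff is cut off before that branch) are illustrative only.

```java
import java.util.Properties;

// Hypothetical helper; constant names and values are taken from the quoted diff.
final class SerFieldLookupSketch {
    static final String VALUE_SERIALIZED_FIELD = "ser.field";
    static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field";
    static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField";

    /** Resolves the serialized field name: new key first, then legacy key, then default. */
    static String resolveSerField(Properties props) {
        // getProperty(key, default) looks up by key and falls back when the key is absent.
        String legacyOrDefault =
            props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD, DEFAULT_VALUE_SERIALIZED_FIELD);
        return props.getProperty(VALUE_SERIALIZED_FIELD, legacyOrDefault);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(VALUE_SERIALIZED_FIELD, "payload");
        // contains() checks values, so asking it about the key reports false.
        System.out.println("contains(key)    = " + props.contains(VALUE_SERIALIZED_FIELD));
        System.out.println("containsKey(key) = " + props.containsKey(VALUE_SERIALIZED_FIELD));
        System.out.println("resolved         = " + resolveSerField(props));
    }
}
```

If backward compatibility is dropped, as the review comment suggests might be acceptable for 2.0.0, the lookup collapses to a single `getProperty` call with a default.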
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202355372 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +75,12 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); - -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); +IRichBolt consumer = ds.getConsumer(); -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Looking at this again, I think the way to do it would be to use a factory like https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java to instantiate the HdfsBolt. You could pass the factory class name in through a property and instantiate it when the data source is created. The default factory would just call `new HdfsBolt()`. I'm not sure if it's worth it. Up to you. ---
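The factory idea in this comment can be sketched roughly as follows. Everything here is hypothetical and self-contained: `Bolt` stands in for `IRichBolt`, `DefaultBolt` for `HdfsBolt`, and the property name `bolt.factory.class` is invented for the example. It is not taken from storm-sql or from the linked `ConsumerFactory`. The sketch only shows the mechanism: read a factory class name from a property, fall back to a default factory, and instantiate it reflectively when the data source is created.

```java
import java.util.Properties;

// Hypothetical sketch of "pass the factory class name in through a property".
final class BoltFactorySketch {
    /** Stand-in for IRichBolt. */
    interface Bolt { String name(); }

    /** Stand-in for the production bolt (HdfsBolt in the discussion). */
    static final class DefaultBolt implements Bolt {
        @Override public String name() { return "default"; }
    }

    /** Stand-in for a bolt a test might substitute. */
    static final class StubBolt implements Bolt {
        @Override public String name() { return "stub"; }
    }

    interface BoltFactory { Bolt create(); }

    /** Default factory: simply constructs the production bolt. */
    public static final class DefaultBoltFactory implements BoltFactory {
        public DefaultBoltFactory() { }
        @Override public Bolt create() { return new DefaultBolt(); }
    }

    /** Factory a test could name in the table properties. */
    public static final class StubBoltFactory implements BoltFactory {
        public StubBoltFactory() { }
        @Override public Bolt create() { return new StubBolt(); }
    }

    // Invented property name, for illustration only.
    static final String FACTORY_CLASS_PROPERTY = "bolt.factory.class";

    /** Loads the factory named in the properties, or the default factory if none is set. */
    static BoltFactory loadFactory(Properties props) {
        String className =
            props.getProperty(FACTORY_CLASS_PROPERTY, DefaultBoltFactory.class.getName());
        try {
            return Class.forName(className)
                .asSubclass(BoltFactory.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate bolt factory " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(loadFactory(props).create().name());
        props.setProperty(FACTORY_CLASS_PROPERTY, StubBoltFactory.class.getName());
        System.out.println(loadFactory(props).create().name());
    }
}
```

The trade-off the reviewer hints at is visible even in this small form: the indirection lets a test inject a stub, but it adds a reflective lookup and an extra configuration key for what is otherwise a single constructor call.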
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202341073 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- I'm wondering if it would be worth updating this rather than removing it, or maybe merging parts of it into one of the other storm-sql docs (e.g. the overview doc). I think the page provides a nice overview of what storm-sql is, and how it works. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202347946 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -118,16 +138,32 @@ public RelDataType getRowType( @Override public Statistic getStatistic() { return stat != null ? stat : Statistics.of(rows.size(), - ImmutableList.of()); +ImmutableList.of()); } @Override public Schema.TableType getJdbcTableType() { return Schema.TableType.STREAM; } + +@Override +public boolean isRolledUp(String s) { --- End diff -- Nit: `s` doesn't really say much as a variable name, can we replace it with one that says what this string is? Same for the other methods here. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202359820 --- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java --- @@ -41,7 +26,46 @@ import java.util.Map; import java.util.PriorityQueue; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.sql.runtime.ISqlStreamsDataSource; +import org.apache.storm.streams.Pair; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.junit.rules.ExternalResource; + public class TestUtils { + public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() { +@Override +protected void before() throws Throwable { + MockInsertBolt.getCollectedValues().clear(); +} + +@Override +protected void after() { + // no-op --- End diff -- Nit: I think you can leave out this method entirely ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202358383 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. 
--- End diff -- Does it make sense for us to include code like this if we know it isn't suitable for production use? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202349103 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java --- @@ -72,13 +72,13 @@ // merge and push unions rules UnionEliminatorRule.INSTANCE, -TridentScanRule.INSTANCE, -TridentFilterRule.INSTANCE, -TridentProjectRule.INSTANCE, -TridentAggregateRule.INSTANCE, -TridentJoinRule.INSTANCE, -TridentModifyRule.INSTANCE, -TridentCalcRule.INSTANCE +StreamsScanRule.INSTANCE, +StreamsFilterRule.INSTANCE, +StreamsProjectRule.INSTANCE, +StreamsAggregateRule.INSTANCE, --- End diff -- Are we holding on to aggregate and join so they'll be easy to implement later, or why are the aggregate and join rules here? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202350280 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.storm.sql.planner.streams.rel; + +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.storm.sql.planner.StormRelUtils; +import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase; +import org.apache.storm.sql.planner.streams.StreamsPlanCreator; +import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction; +import org.apache.storm.streams.Stream; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Values; + +public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel { +private final int primaryKeyIndex; + +public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, + RelNode child, Operation operation, List updateColumnList, List sourceExpressionList, + boolean flattened, int primaryKeyIndex) { +super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); +this.primaryKeyIndex = primaryKeyIndex; +} + +@Override +public RelNode copy(RelTraitSet traitSet, List inputs) { +return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), +sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex); +} + +@Override +public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { +// SingleRel +RelNode input = getInput(); +StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); +Stream inputStream = planCreator.pop(); + +Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported."); + +// 
Calcite ensures that the value is structurized to the table definition +// hence we can use PK index directly +// To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER +// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed, +// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT. + +// FIXME: this should be really different... --- End diff -- I think we should raise a followup issue immediately and remove the FIXME, or just fix it now. I don't really know what the FIXME is saying should be changed. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202344785 --- Diff: docs/storm-sql.md --- @@ -6,14 +6,14 @@ documentation: true The Storm SQL integration allows users to run SQL queries over streaming data in Storm. Not only the SQL interface allows faster development cycles on streaming analytics, but also opens up the opportunities to unify batch data processing like [Apache Hive](///hive.apache.org) and real-time streaming data analytics. -At a very high level StormSQL compiles the SQL queries to [Trident](Trident-API-Overview.html) topologies and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page. +At a very high level StormSQL compiles the SQL queries to Storm topologies leveraging Streams API and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page. --- End diff -- There's a link to storm-sql-internal in this line ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202357967 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. 
+ */ +public class SocketBolt implements IRichBolt { --- End diff -- Nit: Consider extending `BaseRichBolt` instead, so you don't have to implement the methods you don't want to customize ---
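The nit above is an instance of the usual "abstract base class with no-op defaults" pattern. The sketch below uses stand-in types so that it compiles without Storm on the classpath: `RichBoltLike` and `BaseBoltLike` are hypothetical simplifications, not the real `IRichBolt` and `BaseRichBolt` signatures, and `CollectingBolt` is invented for the example. It only shows why extending the base class shortens a bolt that does not care about cleanup or component configuration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; the real Storm types have different signatures.
final class BaseBoltSketch {
    /** Stand-in for an interface with several lifecycle methods. */
    interface RichBoltLike {
        void prepare(Map<String, Object> conf);
        void execute(String tuple);
        void cleanup();
        Map<String, Object> getComponentConfiguration();
    }

    /** Stand-in for a base class that supplies defaults for the optional methods. */
    abstract static class BaseBoltLike implements RichBoltLike {
        @Override public void cleanup() { }
        @Override public Map<String, Object> getComponentConfiguration() { return null; }
    }

    /** A bolt that only implements the methods it actually customizes. */
    static final class CollectingBolt extends BaseBoltLike {
        final List<String> seen = new ArrayList<>();
        @Override public void prepare(Map<String, Object> conf) { seen.clear(); }
        @Override public void execute(String tuple) { seen.add(tuple); }
    }

    public static void main(String[] args) {
        CollectingBolt bolt = new CollectingBolt();
        bolt.prepare(Collections.emptyMap());
        bolt.execute("a");
        bolt.cleanup(); // inherited no-op
        System.out.println(bolt.seen);
    }
}
```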
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202346634 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- I'm a little surprised that an uncaught exception here doesn't crash the worker though? I'd expect the worker to crash, and for the worker to come back up with backpressure disabled? ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202345723 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { +//Just in case we get something we are confused about +// we can continue processing the rest of the tasks +LOG.error("BP index out of bounds {}", e); --- End diff -- Ah! I realized `e` is bound to {} so my suggestion is already applied (show index within same line), but no stack trace. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202344217 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { +//Just in case we get something we are confused about +// we can continue processing the rest of the tasks +LOG.error("BP index out of bounds {}", e); --- End diff -- Is it intentional that the stack trace is stripped here? ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202344011 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { +LOG.error("BP index out of bounds {}", e); --- End diff -- Nit: Is it intentional that the stack trace is stripped here? ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202343846 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- Makes sense, thanks. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202343054 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- OK, makes sense. I know the exception message will contain the index, which will end up in the log message, but writing it explicitly in the log message would help users find the index while grepping, since it would appear on the same line. ---
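To illustrate the suggestion above, here is a minimal sketch that puts the offending index in the message text and also attaches the exception so the stack trace is kept. It deliberately uses `java.util.logging` rather than the project's own logger so that it runs with the JDK alone; `IndexLoggingSketch`, `readSlot`, and `capture` are hypothetical names made up for this example and are not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical example class; not Storm code. */
final class IndexLoggingSketch {
    private static final Logger LOG = Logger.getLogger(IndexLoggingSketch.class.getName());

    /** Reads one slot; on a bad index, logs the index in the message and attaches the exception. */
    static boolean readSlot(boolean[] slots, int index) {
        try {
            return slots[index];
        } catch (ArrayIndexOutOfBoundsException e) {
            // Index is part of the message text (greppable); the throwable carries the stack trace.
            LOG.log(Level.SEVERE, "BP index out of bounds: " + index, e);
            return false;
        }
    }

    /** Calls readSlot with a temporary capturing handler and returns the records it produced. */
    static List<LogRecord> capture(boolean[] slots, int index) {
        List<LogRecord> records = new ArrayList<>();
        Handler handler = new Handler() {
            @Override public void publish(LogRecord record) { records.add(record); }
            @Override public void flush() { }
            @Override public void close() { }
        };
        LOG.addHandler(handler);
        try {
            readSlot(slots, index);
        } finally {
            LOG.removeHandler(handler);
        }
        return records;
    }
}
```

With this shape, a single grep for the message finds the index, and the throwable is still available to whatever appender formats the record.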
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202342969 --- Diff: storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java --- @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.messaging.netty; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.apache.storm.shade.io.netty.buffer.ByteBuf; +import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class BackPressureStatusTest { + +@Test +void bufferTest() throws IOException { --- End diff -- Actually I'll just delete it because there is no reason to keep it, especially because we don't verify the result in any way. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202342773 --- Diff: storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java --- @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.messaging.netty; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.apache.storm.shade.io.netty.buffer.ByteBuf; +import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class BackPressureStatusTest { + +@Test +void bufferTest() throws IOException { --- End diff -- The original problem was because kryo was throwing an exception trying to serialize a BackPressureStatus message. I wrote this test initially to try and debug what was happening, and I ended up serializing millions of BackPressureStatus in loops to try and verify that it was working properly. When I failed I remembered that kryo was not thread safe and started looking around there. If you want me to delete it I can. ---
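The thread does not show how the PR itself resolves the thread-safety problem, so the following is only a sketch of one common way to use a non-thread-safe serializer safely: give each thread its own instance via `ThreadLocal`. `ScratchEncoder` is a hypothetical stand-in for a serializer that reuses internal state (it is not Kryo and not Storm code), and `countMismatches` is an illustrative stress loop in the spirit of the one described above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical example class; not Storm code. */
final class PerThreadEncoderSketch {

    /** Stand-in serializer that reuses one scratch buffer, so an instance must not be shared. */
    static final class ScratchEncoder {
        private final ByteBuffer scratch = ByteBuffer.allocate(Long.BYTES);

        byte[] encode(long value) {
            scratch.clear();
            scratch.putLong(value);
            return scratch.array().clone();
        }
    }

    // Each thread lazily gets its own encoder, so the scratch buffer is never shared.
    private static final ThreadLocal<ScratchEncoder> ENCODER =
        ThreadLocal.withInitial(ScratchEncoder::new);

    static byte[] encode(long value) {
        return ENCODER.get().encode(value);
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Encodes and decodes values on several threads and counts round-trip mismatches. */
    static int countMismatches(int threads, int perThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final long base = (long) t * perThread;
                results.add(pool.submit(() -> {
                    int bad = 0;
                    for (long v = base; v < base + perThread; v++) {
                        if (decode(encode(v)) != v) {
                            bad++;
                        }
                    }
                    return bad;
                }));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike the deleted test, a loop like this returns a value that a caller can assert on.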
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202341617 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- So there are two issues happening in the JIRA. Previously, under heavy load, a backpressure message would be sent. Under the old code this might contain a -1, which would cause an ArrayIndexOutOfBoundsException. When it did, some backpressure status updates in the message might be lost (specifically the ones that turn off backpressure) and the topology would stop processing. I fixed the cause of the -1 indexes, but I thought it would be best to also guard against future problems, so that if there is one we can still process the rest of the status updates. ---
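The guard described above can be sketched in isolation as follows. This is a simplified illustration, not the handler from the diff: `BackPressureGuardSketch`, `newFlags`, and `applyAll` are hypothetical names, and the flags array only stands in for `remoteBpStatus`. The point is that the `try` sits inside the loop, so one bad index (such as -1) does not stop the remaining updates from being applied.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical example class; not Storm code. */
final class BackPressureGuardSketch {

    /** Builds an array of cleared flags, one per task slot. */
    static AtomicBoolean[] newFlags(int size) {
        AtomicBoolean[] flags = new AtomicBoolean[size];
        for (int i = 0; i < size; i++) {
            flags[i] = new AtomicBoolean(false);
        }
        return flags;
    }

    /** Sets the flag for every valid task index and returns how many indexes were skipped. */
    static int applyAll(AtomicBoolean[] flags, List<Integer> bpTasks) {
        int skipped = 0;
        for (Integer bpTask : bpTasks) {
            try {
                flags[bpTask].set(true);
            } catch (ArrayIndexOutOfBoundsException e) {
                // Guard is per element: skip the bad index and keep processing the rest.
                skipped++;
            }
        }
        return skipped;
    }
}
```

If the `try` wrapped the whole loop instead, the first bad index would abort the loop and every later update in the same message would be dropped, which is the failure mode described above.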
[GitHub] storm pull request #2590: STORM-2974: Add transactional non-opaque spout to ...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2590 ---
[GitHub] storm issue #2590: STORM-2974: Add transactional non-opaque spout to storm-k...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2590 I found that a couple of public classes were renamed, but they're in an `internal` package, which signals to others that they are non-public, so I think we are OK. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202300189 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- I'm also not sure why this can happen? ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202299829 --- Diff: storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java --- @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.messaging.netty; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.storm.serialization.KryoValuesSerializer; +import org.apache.storm.shade.io.netty.buffer.ByteBuf; +import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class BackPressureStatusTest { + +@Test +void bufferTest() throws IOException { --- End diff -- What is this test demonstrating? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202282820 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- I removed the document since it describes the Trident-based implementation. ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo Rebased. I'm seeing an intermittent test failure like the one below, but it does not fail consistently. I'll try to take a look once I have time, but let's move it out of this PR.
```
17:32:06.845 [SLOT_1027] INFO o.a.s.m.StormMetricRegistry - Starting metrics reporters...
17:32:06.845 [SLOT_1027] INFO o.a.s.s.a.ClientAuthUtils - Got AutoCreds []
17:32:06.846 [SLOT_1027] INFO o.a.s.d.w.WorkerState - Reading assignments
17:32:06.846 [SLOT_1027] ERROR o.a.s.d.s.Slot - Error when processing event
java.io.IOException: java.lang.NullPointerException
    at org.apache.storm.daemon.supervisor.LocalContainer.launch(LocalContainer.java:54) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.LocalContainerLauncher.launchContainer(LocalContainerLauncher.java:42) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.Slot.handleWaitingForBlobUpdate(Slot.java:528) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:232) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:902) [storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
Caused by: java.lang.NullPointerException
    at org.apache.storm.daemon.worker.WorkerState.readWorkerExecutors(WorkerState.java:630) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.worker.WorkerState.<init>(WorkerState.java:153) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.worker.Worker.loadWorker(Worker.java:172) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.worker.Worker.lambda$start$39(Worker.java:164) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_66]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_66]
    at org.apache.storm.daemon.worker.Worker.start(Worker.java:163) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.LocalContainer.launch(LocalContainer.java:52) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 4 more
17:32:06.847 [SLOT_1027] ERROR o.a.s.u.Utils - Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
    at org.apache.storm.utils.Utils.exitProcess(Utils.java:473) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:949) [storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
```
---