[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java
Github user abhishekagarwal87 commented on the issue: https://github.com/apache/storm/pull/1756 That's good to hear, Robert. I will resolve the conflicts and incorporate your suggestions. Thank you for your help. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1760: Add topology stream-awareness to storm-redis
GitHub user mo-getter opened a pull request: https://github.com/apache/storm/pull/1760 Add topology stream-awareness to storm-redis Allows users to control the streams to which the provided Bolts emit, via a StreamMapper interface, rather than emitting only to the "default" stream. The existing constructors for the Bolts use a DefaultStreamMapper so that there are no breaking changes in behavior. Although, in the future, it might make more sense to use the InputSourceStreamMapper by default, which emits new tuples to the same stream the input tuple came in on (especially for the RedisFilterBolt). You can merge this pull request into a Git repository by running: $ git pull https://github.com/mo-getter/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1760.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1760 commit 28e31f3050cc837164005b22c7b66aacecf2cd7b Author: Alan Smith Date: 2016-11-03T03:56:08Z Add topology stream-awareness to storm-redis. Allows users to control the streams to which the provided Bolts emit, via a StreamMapper interface, rather than emitting only to the default stream.
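The StreamMapper idea described in the PR can be sketched roughly as follows. This is a hypothetical illustration, not the PR's actual code: the real interface presumably receives a Storm Tuple, while here a plain source-stream id stands in so the sketch is self-contained.

```java
// Hypothetical sketch of the StreamMapper contract described above.
public class StreamMapperSketch {

    public interface StreamMapper {
        String getStream(String sourceStreamId);
    }

    // Matches the existing behavior: always emit to the "default" stream.
    public static class DefaultStreamMapper implements StreamMapper {
        public String getStream(String sourceStreamId) {
            return "default";
        }
    }

    // Emits to whatever stream the input tuple arrived on, which the PR
    // suggests may be a better default for RedisFilterBolt.
    public static class InputSourceStreamMapper implements StreamMapper {
        public String getStream(String sourceStreamId) {
            return sourceStreamId;
        }
    }

    public static void main(String[] args) {
        StreamMapper mapper = new InputSourceStreamMapper();
        System.out.println(mapper.getStream("errors")); // prints "errors"
    }
}
```

Plugging a mapper into a bolt's constructor, as the PR describes, keeps old deployments unchanged (they get DefaultStreamMapper) while letting new topologies route emits per input stream.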
How does the control flow in a Trident Topology work?
Hi guys, I am trying to understand the implementation of Trident. By reading the code in TridentTopologyBuilder.java, I understand that some coordinator components, such as MasterBatchCoordinator and TridentSpoutCoordinator, are added to a user-defined topology in TridentTopologyBuilder.createTopology(). I am trying to understand the control flow of those coordinators, but it seems very difficult to get the sense just from the source code. Is there any document giving a high-level overview of the control flow of the coordinator components in a Trident topology? Any help is highly appreciated. Thanks! Sincerely, Li Wang
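For what it's worth, my rough reading of the source is: MasterBatchCoordinator (a spout added by createTopology()) emits a transaction id on its "$batch" stream; TridentSpoutCoordinator subscribes to that stream and has the spout emitters emit the batch for that txid; once the batch's tuple tree is fully acked, the master emits the txid on its "$commit" stream so stateful components commit in transaction order. A toy sketch of that loop follows; all names and logic here are simplified assumptions (the real coordinator persists txids in Zookeeper and pipelines several in-flight transactions at once), not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of the Trident coordination cycle as I understand it.
public class TridentFlowSketch {

    // Produce the ordered list of coordination events for `batches` batches.
    static List<String> run(int batches) {
        List<String> events = new ArrayList<>();
        for (long txid = 1; txid <= batches; txid++) {
            // MasterBatchCoordinator starts a transaction on the $batch stream.
            events.add("$batch tx=" + txid);
            // TridentSpoutCoordinator reacts and has the emitters emit the batch.
            events.add("emit-batch tx=" + txid);
            // After the batch is fully acked, commit in txid order on $commit.
            events.add("$commit tx=" + txid);
        }
        return events;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```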
[GitHub] storm issue #1748: [STORM-2103][SQL] Introduce new sql external module: stor...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1748 @vesense I just assigned some time to myself. I don't have MongoDB installed, but the code change looks good. One question though: it seems to assign a serialization field. Do we use only one field for storing everything, or does Mongo (or BSON) take care of it automatically? I'd like to see a 'how to use' document for storm-sql-mongo, but the other modules are not documented either, so we should create a document describing how to configure and use external data sources in Storm SQL. Btw, did you run the SQL statements with storm-sql-mongo? I don't have MongoDB installed, so it might take me another few hours or even days to test manually. If you don't mind, I'd recommend pasting the SQL statements (including the datasource definitions) that run well with this patch.
[GitHub] storm pull request #1749: STORM-2175: fix double close of workers
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/1749
[GitHub] storm pull request #1750: STORM-2175: fix double close of workers
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/1750
[GitHub] storm pull request #1747: STORM-2018: Supervisor V2 (1.0.x)
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/1747
[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1757 @hmcl If it's really so common that most of the example modules could refer to it, I'd make a common module for examples, without dependencies on external storage. Or we could even place them in storm-core if they're also useful for end users and not tied to any external storage. Like I said, I'm fine with what you propose in the PR; I'd just like to avoid dependency coupling wherever possible, which would be better in my view.
[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1756 Perf numbers look good compared to the 1.x line (very unscientific though). I didn't dig into it a lot. Running throughput vs latency on my mac, I saw that CPU utilization with this is about half of what it is on the 1.x line at 20k sentences per second. The latencies are also lower across the board, not by a lot but by a few ms. The JIT also seems to be doing a much better job on the Java code: it was able to hit a steady state at 20k/sec in about 30 seconds, whereas the Clojure code was taking almost 3 minutes to hit a steady state.
[GitHub] storm pull request #1758: STORM-2185: Storm Supervisor doesn't delete direct...
GitHub user knusbaum opened a pull request: https://github.com/apache/storm/pull/1758 STORM-2185: Storm Supervisor doesn't delete directories properly sometimes You can merge this pull request into a Git repository by running: $ git pull https://github.com/knusbaum/incubator-storm STORM-2185 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1758.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1758 commit 3ac3b8329121c38de774e57f694882eac9d1f043 Author: Kyle Nusbaum Date: 2016-10-31T21:05:44Z Fixing KafkaSpout acking. commit 2d3d7b109094fe966abd83f12e1ed388075b3be5 Author: Kyle Nusbaum Date: 2016-10-31T21:11:18Z Fixing spacing. commit 52da50044f0fba65e99af6a4fd2c5af55d54d455 Author: Kyle Nusbaum Date: 2016-11-02T20:29:53Z Fix.
[GitHub] storm pull request #1759: STORM-2185: Storm Supervisor doesn't delete direct...
GitHub user knusbaum opened a pull request: https://github.com/apache/storm/pull/1759 STORM-2185: Storm Supervisor doesn't delete directories properly sometimes - 1.x branch You can merge this pull request into a Git repository by running: $ git pull https://github.com/knusbaum/incubator-storm STORM-2185-1.x-branch Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1759.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1759 commit 0035a2cc91095dc440f0ca44df3a61d85db9494c Author: Kyle Nusbaum Date: 2016-10-31T21:05:44Z Fixing KafkaSpout acking. commit b4dc3410c3dca405560de8b5a2649aecb4c2934d Author: Kyle Nusbaum Date: 2016-10-31T21:11:18Z Fixing spacing. commit 5d6b6e4ee6ac55f7905082d01343740ca43594ce Author: Kyle Nusbaum Date: 2016-11-02T20:29:53Z Fix.
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1756#discussion_r86233431 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_executor.clj --- @@ -25,18 +25,3 @@ (let [val (AddressedTuple. task tuple)] --- End diff -- This entire file should really be removed, and with-tracked-cluster should be updated to not use it. It is obviously not needed, because this function is never called, only overridden.
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1756#discussion_r86233780 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java --- @@ -179,22 +178,21 @@ public BuiltinMetrics getBuiltInMetrics() { } private TopologyContext mkTopologyContext(StormTopology topology) throws IOException { -Map conf = (Map) workerData.get(Constants.CONF); +Map conf = workerData.getConf(); return new TopologyContext( -topology, -(Map) workerData.get(Constants.STORM_CONF), -(Map) workerData.get(Constants.TASK_TO_COMPONENT), -(Map>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS), -(Map>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS), -(String) workerData.get(Constants.STORM_ID), +topology, +workerData.getTopologyConf(), +workerData.getTaskToComponent(), +workerData.getComponentToSortedTasks(), +workerData.getComponentToStreamToFields(), +workerData.getTopologyId(), ConfigUtils.supervisorStormResourcesPath( -ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get(Constants.STORM_ID))), -ConfigUtils.workerPidsRoot(conf, (String) workerData.get(Constants.WORKER_ID)), +ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())), +ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()), taskId, -(Integer) workerData.get(Constants.PORT), -(List) workerData.get(Constants.TASK_IDS), -(Map) workerData.get(Constants.DEFAULT_SHARED_RESOURCES), -(Map) workerData.get(Constants.USER_SHARED_RESOURCES), +workerData.getPort(), workerData.getTaskIds(), +workerData.getDefaultSharedResources(), + workerData.getUserSharedResources(), --- End diff -- nit: indentation is off
[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1756 @abhishekagarwal87 I created a pull request to your repo for the failing tests https://github.com/abhishekagarwal87/storm/pull/7 I will keep looking at the pull request
[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1757 @HeartSaVioR I am OK with a common module, but does it make sense to have a common module, with little or no dependencies, just to hold a couple of classes, and have both kafka modules refer to it? In this particular case, one of the classes that needs to be reused lives in storm-starter, and three other classes are common to both storm-kafka and storm-kafka-client. Should we create a common module for all the examples? Or should the common module be storm-starter?
[GitHub] storm pull request #1751: [STORM-2172][SQL] Support Avro as input / output f...
Github user sachingsachin commented on a diff in the pull request: https://github.com/apache/storm/pull/1751#discussion_r86188870 --- Diff: external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.sql.runtime.serde.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.Utf8; +import org.apache.storm.spout.Scheme; +import org.apache.storm.sql.runtime.utils.SerdeUtils; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class AvroScheme implements Scheme { + private final String schemaString; + private final List fieldNames; + private final CachedSchemas schemas; + + public AvroScheme(String schemaString, List fieldNames) { +this.schemaString = schemaString; +this.fieldNames = fieldNames; +this.schemas = new CachedSchemas(); + } + + @Override + public List deserialize(ByteBuffer ser) { +try { + Schema schema = schemas.getSchema(schemaString); + + DatumReader reader = new GenericDatumReader<>(schema); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null); + GenericRecord record = reader.read(null, decoder); + + ArrayList list = new ArrayList<>(fieldNames.size()); + for (String field : fieldNames) { +Object value = record.get(field); +// Avro strings are stored using a special Avro type instead of using Java primitives +if (value instanceof Utf8) { + list.add(value.toString()); +} else if (value instanceof Map) { + Map map = SerdeUtils.convertAvroUtf8Map((Map)value); + list.add(map); +} else { + list.add(value); --- End diff -- Do we need to check array type containers too for Utf8-to-String conversion? Example: array or set of Utf-8 objects? That should cover all the cases IMO
[GitHub] storm pull request #1751: [STORM-2172][SQL] Support Avro as input / output f...
Github user sachingsachin commented on a diff in the pull request: https://github.com/apache/storm/pull/1751#discussion_r86189092 --- Diff: external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.sql.runtime.utils; + +import static org.apache.commons.lang.StringUtils.isNotEmpty; + +import com.google.common.base.Preconditions; +import org.apache.avro.util.Utf8; +import org.apache.storm.spout.Scheme; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.sql.runtime.serde.avro.AvroScheme; +import org.apache.storm.sql.runtime.serde.avro.AvroSerializer; +import org.apache.storm.sql.runtime.serde.json.JsonScheme; +import org.apache.storm.sql.runtime.serde.json.JsonSerializer; +import org.apache.storm.utils.Utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public final class SerdeUtils { +public static Scheme getScheme(String inputFormatClass, Properties properties, List fieldNames) { +Scheme scheme; +if (isNotEmpty(inputFormatClass)) { +switch (inputFormatClass) { +case "org.apache.storm.sql.runtime.serde.json.JsonScheme" : +scheme = new JsonScheme(fieldNames); +break; +case "org.apache.storm.sql.runtime.serde.avro.AvroScheme" : +String schemaString = properties.getProperty("avro.schema"); +Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty"); +scheme = new AvroScheme(schemaString, fieldNames); +break; +default: +scheme = Utils.newInstance(inputFormatClass); +} +} else { +//use JsonScheme as the default scheme +scheme = new JsonScheme(fieldNames); +} +return scheme; +} + +public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List fieldNames) { +IOutputSerializer serializer; +if (isNotEmpty(outputFormatClass)) { +switch (outputFormatClass) { +case "org.apache.storm.sql.runtime.serde.json.JsonSerializer" : +serializer = new JsonSerializer(fieldNames); +break; +case "org.apache.storm.sql.runtime.serde.avro.AvroSerializer" : +String schemaString = properties.getProperty("avro.schema"); +Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty"); 
+serializer = new AvroSerializer(schemaString, fieldNames); +break; +default: +serializer = Utils.newInstance(outputFormatClass); +} +} else { +//use JsonSerializer as the default serializer +serializer = new JsonSerializer(fieldNames); +} +return serializer; +} + +public static Map convertAvroUtf8Map(Map value) { +Map map = new HashMap<>(); +for (Map.Entry entry : value.entrySet()) { +// Avro only allows maps with Strings for keys, so we only have to worry +// about deserializing the values +Object key = entry.getKey().toString(); --- End diff -- Avro supports non-string maps too. So conversion to string might be incorrect in those cases.
[GitHub] storm pull request #1751: [STORM-2172][SQL] Support Avro as input / output f...
Github user sachingsachin commented on a diff in the pull request: https://github.com/apache/storm/pull/1751#discussion_r86189208 --- Diff: external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.sql.runtime.utils; + +import static org.apache.commons.lang.StringUtils.isNotEmpty; + +import com.google.common.base.Preconditions; +import org.apache.avro.util.Utf8; +import org.apache.storm.spout.Scheme; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.sql.runtime.serde.avro.AvroScheme; +import org.apache.storm.sql.runtime.serde.avro.AvroSerializer; +import org.apache.storm.sql.runtime.serde.json.JsonScheme; +import org.apache.storm.sql.runtime.serde.json.JsonSerializer; +import org.apache.storm.utils.Utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public final class SerdeUtils { +public static Scheme getScheme(String inputFormatClass, Properties properties, List fieldNames) { +Scheme scheme; +if (isNotEmpty(inputFormatClass)) { +switch (inputFormatClass) { +case "org.apache.storm.sql.runtime.serde.json.JsonScheme" : +scheme = new JsonScheme(fieldNames); +break; +case "org.apache.storm.sql.runtime.serde.avro.AvroScheme" : +String schemaString = properties.getProperty("avro.schema"); +Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty"); +scheme = new AvroScheme(schemaString, fieldNames); +break; +default: +scheme = Utils.newInstance(inputFormatClass); +} +} else { +//use JsonScheme as the default scheme +scheme = new JsonScheme(fieldNames); +} +return scheme; +} + +public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List fieldNames) { +IOutputSerializer serializer; +if (isNotEmpty(outputFormatClass)) { +switch (outputFormatClass) { +case "org.apache.storm.sql.runtime.serde.json.JsonSerializer" : +serializer = new JsonSerializer(fieldNames); +break; +case "org.apache.storm.sql.runtime.serde.avro.AvroSerializer" : +String schemaString = properties.getProperty("avro.schema"); +Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty"); 
+serializer = new AvroSerializer(schemaString, fieldNames); +break; +default: +serializer = Utils.newInstance(outputFormatClass); +} +} else { +//use JsonSerializer as the default serializer +serializer = new JsonSerializer(fieldNames); +} +return serializer; +} + +public static Map convertAvroUtf8Map(Map value) { +Map map = new HashMap<>(); +for (Map.Entry entry : value.entrySet()) { +// Avro only allows maps with Strings for keys, so we only have to worry +// about deserializing the values +Object key = entry.getKey().toString(); +Object val = entry.getValue(); + +if (val instanceof Utf8) { +map.put(key, val.toString()); +} else if (val instanceof Map) { +map.put(key, convertAvroUtf8Map((Map)val)); +} else { +map.put(key, val); --- End diff -- Same comment about arrays/sets of Utf
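To make the reviewer's point about containers concrete, a converter that also descends into Avro arrays (which deserialize as Java Lists) could look like the sketch below. This is a self-contained illustration, not the patch's code: a stand-in Utf8 class replaces org.apache.avro.util.Utf8 so the sketch compiles without Avro on the classpath, and the method name `convert` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Utf8ConvertSketch {
    // Stand-in for org.apache.avro.util.Utf8 to keep this sketch self-contained.
    public static class Utf8 {
        private final String s;
        public Utf8(String s) { this.s = s; }
        public String toString() { return s; }
    }

    // Recursively replace Utf8 values with Strings, descending into both maps
    // and lists -- the container case the review comments point out is missing
    // from convertAvroUtf8Map.
    public static Object convert(Object value) {
        if (value instanceof Utf8) {
            return value.toString();
        } else if (value instanceof Map) {
            Map<Object, Object> out = new HashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                // Avro map keys arrive as Utf8 on the wire.
                out.put(e.getKey().toString(), convert(e.getValue()));
            }
            return out;
        } else if (value instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object v : (List<?>) value) {
                out.add(convert(v));
            }
            return out;
        }
        return value; // primitives and other types pass through unchanged
    }
}
```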
[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1757 @hmcl I agree that we should try to avoid duplicated code, but I have also seen unneeded dependencies cause dependency problems that come not from first-level deps but from transitive dependencies. If the common code is not related to the kafka API, I'd rather have a common module between the two, but if it's not (and even if it is), either way is OK with me. That was just my preference; others could think differently, and I don't have a strong opinion about this.
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1756#discussion_r86168341 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.worker; + +import com.google.common.base.Preconditions; +import com.lmax.disruptor.EventHandler; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.ObjectUtils; +import org.apache.storm.Config; +import org.apache.storm.cluster.*; +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.ExecutorShutdown; +import org.apache.storm.executor.IRunningExecutor; +import org.apache.storm.executor.LocalExecutor; +import org.apache.storm.generated.*; +import org.apache.storm.messaging.IConnection; +import org.apache.storm.messaging.IContext; +import org.apache.storm.messaging.TaskMessage; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.*; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; + +import javax.security.auth.Subject; +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class Worker implements Shutdownable, DaemonCommon { + +private static final Logger LOG = LoggerFactory.getLogger(Worker.class); +private final Map conf; +private final IContext context; +private final String topologyId; +private final String assignmentId; +private final int port; +private final String workerId; +private final LogConfigManager logConfigManager; + + +private WorkerState workerState; +private AtomicReference> executorsAtom; +private Thread transferThread; +private WorkerBackpressureThread 
backpressureThread; + +private AtomicReference credentialsAtom; +private Subject subject; +private Collection autoCreds; + + +/** + * TODO: should worker even take the topologyId as input? this should be + * deducable from cluster state (by searching through assignments) + * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency + * + * @param conf - Storm configuration + * @param context - + * @param topologyId - topology id + * @param assignmentId - assignement id + * @param port - port on which the worker runs + * @param workerId - worker id + */ + +public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) { +this.conf = conf; +this.context = context; +this.topologyId = topologyId; +this.assignmentId = assignmentId; +this.port = port; +this.workerId = workerId; +this.logConfigManager = new LogConfigManager(); +} + +public void start() throws Exception { +LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId, +conf); +// because in local mode, its not a separate +// process. supervisor will register it in this case +// if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode. +i
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1756#discussion_r86166475 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java --- @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.worker; + +import com.google.common.base.Preconditions; +import com.lmax.disruptor.EventHandler; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.ObjectUtils; +import org.apache.storm.Config; +import org.apache.storm.cluster.*; +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.ExecutorShutdown; +import org.apache.storm.executor.IRunningExecutor; +import org.apache.storm.executor.LocalExecutor; +import org.apache.storm.generated.*; +import org.apache.storm.messaging.IConnection; +import org.apache.storm.messaging.IContext; +import org.apache.storm.messaging.TaskMessage; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.*; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; + +import javax.security.auth.Subject; +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class Worker implements Shutdownable, DaemonCommon { + +private static final Logger LOG = LoggerFactory.getLogger(Worker.class); +private final Map conf; +private final IContext context; +private final String topologyId; +private final String assignmentId; +private final int port; +private final String workerId; +private final LogConfigManager logConfigManager; + + +private WorkerState workerState; +private AtomicReference> executorsAtom; +private Thread transferThread; +private WorkerBackpressureThread 
backpressureThread; + +private AtomicReference credentialsAtom; +private Subject subject; +private Collection autoCreds; + + +/** + * TODO: should worker even take the topologyId as input? this should be + * deducible from cluster state (by searching through assignments) + * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency + * + * @param conf - Storm configuration + * @param context - + * @param topologyId - topology id + * @param assignmentId - assignment id + * @param port - port on which the worker runs + * @param workerId - worker id + */ + +public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) { +this.conf = conf; +this.context = context; +this.topologyId = topologyId; +this.assignmentId = assignmentId; +this.port = port; +this.workerId = workerId; +this.logConfigManager = new LogConfigManager(); +} + +public void start() throws Exception { +LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId, +conf); +// because in local mode, it's not a separate +// process. supervisor will register it in this case +// if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode. +i
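The quoted constructor only records its inputs; all side effects (threads, connections, cluster registration) are deferred to `start()`, which keeps construction cheap and testable. A minimal sketch of that construct-then-start pattern follows; the `Daemon` class and its fields are illustrative only, not part of the Storm API:

```java
import java.util.Map;

// Illustrative construct-then-start pattern: the constructor only stores
// configuration; nothing is launched until start() is called explicitly.
class Daemon {
    private final Map<String, Object> conf;
    private final String id;
    private boolean started = false;

    Daemon(Map<String, Object> conf, String id) {
        this.conf = conf; // no I/O and no threads in the constructor
        this.id = id;
    }

    void start() {
        if (started) {
            throw new IllegalStateException("already started: " + id);
        }
        // heavy lifting (threads, connections, registration) would go here
        started = true;
    }

    boolean isStarted() {
        return started;
    }
}
```

Because the constructor cannot fail for environmental reasons, a unit test can build the object freely and exercise `start()` separately.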
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1756#discussion_r86178116 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java --- @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.worker; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.storm.generated.LogConfig; +import org.apache.storm.generated.LogLevel; +import org.apache.storm.generated.LogLevelAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +public class LogConfigManager { + +private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class); + +private final AtomicReference> latestLogConfig; +private final Map originalLogLevels; + +public LogConfigManager() { +this(new AtomicReference<>(new TreeMap<>())); +} + +public LogConfigManager(AtomicReference> latestLogConfig) { +this.latestLogConfig = latestLogConfig; +this.originalLogLevels = getLoggerLevels(); +LOG.info("Started with log levels: {}", originalLogLevels); +} + +public void processLogConfigChange(LogConfig logConfig) { +if (null != logConfig) { +LOG.debug("Processing received log config: {}", logConfig); +TreeMap loggers = new TreeMap<>(logConfig.get_named_logger_level()); +LoggerContext logContext = (LoggerContext) LogManager.getContext(); --- End diff -- In the previous code it passed in a false. Not sure if it makes much difference in our case though. https://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/LogManager.html#getContext(boolean) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
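For context on revans2's note: in the Log4j 2 API, `LogManager.getContext()` behaves like `getContext(true)`, whereas `getContext(false)` asks Log4j to return the context most appropriate for the caller's classloader, which is why the previous code passed `false`. Independent of Log4j, the bookkeeping `LogConfigManager` performs (snapshot the startup levels, apply named-logger overrides, restore originals on reset) can be sketched with plain maps. The `LevelSnapshot` class and its method names below are illustrative stand-ins, not Storm or Log4j API:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch: tracks original logger levels so overrides can be rolled back.
class LevelSnapshot {
    private final Map<String, String> originalLevels;
    private final AtomicReference<Map<String, String>> currentLevels;

    LevelSnapshot(Map<String, String> liveLevels) {
        // Snapshot the levels as they were at startup.
        this.originalLevels = new TreeMap<>(liveLevels);
        this.currentLevels = new AtomicReference<>(new TreeMap<>(liveLevels));
    }

    // Apply a named-logger override (analogous to processLogConfigChange).
    void override(String logger, String level) {
        Map<String, String> next = new TreeMap<>(currentLevels.get());
        next.put(logger, level);
        currentLevels.set(next);
    }

    // Restore a logger to its startup level; loggers that had no
    // startup level simply lose their override.
    void reset(String logger) {
        Map<String, String> next = new TreeMap<>(currentLevels.get());
        if (originalLevels.containsKey(logger)) {
            next.put(logger, originalLevels.get(logger));
        } else {
            next.remove(logger);
        }
        currentLevels.set(next);
    }

    String levelOf(String logger) {
        return currentLevels.get().get(logger);
    }
}
```

The `AtomicReference` holding an immutable-by-convention map mirrors the copy-on-write style the real class uses, so concurrent readers never see a half-applied change.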
[GitHub] storm issue #1748: [STORM-2103][SQL] Introduce new sql external module: stor...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1748 @vesense Thanks for addressing all of this work. I haven't had time recently and still have little spare time, but I'll try to find some time to review. Thanks in advance for your patience. ---
[GitHub] storm issue #1748: [STORM-2103][SQL] Introduce new sql external module: stor...
Github user vesense commented on the issue: https://github.com/apache/storm/pull/1748 I want to make it clear that the order in which these PRs should be merged is https://github.com/apache/storm/pull/1748, then https://github.com/apache/storm/pull/1751, then https://github.com/apache/storm/pull/1754. I hope these PRs can be merged ASAP, since many follow-up PRs are on the way. ---
[GitHub] storm issue #1747: STORM-2018: Supervisor V2 (1.0.x)
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1747 @revans2 I checked the last commit and it looks good. Travis CI failure is unrelated. +1 ---
[GitHub] storm issue #1750: STORM-2175: fix double close of workers
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1750 @revans2 +1 and I'll take a look at #1749 very soon. Thanks for the great work. ---
[GitHub] storm issue #1749: STORM-2175: fix double close of workers
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1749 @revans2 also +1 on this. ---
[GitHub] storm issue #1750: STORM-2175: fix double close of workers
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1750 @HeartSaVioR could you take a look at this and #1749 so I can merge this in? ---
[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1756 @abhishekagarwal87 happy to take a look. I'll see what I can do on the tests. ---
[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1757 @HeartSaVioR why are we trying to avoid the module dependencies so much, to the point of creating duplicate code? Shouldn't we try to avoid duplicate code at all costs? If we eliminate all the module dependencies I created, we will have to duplicate 4 classes: two of them duplicated in 2 places each, and the other 2 duplicated in one place. That is 6 more classes than we need under the current module structure. ---
[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1757 Before looking into the details, how much code do storm-kafka-example and storm-kafka-client-example share? Personally I'm OK with some duplication if it gets rid of dependencies between modules. ---