[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

2016-11-02 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on the issue:

https://github.com/apache/storm/pull/1756
  
That's good to hear Robert. I will resolve the conflicts and incorporate 
your suggestions. Thank you for your help. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1760: Add topology stream-awareness to storm-redis

2016-11-02 Thread mo-getter
GitHub user mo-getter opened a pull request:

https://github.com/apache/storm/pull/1760

Add topology stream-awareness to storm-redis

Allows users to control the streams to which the provided Bolts emit via a 
StreamMapper interface, rather than emitting only to the "default" stream.

The existing constructors for the Bolts use a DefaultStreamMapper, so there 
are no breaking changes in behavior. In the future, though, it might make more 
sense to use the InputSourceStreamMapper by default, which emits new tuples to 
the same stream the input tuple came in on (this would be especially useful 
for the RedisFilterBolt).
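
The described mapping can be sketched as follows. This is a hypothetical shape, not the PR's actual code: the real interface would operate on an org.apache.storm.tuple.Tuple, while a plain String stands in for the input tuple's source stream here so the sketch stays self-contained.

```java
// Hypothetical sketch of the StreamMapper idea described above: given the
// stream an input tuple arrived on, decide which stream the bolt emits to.
public class StreamMapperSketch {

    interface StreamMapper {
        String getStream(String inputSourceStream);
    }

    // Mirrors the described DefaultStreamMapper: always emit to the
    // "default" stream, preserving the bolts' existing behavior.
    static class DefaultStreamMapper implements StreamMapper {
        public String getStream(String inputSourceStream) {
            return "default";
        }
    }

    // Mirrors the described InputSourceStreamMapper: emit to whatever
    // stream the input tuple came in on.
    static class InputSourceStreamMapper implements StreamMapper {
        public String getStream(String inputSourceStream) {
            return inputSourceStream;
        }
    }

    public static void main(String[] args) {
        StreamMapper def = new DefaultStreamMapper();
        StreamMapper src = new InputSourceStreamMapper();
        System.out.println(def.getStream("purchases")); // prints "default"
        System.out.println(src.getStream("purchases")); // prints "purchases"
    }
}
```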

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mo-getter/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1760.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1760


commit 28e31f3050cc837164005b22c7b66aacecf2cd7b
Author: Alan Smith 
Date:   2016-11-03T03:56:08Z

Add topology stream-awareness to storm-redis.

Allows users to control the streams to which the provided Bolts emit,
via a StreamMapper interface, rather than emitting only to the default
stream.






How does the control flow in a Trident Topology work?

2016-11-02 Thread Li Wang
Hi guys,

I am trying to understand the implementation of Trident. From reading the 
code in TridentTopologyBuilder.java, I understand that some coordinator 
components, such as MasterBatchCoordinator and TridentSpoutCoordinator, are 
added to a user-defined topology in TridentTopologyBuilder.createTopology(). I 
am trying to understand the control flow of those coordinators, but it seems 
very difficult to get a sense of it just from the source code. Is there any 
document giving a high-level overview of the control flow of the coordinator 
components in a Trident topology?

Any help is highly appreciated. Thanks!

Sincerely,
Li Wang

[GitHub] storm issue #1748: [STORM-2103][SQL] Introduce new sql external module: stor...

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1748
  
@vesense I just set aside some time for this. I don't have MongoDB 
installed, but the code change looks good. One question though: it seems to 
use a single serialization field. Do we store everything in that one field, or 
does Mongo (or BSON) take care of splitting it up automatically? 

I'd like to see a 'how to use' document for storm-sql-mongo. The other 
modules aren't documented either, so we should create a document describing 
how to configure and use external data sources in Storm SQL.

Btw, have you run a SQL statement against storm-sql-mongo? Since I don't 
have MongoDB installed, a manual test could take me hours or even days. If you 
don't mind, I'd recommend pasting the SQL statements (including the datasource 
definitions) that run well with this patch.
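
For context, a Storm SQL datasource definition plus statement generally takes the following shape. The mongodb LOCATION URI and TBLPROPERTIES keys below are illustrative assumptions, not confirmed by this thread; the actual scheme and options would come from the storm-sql-mongo patch itself.

```sql
-- Illustrative only: external-table DDL in the general Storm SQL style,
-- with a hypothetical mongodb location and collection property.
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT)
    LOCATION 'mongodb://127.0.0.1:27017/test'
    TBLPROPERTIES '{"collection.name": "orders"}'

INSERT INTO ORDERS
    SELECT ID, UNIT_PRICE, QUANTITY FROM SOURCE_STREAM WHERE QUANTITY > 0
```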




[GitHub] storm pull request #1749: STORM-2175: fix double close of workers

2016-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1749




[GitHub] storm pull request #1750: STORM-2175: fix double close of workers

2016-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1750




[GitHub] storm pull request #1747: STORM-2018: Supervisor V2 (1.0.x)

2016-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1747




[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1757
  
@hmcl If it's common enough that most modules' examples could refer to it, 
I'd create a common examples module with no dependencies on external storage. 
We could even place the classes in storm-core if they're also useful for 
end users and not tied to any external storage. 
Like I said, I'm fine with what you propose in the PR; I'd just like to 
avoid dependency coupling wherever we can, which would be better from my 
perspective.




[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

2016-11-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1756
  
Perf numbers look good compared to the 1.x line (very non-scientific, 
though). I didn't dig into it a lot. Running a throughput-vs-latency test on 
my Mac, I saw CPU utilization with this branch at about half of what it is on 
the 1.x line at 20k sentences per second. Latencies are also lower across the 
board, not by a lot but by a few ms. The JIT also seems to do a much better 
job on the Java code: it hit a steady state at 20k/sec in about 30 seconds, 
whereas the Clojure code took almost 3 minutes to reach a steady state.




[GitHub] storm pull request #1758: STORM-2185: Storm Supervisor doesn't delete direct...

2016-11-02 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/1758

STORM-2185: Storm Supervisor doesn't delete directories properly sometimes



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-2185

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1758


commit 3ac3b8329121c38de774e57f694882eac9d1f043
Author: Kyle Nusbaum 
Date:   2016-10-31T21:05:44Z

Fixing KafkaSpout acking.

commit 2d3d7b109094fe966abd83f12e1ed388075b3be5
Author: Kyle Nusbaum 
Date:   2016-10-31T21:11:18Z

Fixing spacing.

commit 52da50044f0fba65e99af6a4fd2c5af55d54d455
Author: Kyle Nusbaum 
Date:   2016-11-02T20:29:53Z

Fix.






[GitHub] storm pull request #1759: STORM-2185: Storm Supervisor doesn't delete direct...

2016-11-02 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/1759

STORM-2185: Storm Supervisor doesn't delete directories properly sometimes 
- 1.x branch



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-2185-1.x-branch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1759


commit 0035a2cc91095dc440f0ca44df3a61d85db9494c
Author: Kyle Nusbaum 
Date:   2016-10-31T21:05:44Z

Fixing KafkaSpout acking.

commit b4dc3410c3dca405560de8b5a2649aecb4c2934d
Author: Kyle Nusbaum 
Date:   2016-10-31T21:11:18Z

Fixing spacing.

commit 5d6b6e4ee6ac55f7905082d01343740ca43594ce
Author: Kyle Nusbaum 
Date:   2016-11-02T20:29:53Z

Fix.






[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

2016-11-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1756#discussion_r86233431
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/local_executor.clj ---
@@ -25,18 +25,3 @@
 (let [val (AddressedTuple. task tuple)]
--- End diff --

This entire file should really be removed, and with-tracked-cluster should 
be updated not to use it. It is clearly not needed, because this function is 
never called, only overridden.




[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

2016-11-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1756#discussion_r86233780
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -179,22 +178,21 @@ public BuiltinMetrics getBuiltInMetrics() {
 }
 
 private TopologyContext mkTopologyContext(StormTopology topology) 
throws IOException {
-Map conf = (Map) workerData.get(Constants.CONF);
+Map conf = workerData.getConf();
 return new TopologyContext(
-topology,
-(Map) workerData.get(Constants.STORM_CONF),
-(Map) 
workerData.get(Constants.TASK_TO_COMPONENT),
-(Map>) 
workerData.get(Constants.COMPONENT_TO_SORTED_TASKS),
-(Map>) 
workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS),
-(String) workerData.get(Constants.STORM_ID),
+topology,
+workerData.getTopologyConf(),
+workerData.getTaskToComponent(),
+workerData.getComponentToSortedTasks(),
+workerData.getComponentToStreamToFields(),
+workerData.getTopologyId(),
 ConfigUtils.supervisorStormResourcesPath(
-ConfigUtils.supervisorStormDistRoot(conf, (String) 
workerData.get(Constants.STORM_ID))),
-ConfigUtils.workerPidsRoot(conf, (String) 
workerData.get(Constants.WORKER_ID)),
+ConfigUtils.supervisorStormDistRoot(conf, 
workerData.getTopologyId())),
+ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
 taskId,
-(Integer) workerData.get(Constants.PORT),
-(List) workerData.get(Constants.TASK_IDS),
-(Map) 
workerData.get(Constants.DEFAULT_SHARED_RESOURCES),
-(Map) 
workerData.get(Constants.USER_SHARED_RESOURCES),
+workerData.getPort(), workerData.getTaskIds(),
+workerData.getDefaultSharedResources(),
+   workerData.getUserSharedResources(),
--- End diff --

nit: indentation is off




[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

2016-11-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1756
  
@abhishekagarwal87 I created a pull request to your repo for the failing 
tests 

https://github.com/abhishekagarwal87/storm/pull/7

I will keep looking at the pull request.




[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694

2016-11-02 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1757
  
@HeartSaVioR I am OK with a common module, but does it make sense to have a 
common module, with little or no dependencies, just to hold a couple of 
classes, and have both Kafka modules refer to it? In this particular case, one 
of the classes that needs to be reused lives in storm-starter, and three other 
classes are common to both storm-kafka and storm-kafka-client.

Should we create a common module for all the examples? Or should the common 
module be storm-starter?




[GitHub] storm pull request #1751: [STORM-2172][SQL] Support Avro as input / output f...

2016-11-02 Thread sachingsachin
Github user sachingsachin commented on a diff in the pull request:

https://github.com/apache/storm/pull/1751#discussion_r86188870
  
--- Diff: 
external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/avro/AvroScheme.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AvroScheme implements Scheme {
+  private final String schemaString;
+  private final List fieldNames;
+  private final CachedSchemas schemas;
+
+  public AvroScheme(String schemaString, List fieldNames) {
+this.schemaString = schemaString;
+this.fieldNames = fieldNames;
+this.schemas = new CachedSchemas();
+  }
+
+  @Override
+  public List deserialize(ByteBuffer ser) {
+    try {
+      Schema schema = schemas.getSchema(schemaString);
+
+      DatumReader reader = new GenericDatumReader<>(schema);
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(Utils.toByteArray(ser), null);
+      GenericRecord record = reader.read(null, decoder);
+
+      ArrayList list = new ArrayList<>(fieldNames.size());
+      for (String field : fieldNames) {
+        Object value = record.get(field);
+        // Avro strings are stored using a special Avro type instead of using Java primitives
+        if (value instanceof Utf8) {
+          list.add(value.toString());
+        } else if (value instanceof Map) {
+          Map map = SerdeUtils.convertAvroUtf8Map((Map)value);
+          list.add(map);
+        } else {
+          list.add(value);
--- End diff --

Do we need to check array-type containers for the Utf8-to-String 
conversion too? For example, an array or set of Utf8 objects? That should 
cover all the cases, IMO.
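
The extension being suggested might look like the following sketch. The helper name convertAvroValue is hypothetical (not from the patch), and a CharSequence check stands in for Avro's Utf8 so the sketch carries no Avro dependency; Utf8 implements CharSequence, so the same branch would cover it in real usage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the recursive conversion this review comment asks about: convert
// Utf8-like values nested inside Lists (Avro arrays) as well as inside Maps.
public final class AvroValueConverter {

    @SuppressWarnings("unchecked")
    public static Object convertAvroValue(Object value) {
        if (value instanceof CharSequence && !(value instanceof String)) {
            // In real usage this branch covers org.apache.avro.util.Utf8.
            return value.toString();
        } else if (value instanceof Map) {
            // Convert keys to String and recurse into the values.
            Map<Object, Object> out = new HashMap<>();
            for (Map.Entry<Object, Object> e : ((Map<Object, Object>) value).entrySet()) {
                out.put(e.getKey().toString(), convertAvroValue(e.getValue()));
            }
            return out;
        } else if (value instanceof List) {
            // The array case raised in the comment: convert each element.
            List<Object> out = new ArrayList<>();
            for (Object o : (List<Object>) value) {
                out.add(convertAvroValue(o));
            }
            return out;
        }
        return value;
    }

    public static void main(String[] args) {
        // StringBuilder plays the role of a non-String CharSequence (like Utf8).
        System.out.println(convertAvroValue(
                java.util.Arrays.asList(new StringBuilder("hello")))); // prints "[hello]"
    }
}
```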




[GitHub] storm pull request #1751: [STORM-2172][SQL] Support Avro as input / output f...

2016-11-02 Thread sachingsachin
Github user sachingsachin commented on a diff in the pull request:

https://github.com/apache/storm/pull/1751#discussion_r86189092
  
--- Diff: 
external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.utils;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.util.Utf8;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
+import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;
+import org.apache.storm.sql.runtime.serde.json.JsonScheme;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.utils.Utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public final class SerdeUtils {
+    public static Scheme getScheme(String inputFormatClass, Properties properties, List fieldNames) {
+        Scheme scheme;
+        if (isNotEmpty(inputFormatClass)) {
+            switch (inputFormatClass) {
+                case "org.apache.storm.sql.runtime.serde.json.JsonScheme" :
+                    scheme = new JsonScheme(fieldNames);
+                    break;
+                case "org.apache.storm.sql.runtime.serde.avro.AvroScheme" :
+                    String schemaString = properties.getProperty("avro.schema");
+                    Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty");
+                    scheme = new AvroScheme(schemaString, fieldNames);
+                    break;
+                default:
+                    scheme = Utils.newInstance(inputFormatClass);
+            }
+        } else {
+            //use JsonScheme as the default scheme
+            scheme = new JsonScheme(fieldNames);
+        }
+        return scheme;
+    }
+
+    public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List fieldNames) {
+        IOutputSerializer serializer;
+        if (isNotEmpty(outputFormatClass)) {
+            switch (outputFormatClass) {
+                case "org.apache.storm.sql.runtime.serde.json.JsonSerializer" :
+                    serializer = new JsonSerializer(fieldNames);
+                    break;
+                case "org.apache.storm.sql.runtime.serde.avro.AvroSerializer" :
+                    String schemaString = properties.getProperty("avro.schema");
+                    Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty");
+                    serializer = new AvroSerializer(schemaString, fieldNames);
+                    break;
+                default:
+                    serializer = Utils.newInstance(outputFormatClass);
+            }
+        } else {
+            //use JsonSerializer as the default serializer
+            serializer = new JsonSerializer(fieldNames);
+        }
+        return serializer;
+    }
+
+    public static Map convertAvroUtf8Map(Map value) {
+        Map map = new HashMap<>();
+        for (Map.Entry entry : value.entrySet()) {
+            // Avro only allows maps with Strings for keys, so we only have to worry
+            // about deserializing the values
+            Object key = entry.getKey().toString();
--- End diff --

Avro supports non-string maps too. So conversion to string might be 
incorrect in those cases.



[GitHub] storm pull request #1751: [STORM-2172][SQL] Support Avro as input / output f...

2016-11-02 Thread sachingsachin
Github user sachingsachin commented on a diff in the pull request:

https://github.com/apache/storm/pull/1751#discussion_r86189208
  
--- Diff: 
external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/SerdeUtils.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.utils;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.util.Utf8;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.serde.avro.AvroScheme;
+import org.apache.storm.sql.runtime.serde.avro.AvroSerializer;
+import org.apache.storm.sql.runtime.serde.json.JsonScheme;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.utils.Utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public final class SerdeUtils {
+    public static Scheme getScheme(String inputFormatClass, Properties properties, List fieldNames) {
+        Scheme scheme;
+        if (isNotEmpty(inputFormatClass)) {
+            switch (inputFormatClass) {
+                case "org.apache.storm.sql.runtime.serde.json.JsonScheme" :
+                    scheme = new JsonScheme(fieldNames);
+                    break;
+                case "org.apache.storm.sql.runtime.serde.avro.AvroScheme" :
+                    String schemaString = properties.getProperty("avro.schema");
+                    Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty");
+                    scheme = new AvroScheme(schemaString, fieldNames);
+                    break;
+                default:
+                    scheme = Utils.newInstance(inputFormatClass);
+            }
+        } else {
+            //use JsonScheme as the default scheme
+            scheme = new JsonScheme(fieldNames);
+        }
+        return scheme;
+    }
+
+    public static IOutputSerializer getSerializer(String outputFormatClass, Properties properties, List fieldNames) {
+        IOutputSerializer serializer;
+        if (isNotEmpty(outputFormatClass)) {
+            switch (outputFormatClass) {
+                case "org.apache.storm.sql.runtime.serde.json.JsonSerializer" :
+                    serializer = new JsonSerializer(fieldNames);
+                    break;
+                case "org.apache.storm.sql.runtime.serde.avro.AvroSerializer" :
+                    String schemaString = properties.getProperty("avro.schema");
+                    Preconditions.checkArgument(isNotEmpty(schemaString), "avro.schema can not be empty");
+                    serializer = new AvroSerializer(schemaString, fieldNames);
+                    break;
+                default:
+                    serializer = Utils.newInstance(outputFormatClass);
+            }
+        } else {
+            //use JsonSerializer as the default serializer
+            serializer = new JsonSerializer(fieldNames);
+        }
+        return serializer;
+    }
+
+    public static Map convertAvroUtf8Map(Map value) {
+        Map map = new HashMap<>();
+        for (Map.Entry entry : value.entrySet()) {
+            // Avro only allows maps with Strings for keys, so we only have to worry
+            // about deserializing the values
+            Object key = entry.getKey().toString();
+            Object val = entry.getValue();
+
+            if (val instanceof Utf8) {
+                map.put(key, val.toString());
+            } else if (val instanceof Map) {
+                map.put(key, convertAvroUtf8Map((Map)val));
+            } else {
+                map.put(key, val);
--- End diff --

Same comment about arrays/sets of Utf8.

[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1757
  
@hmcl I agree that we should try to avoid duplicated code, but I have also 
seen unneeded dependencies cause problems that come not from first-level deps 
but from transitive dependencies.
If the common code is not related to the Kafka API, I'd rather have a 
common module between the two; but either way is OK with me. That was just my 
preference, others may think differently, and I don't have a strong opinion 
about this.




[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

2016-11-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1756#discussion_r86168341
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import com.google.common.base.Preconditions;
+import com.lmax.disruptor.EventHandler;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.*;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorShutdown;
+import org.apache.storm.executor.IRunningExecutor;
+import org.apache.storm.executor.LocalExecutor;
+import org.apache.storm.generated.*;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.TaskMessage;
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

2016-11-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1756#discussion_r86166475
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import com.google.common.base.Preconditions;
+import com.lmax.disruptor.EventHandler;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.*;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorShutdown;
+import org.apache.storm.executor.IRunningExecutor;
+import org.apache.storm.executor.LocalExecutor;
+import org.apache.storm.generated.*;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.*;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class Worker implements Shutdownable, DaemonCommon {
+
+private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+private final Map conf;
+private final IContext context;
+private final String topologyId;
+private final String assignmentId;
+private final int port;
+private final String workerId;
+private final LogConfigManager logConfigManager;
+
+
+private WorkerState workerState;
+private AtomicReference<List<IRunningExecutor>> executorsAtom;
+private Thread transferThread;
+private WorkerBackpressureThread backpressureThread;
+
+private AtomicReference<Credentials> credentialsAtom;
+private Subject subject;
+private Collection<IAutoCredentials> autoCreds;
+
+
+/**
+ * TODO: should worker even take the topologyId as input? this should be
+ * deducible from cluster state (by searching through assignments)
+ * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
+ *
+ * @param conf - Storm configuration
+ * @param context  -
+ * @param topologyId   - topology id
+ * @param assignmentId - assignment id
+ * @param port - port on which the worker runs
+ * @param workerId - worker id
+ */
+
+public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
+this.conf = conf;
+this.context = context;
+this.topologyId = topologyId;
+this.assignmentId = assignmentId;
+this.port = port;
+this.workerId = workerId;
+this.logConfigManager = new LogConfigManager();
+}
+
+public void start() throws Exception {
+LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId, conf);
+// because in local mode, it's not a separate
+// process. supervisor will register it in this case
+// if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
+i

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

2016-11-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1756#discussion_r86178116
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java ---
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LogConfigManager {
+
+private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class);
+
+private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
+private final Map<String, Level> originalLogLevels;
+
+public LogConfigManager() {
+this(new AtomicReference<>(new TreeMap<>()));
+}
+
+public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) {
+this.latestLogConfig = latestLogConfig;
+this.originalLogLevels = getLoggerLevels();
+LOG.info("Started with log levels: {}", originalLogLevels);
+}
+
+public void processLogConfigChange(LogConfig logConfig) {
+if (null != logConfig) {
+LOG.debug("Processing received log config: {}", logConfig);
+TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level());
+LoggerContext logContext = (LoggerContext) LogManager.getContext();
--- End diff --

In the previous code it passed in false rather than calling the no-argument
overload. Not sure if it makes much difference in our case though.


https://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/LogManager.html#getContext(boolean)
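For readers following along, here is a minimal sketch of the distinction being discussed, assuming log4j 2.x is on the classpath (the class name GetContextDemo is illustrative only, not part of the patch):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;

public class GetContextDemo {
    public static void main(String[] args) {
        // getContext() behaves like getContext(true): it may return the
        // LoggerContext bound to the current thread's context classloader.
        LoggerContext viaDefault = (LoggerContext) LogManager.getContext();

        // getContext(false) returns the context appropriate for the caller,
        // which is usually what reconfiguration code wants: the context that
        // owns the configuration it is about to mutate and pass to
        // updateLoggers().
        LoggerContext viaCaller = (LoggerContext) LogManager.getContext(false);

        // In a simple single-classloader application the two are typically
        // the same object; under multiple classloaders (app servers, workers
        // with per-topology classloaders) they can differ.
        System.out.println(viaDefault == viaCaller);
    }
}
```

The practical consequence is that in a plain JVM the two calls usually agree, which may be why the difference was not noticed in testing.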


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1748: [STORM-2103][SQL] Introduce new sql external module: stor...

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1748
  
@vesense Thanks for addressing so many of these items. I haven't had much time
recently and still have little spare time, but I'll try to find some time to
review. Thanks in advance for your patience.




[GitHub] storm issue #1748: [STORM-2103][SQL] Introduce new sql external module: stor...

2016-11-02 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1748
  
I want to make it clear that the order in which these PRs should be merged is:
https://github.com/apache/storm/pull/1748
https://github.com/apache/storm/pull/1751
https://github.com/apache/storm/pull/1754
Hope these PRs can be merged ASAP, since many follow-up PRs are on the way.




[GitHub] storm issue #1747: STORM-2018: Supervisor V2 (1.0.x)

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1747
  
@revans2 
I checked the last commit and it looks good. Travis CI failure is 
unrelated. +1




[GitHub] storm issue #1750: STORM-2175: fix double close of workers

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1750
  
@revans2 +1 and I'll take a look at #1749 very soon. Thanks for the great 
work.




[GitHub] storm issue #1749: STORM-2175: fix double close of workers

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1749
  
@revans2 also +1 on this.




[GitHub] storm issue #1750: STORM-2175: fix double close of workers

2016-11-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1750
  
@HeartSaVioR could you take a look at this and #1749 so I can merge this in?




[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

2016-11-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1756
  
@abhishekagarwal87 happy to take a look.  I'll see what I can do on the 
tests.




[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694

2016-11-02 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1757
  
@HeartSaVioR why are we trying to avoid the module dependencies so much, to 
the point of creating duplicate code? Shouldn't we try to avoid creating 
duplicate code at all costs?

If we eliminate all the module dependencies I created, we will have to
duplicate 4 classes. Two of them would be duplicated in 2 places each, and the
other 2 in one place each. We would end up with 6 more classes than the
current module structure needs.




[GitHub] storm issue #1757: Apache master storm 2182 top storm 1694

2016-11-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1757
  
Before looking into the details, how much code do storm-kafka-example and
storm-kafka-client-example share? Personally I'm OK with some duplication if
it gets rid of dependencies between modules.

