Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 872f6e2c9 -> a06f52105
STORM-2216: prefer JSONValue.parseWithException Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58288ae5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58288ae5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58288ae5 Branch: refs/heads/1.0.x-branch Commit: 58288ae57afe5b01f983737a40c07b3e0ad0b955 Parents: 872f6e2 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Tue Nov 22 15:04:14 2016 -0600 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Fri Nov 25 10:18:29 2016 +0900 ---------------------------------------------------------------------- .../org/apache/storm/kafka/DynamicBrokersReader.java | 8 +++++--- .../src/jvm/org/apache/storm/kafka/ZkState.java | 2 +- .../src/jvm/org/apache/storm/drpc/ReturnResults.java | 14 +++++++++++--- .../org/apache/storm/multilang/JsonSerializer.java | 11 +++++------ .../org/apache/storm/task/GeneralTopologyContext.java | 11 +++++++---- .../org/apache/storm/topology/TopologyBuilder.java | 12 ++++++++++-- .../storm/trident/drpc/ReturnResultsReducer.java | 11 +++++++++-- .../trident/state/JSONNonTransactionalSerializer.java | 5 +++-- .../storm/trident/state/JSONOpaqueSerializer.java | 6 +++--- .../trident/state/JSONTransactionalSerializer.java | 6 +++--- .../org/apache/storm/trident/testing/TuplifyArgs.java | 14 +++++++++----- .../trident/topology/state/TransactionalState.java | 2 +- storm-core/src/jvm/org/apache/storm/utils/Utils.java | 4 +--- 13 files changed, 68 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java index 2ad90da..dcf5908 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java @@ -160,7 +160,7 @@ public class DynamicBrokersReader { try { String topicBrokersPath = partitionPath(topic); byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state"); - Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8")); + Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(hostPortData, "UTF-8")); Integer leader = ((Number) value.get("leader")).intValue(); if (leader == -1) { throw new RuntimeException("No leader found for partition " + partition); @@ -186,11 +186,13 @@ public class DynamicBrokersReader { */ private Broker getBrokerHost(byte[] contents) { try { - Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8")); + Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8")); String host = (String) value.get("host"); Integer port = ((Long) value.get("port")).intValue(); return new Broker(host, port); - } catch (UnsupportedEncodingException e) { + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java index 1428bb7..ad96006 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java @@ -91,7 +91,7 @@ 
public class ZkState { if (b == null) { return null; } - return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8")); + return (Map<Object, Object>) JSONValue.parseWithException(new String(b, "UTF-8")); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java index a9a5aa1..24174b2 100644 --- a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java +++ b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; public class ReturnResults extends BaseRichBolt { @@ -59,13 +60,20 @@ public class ReturnResults extends BaseRichBolt { public void execute(Tuple input) { String result = (String) input.getValue(0); String returnInfo = (String) input.getValue(1); - if(returnInfo!=null) { - Map retMap = (Map) JSONValue.parse(returnInfo); + if (returnInfo!=null) { + Map retMap = null; + try { + retMap = (Map) JSONValue.parseWithException(returnInfo); + } catch (ParseException e) { + LOG.error("Parseing returnInfo failed", e); + _collector.fail(input); + return; + } final String host = (String) retMap.get("host"); final int port = Utils.getInt(retMap.get("port")); String id = (String) retMap.get("id"); DistributedRPCInvocations.Iface client; - if(local) { + if (local) { client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host); } else { List server = new ArrayList() {{ http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java index eecf826..ec6d4c4 100644 --- a/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java +++ b/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java @@ -31,6 +31,7 @@ import java.util.Map; import org.json.simple.JSONObject; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Utils; @@ -165,12 +166,10 @@ public class JsonSerializer implements ISerializer { } private Object readMessage() throws IOException, NoOutputException { - String string = readString(); - Object msg = JSONValue.parse(string); - if (msg != null) { - return msg; - } else { - throw new IOException("unable to parse: " + string); + try { + return JSONValue.parseWithException(readString()); + } catch (ParseException e) { + throw new IOException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java b/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java index f0cb489..45ca740 100644 --- a/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java +++ b/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import org.json.simple.JSONValue; import org.json.simple.JSONAware; +import org.json.simple.parser.ParseException; public class GeneralTopologyContext implements JSONAware { private StormTopology _topology; @@ -187,10 +188,12 @@ public class GeneralTopologyContext implements JSONAware { ComponentCommon common = 
getComponentCommon(spout); String jsonConf = common.get_json_conf(); if(jsonConf!=null) { - Map conf = (Map) JSONValue.parse(jsonConf); - Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS); - if(comp!=null) { - max = Math.max(Utils.getInt(comp), max); + try { + Map conf = (Map) JSONValue.parseWithException(jsonConf); + Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS); + max = Math.max(Utils.getInt(comp, max), max); + } catch (ParseException e) { + throw new RuntimeException(e); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 92cad77..40ede4c 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -30,6 +30,7 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Utils; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; import java.io.NotSerializableException; import java.nio.ByteBuffer; @@ -568,8 +569,15 @@ public class TopologyBuilder { } private static Map parseJson(String json) { - if(json==null) return new HashMap(); - else return (Map) JSONValue.parse(json); + if (json==null) { + return new HashMap(); + } else { + try { + return (Map) JSONValue.parseWithException(json); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } } private static String mergeIntoJson(Map into, Map newMap) { http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java b/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java index 6bb9a82..59c7de0 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java +++ b/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java @@ -31,6 +31,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; import org.apache.storm.trident.drpc.ReturnResultsReducer.ReturnResultsState; import org.apache.storm.trident.operation.MultiReducer; import org.apache.storm.trident.operation.TridentCollector; @@ -76,9 +77,15 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> { @Override public void complete(ReturnResultsState state, TridentCollector collector) { // only one of the multireducers will receive the tuples - if(state.returnInfo!=null) { + if (state.returnInfo!=null) { String result = JSONValue.toJSONString(state.results); - Map retMap = (Map) JSONValue.parse(state.returnInfo); + Map retMap = null; + try { + retMap = (Map) JSONValue.parseWithException(state.returnInfo); + } catch (ParseException e) { + collector.reportError(e); + return; + } final String host = (String) retMap.get("host"); final int port = Utils.getInt(retMap.get("port")); String id = (String) retMap.get("id"); http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java b/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java index 782f313..ee5191f 100644 --- 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java +++ b/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java @@ -19,6 +19,7 @@ package org.apache.storm.trident.state; import java.io.UnsupportedEncodingException; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; public class JSONNonTransactionalSerializer implements Serializer { @@ -35,8 +36,8 @@ public class JSONNonTransactionalSerializer implements Serializer { @Override public Object deserialize(byte[] b) { try { - return JSONValue.parse(new String(b, "UTF-8")); - } catch (UnsupportedEncodingException e) { + return JSONValue.parseWithException(new String(b, "UTF-8")); + } catch (UnsupportedEncodingException | ParseException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java b/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java index 0c95f37..bd9a4a1 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java +++ b/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java @@ -21,7 +21,7 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import org.json.simple.JSONValue; - +import org.json.simple.parser.ParseException; public class JSONOpaqueSerializer implements Serializer<OpaqueValue> { @@ -42,9 +42,9 @@ public class JSONOpaqueSerializer implements Serializer<OpaqueValue> { public OpaqueValue deserialize(byte[] b) { try { String s = new String(b, "UTF-8"); - List deser = (List) JSONValue.parse(s); + List deser = (List) JSONValue.parseWithException(s); return new OpaqueValue((Long) deser.get(0), deser.get(1), 
deser.get(2)); - } catch (UnsupportedEncodingException e) { + } catch (UnsupportedEncodingException | ParseException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java b/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java index 0370100..f9d1b93 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java +++ b/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java @@ -21,7 +21,7 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import org.json.simple.JSONValue; - +import org.json.simple.parser.ParseException; public class JSONTransactionalSerializer implements Serializer<TransactionalValue> { @Override @@ -40,9 +40,9 @@ public class JSONTransactionalSerializer implements Serializer<TransactionalValu public TransactionalValue deserialize(byte[] b) { try { String s = new String(b, "UTF-8"); - List deser = (List) JSONValue.parse(s); + List deser = (List) JSONValue.parseWithException(s); return new TransactionalValue((Long) deser.get(0), deser.get(1)); - } catch (UnsupportedEncodingException e) { + } catch (UnsupportedEncodingException | ParseException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java b/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java index 5c52000..e6c37e2 100644 --- 
a/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java +++ b/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java @@ -19,6 +19,7 @@ package org.apache.storm.trident.testing; import java.util.List; import org.json.simple.JSONValue; +import org.json.simple.parser.ParseException; import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.tuple.TridentTuple; @@ -27,11 +28,14 @@ public class TuplifyArgs extends BaseFunction { @Override public void execute(TridentTuple input, TridentCollector collector) { - String args = input.getString(0); - List<List<Object>> tuples = (List) JSONValue.parse(args); - for(List<Object> tuple: tuples) { - collector.emit(tuple); + try { + String args = input.getString(0); + List<List<Object>> tuples = (List) JSONValue.parseWithException(args); + for (List<Object> tuple: tuples) { + collector.emit(tuple); + } + } catch (ParseException e) { + throw new RuntimeException(e); } } - } http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java index 3bf2794..df426d1 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java @@ -150,7 +150,7 @@ public class TransactionalState { path = "/" + path; try { if(_curator.checkExists().forPath(path)!=null) { - return JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8")); + return JSONValue.parseWithException(new String(_curator.getData().forPath(path), "UTF-8")); } else { return null; } 
http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 8c9ce0a..af92eab 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -305,9 +305,7 @@ public class Utils {
             Object ret = JSONValue.parseWithException(in);
             in.close();
             return (Map<String,Object>)ret;
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-        } catch (ParseException e) {
+        } catch (IOException | ParseException e) {
             throw new RuntimeException(e);
         }
     }