Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 872f6e2c9 -> a06f52105


STORM-2216: prefer JSONValue.parseWithException


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58288ae5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58288ae5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58288ae5

Branch: refs/heads/1.0.x-branch
Commit: 58288ae57afe5b01f983737a40c07b3e0ad0b955
Parents: 872f6e2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Nov 22 15:04:14 2016 -0600
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Fri Nov 25 10:18:29 2016 +0900

----------------------------------------------------------------------
 .../org/apache/storm/kafka/DynamicBrokersReader.java  |  8 +++++---
 .../src/jvm/org/apache/storm/kafka/ZkState.java       |  2 +-
 .../src/jvm/org/apache/storm/drpc/ReturnResults.java  | 14 +++++++++++---
 .../org/apache/storm/multilang/JsonSerializer.java    | 11 +++++------
 .../org/apache/storm/task/GeneralTopologyContext.java | 11 +++++++----
 .../org/apache/storm/topology/TopologyBuilder.java    | 12 ++++++++++--
 .../storm/trident/drpc/ReturnResultsReducer.java      | 11 +++++++++--
 .../trident/state/JSONNonTransactionalSerializer.java |  5 +++--
 .../storm/trident/state/JSONOpaqueSerializer.java     |  6 +++---
 .../trident/state/JSONTransactionalSerializer.java    |  6 +++---
 .../org/apache/storm/trident/testing/TuplifyArgs.java | 14 +++++++++-----
 .../trident/topology/state/TransactionalState.java    |  2 +-
 storm-core/src/jvm/org/apache/storm/utils/Utils.java  |  4 +---
 13 files changed, 68 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
index 2ad90da..dcf5908 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
@@ -160,7 +160,7 @@ public class DynamicBrokersReader {
         try {
             String topicBrokersPath = partitionPath(topic);
             byte[] hostPortData = _curator.getData().forPath(topicBrokersPath 
+ "/" + partition + "/state");
-            Map<Object, Object> value = (Map<Object, Object>) 
JSONValue.parse(new String(hostPortData, "UTF-8"));
+            Map<Object, Object> value = (Map<Object, Object>) 
JSONValue.parseWithException(new String(hostPortData, "UTF-8"));
             Integer leader = ((Number) value.get("leader")).intValue();
             if (leader == -1) {
                 throw new RuntimeException("No leader found for partition " + 
partition);
@@ -186,11 +186,13 @@ public class DynamicBrokersReader {
      */
     private Broker getBrokerHost(byte[] contents) {
         try {
-            Map<Object, Object> value = (Map<Object, Object>) 
JSONValue.parse(new String(contents, "UTF-8"));
+            Map<Object, Object> value = (Map<Object, Object>) 
JSONValue.parseWithException(new String(contents, "UTF-8"));
             String host = (String) value.get("host");
             Integer port = ((Long) value.get("port")).intValue();
             return new Broker(host, port);
-        } catch (UnsupportedEncodingException e) {
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
index 1428bb7..ad96006 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
@@ -91,7 +91,7 @@ public class ZkState {
             if (b == null) {
                 return null;
             }
-            return (Map<Object, Object>) JSONValue.parse(new String(b, 
"UTF-8"));
+            return (Map<Object, Object>) JSONValue.parseWithException(new 
String(b, "UTF-8"));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java 
b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
index a9a5aa1..24174b2 100644
--- a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
+++ b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
 
 
 public class ReturnResults extends BaseRichBolt {
@@ -59,13 +60,20 @@ public class ReturnResults extends BaseRichBolt {
     public void execute(Tuple input) {
         String result = (String) input.getValue(0);
         String returnInfo = (String) input.getValue(1);
-        if(returnInfo!=null) {
-            Map retMap = (Map) JSONValue.parse(returnInfo);
+        if (returnInfo!=null) {
+            Map retMap = null;
+            try {
+                retMap = (Map) JSONValue.parseWithException(returnInfo);
+            } catch (ParseException e) {
+                 LOG.error("Parseing returnInfo failed", e);
+                 _collector.fail(input);
+                 return;
+            }
             final String host = (String) retMap.get("host");
             final int port = Utils.getInt(retMap.get("port"));
             String id = (String) retMap.get("id");
             DistributedRPCInvocations.Iface client;
-            if(local) {
+            if (local) {
                 client = (DistributedRPCInvocations.Iface) 
ServiceRegistry.getService(host);
             } else {
                 List server = new ArrayList() {{

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java 
b/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java
index eecf826..ec6d4c4 100644
--- a/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/org/apache/storm/multilang/JsonSerializer.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
 
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Utils;
@@ -165,12 +166,10 @@ public class JsonSerializer implements ISerializer {
     }
 
     private Object readMessage() throws IOException, NoOutputException {
-        String string = readString();
-        Object msg = JSONValue.parse(string);
-        if (msg != null) {
-            return msg;
-        } else {
-            throw new IOException("unable to parse: " + string);
+        try {
+            return JSONValue.parseWithException(readString());
+        } catch (ParseException e) {
+            throw new IOException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java 
b/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
index f0cb489..45ca740 100644
--- a/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
+++ b/storm-core/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import org.json.simple.JSONValue;
 import org.json.simple.JSONAware;
+import org.json.simple.parser.ParseException;
 
 public class GeneralTopologyContext implements JSONAware {
     private StormTopology _topology;
@@ -187,10 +188,12 @@ public class GeneralTopologyContext implements JSONAware {
             ComponentCommon common = getComponentCommon(spout);
             String jsonConf = common.get_json_conf();
             if(jsonConf!=null) {
-                Map conf = (Map) JSONValue.parse(jsonConf);
-                Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
-                if(comp!=null) {
-                    max = Math.max(Utils.getInt(comp), max);
+                try {
+                    Map conf = (Map) JSONValue.parseWithException(jsonConf);
+                    Object comp = 
conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
+                    max = Math.max(Utils.getInt(comp, max), max);
+                } catch (ParseException e) {
+                    throw new RuntimeException(e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java 
b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 92cad77..40ede4c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Utils;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
 
 import java.io.NotSerializableException;
 import java.nio.ByteBuffer;
@@ -568,8 +569,15 @@ public class TopologyBuilder {
     }
     
     private static Map parseJson(String json) {
-        if(json==null) return new HashMap();
-        else return (Map) JSONValue.parse(json);
+        if (json==null) {
+            return new HashMap();
+        } else {
+            try {
+                return (Map) JSONValue.parseWithException(json);
+            } catch (ParseException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
     
     private static String mergeIntoJson(Map into, Map newMap) {

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java 
b/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
index 6bb9a82..59c7de0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
 import org.apache.storm.trident.drpc.ReturnResultsReducer.ReturnResultsState;
 import org.apache.storm.trident.operation.MultiReducer;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -76,9 +77,15 @@ public class ReturnResultsReducer implements 
MultiReducer<ReturnResultsState> {
     @Override
     public void complete(ReturnResultsState state, TridentCollector collector) 
{
         // only one of the multireducers will receive the tuples
-        if(state.returnInfo!=null) {
+        if (state.returnInfo!=null) {
             String result = JSONValue.toJSONString(state.results);
-            Map retMap = (Map) JSONValue.parse(state.returnInfo);
+            Map retMap = null;
+            try {
+                retMap = (Map) JSONValue.parseWithException(state.returnInfo);
+            } catch (ParseException e) {
+                collector.reportError(e);
+                return;
+            }
             final String host = (String) retMap.get("host");
             final int port = Utils.getInt(retMap.get("port"));
             String id = (String) retMap.get("id");

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java
 
b/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java
index 782f313..ee5191f 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/state/JSONNonTransactionalSerializer.java
@@ -19,6 +19,7 @@ package org.apache.storm.trident.state;
 
 import java.io.UnsupportedEncodingException;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
 
 
 public class JSONNonTransactionalSerializer implements Serializer {
@@ -35,8 +36,8 @@ public class JSONNonTransactionalSerializer implements 
Serializer {
     @Override
     public Object deserialize(byte[] b) {
         try {
-            return JSONValue.parse(new String(b, "UTF-8"));
-        } catch (UnsupportedEncodingException e) {
+            return JSONValue.parseWithException(new String(b, "UTF-8"));
+        } catch (UnsupportedEncodingException | ParseException e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java 
b/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java
index 0c95f37..bd9a4a1 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/state/JSONOpaqueSerializer.java
@@ -21,7 +21,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
 import org.json.simple.JSONValue;
-
+import org.json.simple.parser.ParseException;
 
 public class JSONOpaqueSerializer implements Serializer<OpaqueValue> {
 
@@ -42,9 +42,9 @@ public class JSONOpaqueSerializer implements 
Serializer<OpaqueValue> {
     public OpaqueValue deserialize(byte[] b) {
         try {
             String s = new String(b, "UTF-8");
-            List deser = (List) JSONValue.parse(s);
+            List deser = (List) JSONValue.parseWithException(s);
             return new OpaqueValue((Long) deser.get(0), deser.get(1), 
deser.get(2));
-        } catch (UnsupportedEncodingException e) {
+        } catch (UnsupportedEncodingException | ParseException e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java
 
b/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java
index 0370100..f9d1b93 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/state/JSONTransactionalSerializer.java
@@ -21,7 +21,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
 import org.json.simple.JSONValue;
-
+import org.json.simple.parser.ParseException;
 
 public class JSONTransactionalSerializer implements 
Serializer<TransactionalValue> {
     @Override
@@ -40,9 +40,9 @@ public class JSONTransactionalSerializer implements 
Serializer<TransactionalValu
     public TransactionalValue deserialize(byte[] b) {
         try {
             String s = new String(b, "UTF-8");
-            List deser = (List) JSONValue.parse(s);
+            List deser = (List) JSONValue.parseWithException(s);
             return new TransactionalValue((Long) deser.get(0), deser.get(1));
-        } catch (UnsupportedEncodingException e) {
+        } catch (UnsupportedEncodingException | ParseException e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java 
b/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java
index 5c52000..e6c37e2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/testing/TuplifyArgs.java
@@ -19,6 +19,7 @@ package org.apache.storm.trident.testing;
 
 import java.util.List;
 import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
 import org.apache.storm.trident.operation.BaseFunction;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.tuple.TridentTuple;
@@ -27,11 +28,14 @@ public class TuplifyArgs extends BaseFunction {
 
     @Override
     public void execute(TridentTuple input, TridentCollector collector) {
-        String args = input.getString(0);
-        List<List<Object>> tuples = (List) JSONValue.parse(args);
-        for(List<Object> tuple: tuples) {
-            collector.emit(tuple);
+        try {
+            String args = input.getString(0);
+            List<List<Object>> tuples = (List) 
JSONValue.parseWithException(args);
+            for (List<Object> tuple: tuples) {
+                collector.emit(tuple);
+            }
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
         }
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 3bf2794..df426d1 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -150,7 +150,7 @@ public class TransactionalState {
         path = "/" + path;
         try {
             if(_curator.checkExists().forPath(path)!=null) {
-                return JSONValue.parse(new 
String(_curator.getData().forPath(path), "UTF-8"));
+                return JSONValue.parseWithException(new 
String(_curator.getData().forPath(path), "UTF-8"));
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/58288ae5/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 8c9ce0a..af92eab 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -305,9 +305,7 @@ public class Utils {
             Object ret = JSONValue.parseWithException(in);
             in.close();
             return (Map<String,Object>)ret;
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-        } catch (ParseException e) {
+        } catch (IOException | ParseException e) {
             throw new RuntimeException(e);
         }
     }

Reply via email to