Also, I ran the code in Utils. The problem is the line (Map) JSONValue.parse(JSONValue.toJSONString(stormConf)) in isValidConf: JSONValue.parse() returns null.
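A likely explanation for the null, as far as I can tell: json-simple writes any object it does not recognize using its toString() value, so the generated string is not valid JSON, and JSONValue.parse() returns null on a parse failure instead of throwing. To find the offending entry without patching Storm, a per-key check along the following lines may help. This is only a sketch: ConfJsonCheck and its methods are hypothetical names, not part of Storm, and the type test approximates the JSON round trip rather than reproducing Utils.isValidConf exactly. Config extends HashMap<String, Object>, so a Config instance should be usable as the argument.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Storm): lists config keys whose values are not plain JSON types. */
final class ConfJsonCheck {

    /** Returns true if the value consists only of types that JSON can represent. */
    static boolean isJsonFriendly(Object value) {
        if (value == null || value instanceof String
                || value instanceof Number || value instanceof Boolean) {
            return true;
        }
        if (value instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                // JSON object keys must be strings; values are checked recursively.
                if (!(e.getKey() instanceof String) || !isJsonFriendly(e.getValue())) {
                    return false;
                }
            }
            return true;
        }
        if (value instanceof List) {
            for (Object item : (List<?>) value) {
                if (!isJsonFriendly(item)) {
                    return false;
                }
            }
            return true;
        }
        return false; // arbitrary objects such as AESEncryption end up here
    }

    /** Collects the top-level keys whose values fail the check above. */
    static List<String> findNonJsonKeys(Map<String, Object> conf) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, Object> e : conf.entrySet()) {
            if (!isJsonFriendly(e.getValue())) {
                bad.add(e.getKey());
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("topology.workers", 2);
        conf.put("aes", new Object()); // stand-in for new AESEncryption()
        System.out.println("Not JSON-friendly: " + findNonJsonKeys(conf));
    }
}
```

Calling ConfJsonCheck.findNonJsonKeys(conf) just before submitTopology should name the "aes" entry in the case described here.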
On Friday, 21 July 2017, 9:33 AM, kalliopi kalantzaki <tsantalos-spa...@yahoo.gr> wrote:

Thank you for your response. Yes, the problem appears when I add the AESEncryption object to the configuration. However, even if I comment out all private members of this class and use an empty class object, the problem remains. Even if I just use conf.registerSerialization(AESEncryption.class) without the custom serializer, the problem is the same. This suggests that Storm does not apply the registered serialization to non-primitive values in the configuration. See the code:

    Config conf = new Config();
    conf.registerSerialization(AESEncryption.class, AESSerializer.class);
    conf.setFallBackOnJavaSerialization(false);
    LocalCluster cluster = new LocalCluster();
    conf.put("aes", new AESEncryption());
    cluster.submitTopology("stormKafkaTopology", conf, builder.createTopology());

    public class AESEncryption {
        // public static final String cypherAlgorithm = "AES/CBC/PKCS5Padding";
        // private SecretKey privateKey;
        // private IvParameterSpec entrophy;
        // private String encryptedMsg;
        // public static final int bitLength = 256;
        // ... setters, getters, methods
    }

    public class AESSerializer extends Serializer<AESEncryption> implements Serializable {
        public AESSerializer() {
        }

        @Override
        public void write(Kryo kryo, Output output, AESEncryption object) {
            kryo.writeClassAndObject(output, object);
        }

        @Override
        public AESEncryption read(Kryo kryo, Input input, Class<AESEncryption> type) {
            return (AESEncryption) kryo.readClassAndObject(input);
        }
    }

On Thursday, 20 July 2017, 5:09 PM, Bobby Evans <ev...@yahoo-inc.com> wrote:

FYI, I filed https://issues.apache.org/jira/browse/STORM-2649 to make the exception better. It should be fairly simple to do (I marked it as a newbie JIRA) if someone wants to take a crack at it.

- Bobby

On Thursday, July 20, 2017, 8:49:20 AM CDT, Bobby Evans <ev...@yahoo-inc.com> wrote:

I am not sure which config is causing the issue.
You can start testing it by calling

    assert org.apache.storm.utils.Utils.isValidConf(conf);

as you add things to the config, to find which entry is causing the issue. Sadly, the code does not provide any information about what it didn't like about the config. You can look at the code for the validity check:

https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L476-L502

It should not be too hard to modify it to at least log which entries are different when it does the check.

- Bobby

On Thursday, July 20, 2017, 2:44:48 AM CDT, kalliopi kalantzaki <tsantalos-spa...@yahoo.gr> wrote:

Hello,

I am trying to pass an object (not a primitive type), an instance of AESEncryption, into the Storm configuration. I created a custom Kryo serializer, the AESSerializer class, and in the main topology I used:

    Config conf = new Config();
    conf.registerSerialization(AESEncryption.class, AESSerializer.class);
    conf.setFallBackOnJavaSerialization(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("stormKafkaTopology", conf, builder.createTopology());

    public class AESEncryption {
        public static final String cypherAlgorithm = "AES/CBC/PKCS5Padding";
        private SecretKey privateKey;
        private IvParameterSpec entrophy;
        private String encryptedMsg;
        public static final int bitLength = 256;
        // ... setters, getters, methods
    }

    public class AESSerializer extends Serializer<AESEncryption> implements Serializable {
        public AESSerializer() {
        }

        @Override
        public void write(Kryo kryo, Output output, AESEncryption object) {
            kryo.writeClassAndObject(output, object);
        }

        @Override
        public AESEncryption read(Kryo kryo, Input input, Class<AESEncryption> type) {
            return (AESEncryption) kryo.readClassAndObject(input);
        }
    }

However, even using Kryo serialization I always get the same exception:

    Problem initializing cluster topology: java.lang.IllegalArgumentException: Topology conf is not json-serializable
        at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:308) ~[storm-core-1.1.0.jar:1.1.0]
        at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) ~[storm-core-1.1.0.jar:1.1.0]
        at org.apache.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-1.1.0.jar:1.1.0]
        at com.jrtechnologies.ifg.bsp.kafka.storm.StormKafkaStarter.main(StormKafkaStarter.java:117) [classes/:?]

Any ideas what I am doing wrong?
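One possible way around the exception, offered as a sketch rather than a confirmed fix: as far as I know, registerSerialization configures Kryo for tuple fields passed between components, not for values stored in the topology config, which must survive a JSON round trip. If the goal is only to get key material to the bolts, the key bytes could be stored in the config as a Base64 string and rebuilt on the other side, for example in a bolt's prepare() method. AesKeyConf and the "aes.key.base64" entry below are made-up names for illustration.

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper: moves an AES key through a config map that only holds JSON-friendly values. */
final class AesKeyConf {
    static final String KEY_ENTRY = "aes.key.base64"; // made-up config key

    /** Stores the raw key bytes as a Base64 string, which JSON can represent. */
    static void putKey(Map<String, Object> conf, SecretKey key) {
        conf.put(KEY_ENTRY, Base64.getEncoder().encodeToString(key.getEncoded()));
    }

    /** Rebuilds the key from the string stored by putKey. */
    static SecretKey readKey(Map<?, ?> conf) {
        byte[] raw = Base64.getDecoder().decode((String) conf.get(KEY_ENTRY));
        return new SecretKeySpec(raw, "AES");
    }

    public static void main(String[] args) {
        // All-zero 256-bit key, for demonstration only.
        SecretKey original = new SecretKeySpec(new byte[32], "AES");
        Map<String, Object> conf = new HashMap<>();
        putKey(conf, original);
        SecretKey restored = readKey(conf);
        System.out.println(Arrays.equals(original.getEncoded(), restored.getEncoded()));
    }
}
```

The IvParameterSpec bytes could be passed the same way, with the AESEncryption object constructed inside the bolt from those values instead of being placed in the config itself.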