Also, I ran the code in Utils. The problem is in the line (Map) 
JSONValue.parse(JSONValue.toJSONString(stormConf)) in isValidConf: 
JSONValue.parse() returns a null object. 
 
 

    On Friday, 21 July 2017, at 9:33 AM, kalliopi kalantzaki 
<tsantalos-spa...@yahoo.gr> wrote:
 

 Thank you for your response. 
Yes, the problem appears when I add the AESEncryption object to the 
configuration. However, even if I comment out all private members of this class 
and use an empty class object, the problem remains. Even if I just call 
conf.registerSerialization(AESEncryption.class) without the custom serializer, 
the problem is the same. This suggests that Storm does not apply any registered 
serialization to non-primitive types in the configuration. See the code:
Config conf = new Config();
conf.registerSerialization(AESEncryption.class, AESSerializer.class);
conf.setFallBackOnJavaSerialization(false);
LocalCluster cluster = new LocalCluster();
conf.put("aes", new AESEncryption());
cluster.submitTopology("stormKafkaTopology", conf, builder.createTopology());

public class AESEncryption {

    // public static final String cypherAlgorithm = "AES/CBC/PKCS5Padding";
    // private SecretKey privateKey;
    // private IvParameterSpec entrophy;
    // private String encryptedMsg;
    // public static final int bitLength = 256;
    // ... setters, getters, methods
}

public class AESSerializer extends Serializer<AESEncryption>
        implements Serializable {

    public AESSerializer() {
    }

    // Writes the class tag followed by the object itself.
    @Override
    public void write(Kryo kryo, Output output, AESEncryption object) {
        kryo.writeClassAndObject(output, object);
    }

    // Reads the class tag and the object back from the input.
    @Override
    public AESEncryption read(Kryo kryo, Input input,
                              Class<AESEncryption> type) {
        return (AESEncryption) kryo.readClassAndObject(input);
    }

}
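
One possible way around this, assuming the configuration only accepts 
JSON-friendly values (which is what the exception message suggests): keep the 
raw key material in the config as Base64 strings and rebuild the objects where 
they are needed. This is only a sketch. AesConfCodec and the two config key 
names are hypothetical, not part of Storm or of the code above.

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class AesConfCodec {
    // Hypothetical config key names, chosen only for this illustration.
    public static final String KEY_FIELD = "aes.key.base64";
    public static final String IV_FIELD = "aes.iv.base64";

    // Stores the key and IV as Base64 strings, which are plain JSON values.
    public static void putInto(Map<String, Object> conf, SecretKey key, IvParameterSpec iv) {
        Base64.Encoder enc = Base64.getEncoder();
        conf.put(KEY_FIELD, enc.encodeToString(key.getEncoded()));
        conf.put(IV_FIELD, enc.encodeToString(iv.getIV()));
    }

    // Rebuilds the AES key from its Base64 string.
    public static SecretKey readKey(Map<String, Object> conf) {
        byte[] raw = Base64.getDecoder().decode((String) conf.get(KEY_FIELD));
        return new SecretKeySpec(raw, "AES");
    }

    // Rebuilds the IV from its Base64 string.
    public static IvParameterSpec readIv(Map<String, Object> conf) {
        byte[] raw = Base64.getDecoder().decode((String) conf.get(IV_FIELD));
        return new IvParameterSpec(raw);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        SecretKey key = new SecretKeySpec(new byte[32], "AES"); // all-zero demo key
        IvParameterSpec iv = new IvParameterSpec(new byte[16]); // all-zero demo IV
        putInto(conf, key, iv);
        // The round trip should give back the same key bytes.
        System.out.println(Arrays.equals(readKey(conf).getEncoded(), key.getEncoded()));
    }
}
```

Keep in mind that anything placed in the config can be read by whoever can read 
the topology configuration, so this may not be appropriate for real keys.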



    On Thursday, 20 July 2017, at 5:09 PM, Bobby Evans 
<ev...@yahoo-inc.com> wrote:
 

 FYI, I filed https://issues.apache.org/jira/browse/STORM-2649 to make the 
exception better.  It should be fairly simple to do (I marked it as a newbie 
JIRA) if someone wants to take a crack at it.


- Bobby


On Thursday, July 20, 2017, 8:49:20 AM CDT, Bobby Evans <ev...@yahoo-inc.com> 
wrote:

I am not sure which config is causing the issue.  You can start testing it by 
calling

assert org.apache.storm.utils.Utils.isValidConf(conf);

as you add things to the config to find which entry is causing the issue.  
Sadly, the code does not provide any information about what it didn't like 
about the config.
You can look at the code for the validity check:

https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L476-L502

It should not be too hard to modify it to at least log which entries are 
different when it does the check.
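
A rough sketch of that kind of per-entry check, written against plain java.util 
types so it runs without Storm on the classpath. It does not reproduce 
Utils.isValidConf. It only assumes that a value survives the JSON round trip 
when it is null, a String, a Number, a Boolean, or a Map/List of those. 
ConfCheck, isJsonFriendly and findNonJsonEntries are made-up names for 
illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConfCheck {
    // Returns true if the value is of a kind that plain JSON can represent
    // (assumption: null, String, Number, Boolean, or nested Map/List of those).
    public static boolean isJsonFriendly(Object value) {
        if (value == null || value instanceof String
                || value instanceof Number || value instanceof Boolean) {
            return true;
        }
        if (value instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!(e.getKey() instanceof String) || !isJsonFriendly(e.getValue())) {
                    return false;
                }
            }
            return true;
        }
        if (value instanceof List) {
            for (Object item : (List<?>) value) {
                if (!isJsonFriendly(item)) {
                    return false;
                }
            }
            return true;
        }
        return false; // any other object type, e.g. a custom class
    }

    // Collects the keys whose values are not JSON-friendly.
    public static List<String> findNonJsonEntries(Map<String, Object> conf) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, Object> e : conf.entrySet()) {
            if (!isJsonFriendly(e.getValue())) {
                bad.add(e.getKey());
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("topology.workers", 2);
        conf.put("aes", new Object()); // stand-in for a custom object
        System.out.println(findNonJsonEntries(conf)); // prints [aes]
    }
}
```

Running something like this on the config just before submitTopology would 
point at the offending key instead of only failing the whole map.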

- Bobby


On Thursday, July 20, 2017, 2:44:48 AM CDT, kalliopi kalantzaki 
<tsantalos-spa...@yahoo.gr> wrote:

Hello,
I am trying to pass an object (not a primitive type), AESEncryption, to the 
Storm configuration. I created a custom Kryo serializer, the AESSerializer 
class, and in the main topology I used: 
Config conf = new Config();
conf.registerSerialization(AESEncryption.class, AESSerializer.class);
conf.setFallBackOnJavaSerialization(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("stormKafkaTopology", conf, builder.createTopology());

public class AESEncryption {

    public static final String cypherAlgorithm = "AES/CBC/PKCS5Padding";
    private SecretKey privateKey;
    private IvParameterSpec entrophy;
    private String encryptedMsg;
    public static final int bitLength = 256;
    // ... setters, getters, methods
}

public class AESSerializer extends Serializer<AESEncryption>
        implements Serializable {

    public AESSerializer() {
    }

    // Writes the class tag followed by the object itself.
    @Override
    public void write(Kryo kryo, Output output, AESEncryption object) {
        kryo.writeClassAndObject(output, object);
    }

    // Reads the class tag and the object back from the input.
    @Override
    public AESEncryption read(Kryo kryo, Input input,
                              Class<AESEncryption> type) {
        return (AESEncryption) kryo.readClassAndObject(input);
    }

}

However, even with Kryo serialization I always get the same exception:

Problem initializing cluster topology:
java.lang.IllegalArgumentException: Topology conf is not json-serializable
    at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:308) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-1.1.0.jar:1.1.0]
    at com.jrtechnologies.ifg.bsp.kafka.storm.StormKafkaStarter.main(StormKafkaStarter.java:117) [classes/:?]

Any idea what I am doing wrong? 


   

   
