Hello vbm,

I am working on the exact same problem. Did you find the solution for the
same.
I am using following code in my client application which will listen to
kafka connect (confluent).

I have one to one mapping for kafka topic and ignite cache. When there is an
insert into db, the kafka listener listens that and using gson library i am
converting json to object and the stmr.addData() works fine. But while
updating the value in db, i am facing marshller error.I tried to use
cache.put() method ,but it gives me cachewriteexception .


@KafkaListener(topics = { "kafka-Users" })
        public void listenUsers(String message) {
                logger.error(message);
                ObjectMapper mapper = new ObjectMapper();
                JsonNode rootNode;
                try {
                        rootNode = mapper.readTree(message);
                        Users user = new Users();
                        IgniteDataStreamer<Long, Users> stmr =
ignite.dataStreamer(IgniteProperties.USERS_CACHE.getName());
//                      stmr.allowOverwrite(true);

                        /*
                         * stmr.receiver(new StreamTransformer<Long, Users>() {
                         * 
                         * @Override public Object process(MutableEntry<Long, 
Users> entry,
Object...
                         * arguments) throws EntryProcessorException { return 
null; }
                         * 
                         * });
                         */

                        /*
                         * stmr.receiver(StreamTransformer.from((e, arg) -> { 
Users val =
e.getValue();
                         * System.out.println(val+" user from reciever 
$$$$$$$$$"); return null;
}));
                         */

                        Gson gson = new
GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE).create();
                        user = 
gson.fromJson(rootNode.get("payload").toString(), Users.class);
                        
stmr.addData(rootNode.get("payload").get("UsersKey").asLong(), user);
                        stmr.flush(); //
//                      stmr.allowOverwrite(true);
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }




can you please share your solution for the same.
Thanks,
Om Thacker



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to