*Code : * public static void main(String[] args) throws InterruptedException { //Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("ignite.xml")) { System.out.println(); System.out.println(">>> Cache query example started."); CacheConfiguration<String, AllEventsAttributes> kafkaCache = new CacheConfiguration<>(UA_Cache); kafkaCache.setCacheMode(CacheMode.PARTITIONED); kafkaCache.setIndexedTypes(AffinityKey.class,AllEventsAttributes.class); KafkaStreamer<String, AllEventsAttributes> kafkaStreamer = new KafkaStreamer<>(); ignite.getOrCreateCache(kafkaCache); IgniteDataStreamer<String, AllEventsAttributes> stmr =Ignition.ignite().dataStreamer(UA_Cache); stmr.allowOverwrite(true); kafkaStreamer.setIgnite(ignite); kafkaStreamer.setStreamer(stmr); List<String> topics= new ArrayList<String>(); topics.add("allEvents"); // set the topic kafkaStreamer.setTopic(topics); // set the number of threads to process Kafka streams kafkaStreamer.setThreads(20); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "allEvents"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); kafkaStreamer.setConsumerConfig(props); final CountDownLatch latch = new CountDownLatch(40); kafkaStreamer.setMultipleTupleExtractor( record ->{ Map<String, AllEventsAttributes> entries = new HashMap<>(); try{ ObjectMapper mapper = new ObjectMapper(); AllEvents allEvents = mapper.readValue(record.value().toString(), AllEvents.class); if(!(allEvents.UserId.equals("0")) && !(allEvents.UserId.equals("")) && !allEvents.UserId.isEmpty()){ AllEventsAttributes allEventsAttributes = new AllEventsAttributes(allEvents.UserId, allEvents.RecUpdatedAt,allEvents.RecUpdatedAt); entries.put(allEventsAttributes.UserId,allEventsAttributes); } /* String val = record.value().toString(); }catch (Exception ex) { System.out.println("Unexpected error." + ex); } return entries; } ); kafkaStreamer.start(); System.out.println("Kafka streamer started!"); latch.await(); } } } -- Sent from: http://apache-ignite-users.70518.x6.nabble.com/