*Code:*

public static void main(String[] args) throws InterruptedException {
   //Ignition.setClientMode(true);

    try (Ignite ignite = Ignition.start("ignite.xml")) {
      System.out.println();
      System.out.println(">>> Cache query example started.");
      CacheConfiguration<String, AllEventsAttributes> kafkaCache = new
CacheConfiguration<>(UA_Cache);

      kafkaCache.setCacheMode(CacheMode.PARTITIONED);
     
kafkaCache.setIndexedTypes(AffinityKey.class,AllEventsAttributes.class);

      KafkaStreamer<String, AllEventsAttributes> kafkaStreamer = new
KafkaStreamer<>();
      ignite.getOrCreateCache(kafkaCache);

      IgniteDataStreamer<String, AllEventsAttributes> stmr
=Ignition.ignite().dataStreamer(UA_Cache);
      stmr.allowOverwrite(true);
      kafkaStreamer.setIgnite(ignite);
      kafkaStreamer.setStreamer(stmr);

      List<String> topics=  new ArrayList<String>();
      topics.add("allEvents");
      // set the topic
      kafkaStreamer.setTopic(topics);
      // set the number of threads to process Kafka streams
      kafkaStreamer.setThreads(20);



      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
      props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
      props.put("group.id", "allEvents");
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
      kafkaStreamer.setConsumerConfig(props);
      final CountDownLatch latch = new CountDownLatch(40);
      
      kafkaStreamer.setMultipleTupleExtractor(

              record ->{
                Map<String, AllEventsAttributes> entries = new HashMap<>();
                try{
                  ObjectMapper mapper = new ObjectMapper();

                  AllEvents allEvents =
mapper.readValue(record.value().toString(), AllEvents.class);

                  if(!(allEvents.UserId.equals("0")) &&
!(allEvents.UserId.equals("")) && !allEvents.UserId.isEmpty()){
                    AllEventsAttributes allEventsAttributes = new
AllEventsAttributes(allEvents.UserId,
allEvents.RecUpdatedAt,allEvents.RecUpdatedAt);
                   
entries.put(allEventsAttributes.UserId,allEventsAttributes);
                   
                  }
               
/*                  String val = record.value().toString();
                 
                }catch (Exception ex) {
                  System.out.println("Unexpected error." + ex);
                }
                return entries;
              }
      );

      kafkaStreamer.start();
      System.out.println("Kafka streamer started!");
      latch.await();
    }
  }
}




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to