Here is some code that I used. I wrapped the Kafka Ignite Streamer in an
Ignite service and deployed it as a cluster singleton.

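import java.util.AbstractMap;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.stream.kafka.KafkaStreamer;

// CACHE_NAME and KAFKA_TOPIC are constants defined elsewhere in my project.
public class IgniteKafkaStreamerService implements Service {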

        private static final long serialVersionUID = 1L;
        
        @IgniteInstanceResource
        private Ignite ignite;
        private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
        private IgniteLogger logger;

        @Override
        public void init(ServiceContext ctx) throws Exception {
                logger = ignite.log();
                IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(CACHE_NAME);
                stmr.allowOverwrite(true);
                stmr.autoFlushFrequency(1000);
                
                kafkaStreamer.setIgnite(ignite);
                kafkaStreamer.setStreamer(stmr);
                kafkaStreamer.setThreads(4);
                kafkaStreamer.setTopic(KAFKA_TOPIC);
                // Util.loadProperties is some code to read in the Kafka properties.
                Properties kafkaProps = Util.loadProperties("config/kafka.properties");
                kafkaStreamer.setConsumerConfig(new ConsumerConfig(kafkaProps));
                kafkaStreamer.setSingleTupleExtractor(msg ->
                        new AbstractMap.SimpleEntry<String, String>(
                                new String(msg.key()), new String(msg.message())));
        }

        @Override
        public void execute(ServiceContext ctx) throws Exception {
                kafkaStreamer.start();
                logger.info("KafkaStreamer started.");
        }
        
        @Override
        public void cancel(ServiceContext ctx) {
                kafkaStreamer.stop();
                logger.info("KafkaStreamer stopped.");
        }

}
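
The Util.loadProperties helper is not shown above. As a rough sketch only,
such a helper could look like the following. The class name
PropertiesLoader is hypothetical, and it assumes the properties file lives
on the local filesystem relative to the working directory, which may not
match the original setup.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical stand-in for the Util helper referenced in the service. */
final class PropertiesLoader {

    private PropertiesLoader() {
        // Static helper, not meant to be instantiated.
    }

    /**
     * Reads a .properties file from the given filesystem path.
     * Assumption: the path is resolved against the working directory.
     */
    static Properties loadProperties(String path) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        } catch (IOException e) {
            // Fail fast: the streamer cannot start without its consumer settings.
            throw new UncheckedIOException("Could not read " + path, e);
        }
        return props;
    }
}
```

The returned Properties object can then be passed to new
ConsumerConfig(...) as in init() above. Note that the old Kafka consumer
behind ConsumerConfig expects at least zookeeper.connect and group.id to be
present in that file.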

Below is the code to start up the node and deploy the service:

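import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;

public class IgniteNodeStartup {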

        public static void main(String[] args) {
                // Start an Ignite server node with the default configuration.
                Ignite ignite = Ignition.start();
                ignite.getOrCreateCache(getCacheConfiguration());
                // Deploy data streamer service on the server nodes.
                ClusterGroup forServers = ignite.cluster().forServers();
                
                ignite.services(forServers).deployClusterSingleton("KafkaService",
                        new IgniteKafkaStreamerService());
        }
        
        private static CacheConfiguration<String, String> getCacheConfiguration() {
                CacheConfiguration<String, String> cfg = new CacheConfiguration<String, String>();
                cfg.setName(CACHE_NAME);
                cfg.setBackups(1);
                return cfg;
        }
}
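
One thing to watch in the service: the tuple extractor lambda calls
new String(bytes), which decodes with the platform default charset. A
possible variation, sketched here under the assumption that the Kafka keys
and messages are UTF-8 text and that keys are never null, is to move the
conversion into a small helper with an explicit charset. The class and
method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical helper that turns raw Kafka bytes into a cache entry. */
final class TupleExtractors {

    private TupleExtractors() {
        // Static helper, not meant to be instantiated.
    }

    /**
     * Decodes key and message as UTF-8 and pairs them up.
     * Assumption: both arrays are non-null; a null key would throw here.
     */
    static Map.Entry<String, String> toEntry(byte[] key, byte[] message) {
        return new AbstractMap.SimpleEntry<>(
            new String(key, StandardCharsets.UTF_8),
            new String(message, StandardCharsets.UTF_8));
    }
}
```

In init() the extractor line would then become something like
kafkaStreamer.setSingleTupleExtractor(msg -> TupleExtractors.toEntry(msg.key(), msg.message()));
which keeps the decoding the same on every node regardless of its locale
settings.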

I hope this helps.

Humphrey



--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-tp12649p12836.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
