Here's an example Kafka producer client (it uses the older kafka.javaapi producer API):

package com.company.rt.services.data;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.company.models.Event;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Wraps two Kafka producers: a "relaxed" producer that waits only for the
 * partition leader to acknowledge each write (request.required.acks=1), and
 * a "durable" producer that waits for all in-sync replicas
 * (request.required.acks=-1).
 */
public final class MessageManager {

    private final Producer<String, String> relaxedProducer;
    private final Producer<String, String> durableProducer;

    public MessageManager(Map<String, Object> map) {
        Properties relaxedProducerProps = new Properties();
        relaxedProducerProps.put("metadata.broker.list", map.get("topology.kafka.broker.list"));
        relaxedProducerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        relaxedProducerProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Leader-only acknowledgement: lower latency, small risk of loss on failover.
        relaxedProducerProps.put("request.required.acks", "1");
        relaxedProducerProps.put("partitioner.class",
                "com.company.rt.producers.utilities.partitioners.KafkaRoundRobinPartitioner");
        relaxedProducer = new Producer<>(new ProducerConfig(relaxedProducerProps));

        Properties durableProducerProps = new Properties();
        durableProducerProps.put("metadata.broker.list", map.get("topology.kafka.broker.list"));
        durableProducerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        durableProducerProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Wait for all in-sync replicas: stronger durability, higher latency.
        durableProducerProps.put("request.required.acks", "-1");
        durableProducerProps.put("partitioner.class",
                "com.company.rt.producers.utilities.partitioners.KafkaRoundRobinPartitioner");
        durableProducer = new Producer<>(new ProducerConfig(durableProducerProps));
    }

    public void send(String topic, String message, boolean durable) {
        KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, "key", message);
        if (durable) durableProducer.send(keyedMessage);
        else relaxedProducer.send(keyedMessage);
    }

    public void send(String topic, List<String> messages, boolean durable) {
        List<KeyedMessage<String, String>> keyedMessages = new ArrayList<>();
        for (String message : messages) {
            keyedMessages.add(new KeyedMessage<>(topic, "key", message));
        }
        if (durable) durableProducer.send(keyedMessages);
        else relaxedProducer.send(keyedMessages);
    }

    public void send(String topic, Event event, boolean durable) {
        // The producers are typed <String, String>, so an Event would have to
        // be serialized to a String before it could be sent.
        throw new UnsupportedOperationException("Event serialization not implemented yet");
    }

}
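The only functional difference between the two producers is the "request.required.acks" setting, so here is a minimal, self-contained sketch of just that distinction (the class name "AcksDemo" and the broker address are placeholders, not part of the code above):

```java
import java.util.Properties;

public class AcksDemo {

    // Build properties for the 0.8-era producer API.
    // acks "1"  = leader-only acknowledgement (the "relaxed" producer);
    // acks "-1" = wait for all in-sync replicas (the "durable" producer).
    static Properties producerProps(String brokerList, String acks) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", acks);
        return props;
    }

    public static void main(String[] args) {
        Properties relaxed = producerProps("localhost:9092", "1");
        Properties durable = producerProps("localhost:9092", "-1");
        System.out.println(relaxed.getProperty("request.required.acks"));  // 1
        System.out.println(durable.getProperty("request.required.acks"));  // -1
    }
}
```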


On Wed, Feb 24, 2016 at 3:00 PM, david kavanagh <david_...@hotmail.com>
wrote:

> Thanks for the reply, guys.
>
> I have looked at kafka-node and it looks reasonably simple to integrate
> Node with Storm. My original problem was getting the KafkaBolt to work, so
> I can output the data to Kafka and then use kafka-node to consume the data
> from the Node app. I have had some help from this mailing list trying to get
> the KafkaBolt to work, but there seems to be a Java issue (nothing to do with
> Storm) that I can't get to the bottom of. I am doing this as part of my
> final-year project in college and I am brand new to all these technologies
> (apart from Java), so it is more than likely something I am doing wrong
> somewhere.
>
> I don't actually need to use Kafka; that just seemed like the best way to
> connect Storm to the app. I have just taken a quick look at Redis and it
> seems to be a better way of connecting the two. I will try to get that up
> and running tomorrow and will post how I get on. Thanks again; the
> advice is much appreciated.
>
> Regards,
> David
>
> ------------------------------
> Date: Wed, 24 Feb 2016 14:31:40 -0500
> Subject: Re: Connecting Storm Output to Node.js app
> From: br...@resolvingarchitecture.com
> To: user@storm.apache.org
>
>
> Have you tried https://www.npmjs.com/package/kafka-node
>
> On Wed, Feb 24, 2016 at 2:28 PM, Patrick Wiener <patrick.wie...@web.de>
> wrote:
>
> Have you considered using Redis?
>
> You could hook up Storm with Redis (e.g. through Jedis) and use Redis's
> built-in pub/sub mechanism for data retrieval from within Node.js.
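For reference, the Jedis side of that suggestion might be sketched roughly like this (the channel name, address, and payload are placeholders; it needs a running Redis server and the Jedis client on the classpath, so treat it as a sketch, not a tested implementation):

```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisBridge {
    public static void main(String[] args) {
        // Storm side: a bolt would publish each tuple's payload to a channel.
        try (Jedis publisher = new Jedis("localhost", 6379)) {
            publisher.publish("storm-results", "{\"value\": 42}");
        }

        // The Node.js app would SUBSCRIBE to the same channel; shown here in
        // Java for illustration. Note that subscribe() blocks the calling
        // thread until the subscription is cancelled.
        try (Jedis subscriber = new Jedis("localhost", 6379)) {
            subscriber.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    System.out.println(channel + ": " + message);
                }
            }, "storm-results");
        }
    }
}
```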
>
>
> Regards,
> Patrick
>
>
> Am 23.02.2016 um 19:47 schrieb david kavanagh <david_...@hotmail.com>:
>
> Hello,
>
> I currently have Storm up and running on two Ubuntu VMs. I have Storm
> pulling data from a database on a MySQL Cluster, and now I am trying to
> figure out a way of getting Storm and a Node.js app on a remote VM
> communicating. I have tried to use a Kafka topic to store the output
> from Storm so the app can connect to the topic and gather the data,
> but I cannot get the KafkaBolt to work as it should. I have tried
> everything I can think of, but no luck. Is there another way of getting
> Storm to connect to a remote Node.js app? I have searched exhaustively
> online but cannot find anything. If there are some tutorials I could be
> pointed to, that would be great.
> Any help at all would be greatly appreciated.
>
> Kind Regards
> David
>
>
>
>
>
> --
>
> Warm Regards,
> Brian Taylor
>
> Resolving Architecture .:.
>
> 330-812-7098
>
> br...@resolvingarchitecture.com
>
> http://resolvingarchitecture.com
>
> www.linkedin.com/in/javadevops/
>



