Hello, all. I am trying to create an example that receives UDP packets
with the Camel Netty4 component, but it isn't working. I'm sending UDP
packets to localhost with a DatagramSocket, and I have a Camel route
consuming from netty4:udp://localhost on the port the DatagramSocket
sends to. Here's the code:

public class UdpWithCamelService implements BundleActivator {
    private static final Logger LOGGER = LoggerFactory.getLogger(UdpWithCamelService.class);
    public static final int SERVER_PORT = 22222;
    public static final int RECEIVER_PORT = 22223;
    private OsgiDefaultCamelContext camelContext;
    private RouteBuilder routeBuilder;
    private ScheduledExecutorService scheduler;
    private UdpSender sender;

    public UdpWithCamelService() throws Exception {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        sender = new UdpSender();
    }

    @Override
    public void start(BundleContext context) throws Exception {
        camelContext = new OsgiDefaultCamelContext(context);
        camelContext.start();
        routeBuilder = new CamelUdpReceiverRouteBuilder();
        camelContext.addRoutes(routeBuilder);
        final Runnable senderTask = () -> {
            try {
                sender.sendTimestamp();
                LOGGER.warn("Sent timestamp via UDP");
            } catch (Exception e) {
                LOGGER.error("Problem sending timestamp via UDP", e);
            }
        };
        scheduler.scheduleAtFixedRate(senderTask, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public void stop(BundleContext context) throws Exception {
        scheduler.shutdown();
        sender.stop();
        camelContext.removeRouteDefinitions(routeBuilder.getRouteCollection().getRoutes());
        camelContext = null;
    }
}

public class UdpSender {
    private final DatagramSocket socket;

    public UdpSender() throws Exception {
        socket = new DatagramSocket(UdpWithCamelService.SERVER_PORT);
    }

    public void stop() {
        socket.disconnect();
        socket.close();
    }

    public void sendTimestamp() throws Exception {
        String timestamp = String.valueOf(System.currentTimeMillis());
        socket.send(
                new DatagramPacket(
                        timestamp.getBytes(),
                        timestamp.getBytes().length,
                        InetAddress.getLocalHost(),
                        UdpWithCamelService.RECEIVER_PORT
                )
        );
    }
}

public class CamelUdpReceiverRouteBuilder extends RouteBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(CamelUdpReceiverRouteBuilder.class);

    @Override
    public void configure() throws Exception {
        from(String.format("netty4:udp://localhost:%d?sync=false&textline=true",
                        UdpWithCamelService.RECEIVER_PORT))
                .process(exchange -> {
                    String body = exchange.getIn().getBody(String.class);
                    LOGGER.warn("UDP Packet Received: " + body);
                });
    }
}
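
In case it helps narrow things down, here is a minimal plain-JDK sketch (no Camel involved) of the round trip I expect to see. The class name UdpLoopbackCheck, the OS-assigned ports, the explicit loopback address, and the trailing newline are all assumptions made for this sketch and are not part of the bundle above. I added the newline only on the guess that textline=true may expect delimited input.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class UdpLoopbackCheck {

    /** Sends one newline-terminated payload over loopback and returns what the receiver read. */
    public static String roundTrip(String payload) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS pick a free port, so the sketch does not depend on 22222/22223.
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            // Fail with a timeout instead of blocking forever if nothing arrives.
            receiver.setSoTimeout(2000);

            byte[] data = (payload + "\n").getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(data, data.length, loopback, receiver.getLocalPort()));

            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            receiver.receive(packet);
            return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        String received = roundTrip(String.valueOf(System.currentTimeMillis()));
        System.out.println("Received: " + received.trim());
    }
}
```

If this works but the Camel route still logs nothing, that would at least suggest the difference lies in the address the sender targets or in how the Netty4 endpoint decodes the datagram, rather than in UDP on the machine itself.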

Thanks in advance,

Steve
