Good morning all.

I'm VERY new to Camel and still trying to get a grip on all the various 
components, so please bear with me.

I'm using Quarkus with Camel and have a route that pulls a message off a Kafka 
topic (this part works perfectly, btw). I then want to send the string on to a 
REST service and act on the response I get back from the service:

- 200 (OK): go on to the next message
- 400 (Bad Request): put the message in an error queue
- 503 (Service Unavailable): wait x amount of time and do y retries before 
  stopping the route completely
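
To make the intended behaviour concrete, the status handling described above 
could be sketched in plain Java roughly as follows. The names ResponseActions 
and NextStep are hypothetical and not part of Camel, and sending any other 
status to the error queue is only an assumption, since only 200, 400 and 503 
are specified.

```java
// Hypothetical helper (not a Camel API): maps an HTTP status code to the
// next step the route should take.
final class ResponseActions {

    enum NextStep { NEXT_MESSAGE, ERROR_QUEUE, RETRY_THEN_STOP }

    private ResponseActions() {
        // static helper, no instances
    }

    static NextStep forStatus(int status) {
        switch (status) {
            case 200:
                return NextStep.NEXT_MESSAGE;     // OK: carry on with the next message
            case 400:
                return NextStep.ERROR_QUEUE;      // Bad Request: park the message
            case 503:
                return NextStep.RETRY_THEN_STOP;  // Service Unavailable: wait and retry
            default:
                // Assumption: anything unexpected is handled like a bad request.
                return NextStep.ERROR_QUEUE;
        }
    }
}
```

Keeping the decision in one small method like this means it can be called 
from either a Processor or a bean, whichever ends up being used.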

My first attempt was to do all the REST calls in a Java class called via .process().

But I have now seen that you can also do a .to("bean:xxx") and call a Java 
class that way to run all the required code.

So my question is: what is the more "correct" way to do this, especially with 
regard to getting application.properties values into the Java class and then 
sending to the REST service and handling its responses? Would it be better to 
do all the error/wait handling in the Java class, or rather to build it into 
the route itself (with .errorHandler etc.)?
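
For comparison, if the wait/retry handling were kept in the Java class rather 
than in the route, it might look roughly like the sketch below. RetrySketch 
and sendWithRetry are made-up names; the REST call is passed in as a supplier 
of the status code, and the fixed delay between attempts is an assumption. A 
return value of false would be the signal to stop the route.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of retry handling done in plain Java instead of the route.
final class RetrySketch {

    private RetrySketch() {
        // static helper, no instances
    }

    /**
     * Calls restCall once and then up to maxRetries more times while it keeps
     * answering 503, sleeping delayMillis between attempts.
     *
     * @return true as soon as a non-503 status arrives, false if every
     *         attempt returned 503 or the wait was interrupted
     */
    static boolean sendWithRetry(IntSupplier restCall, int maxRetries, long delayMillis) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            int status = restCall.getAsInt();
            if (status != 503) {
                return true; // service answered; the caller decides what the status means
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag set
                    return false;
                }
            }
        }
        return false; // retries exhausted
    }
}
```

With maxRetries set to 2, for example, the service is called at most three 
times in total before the method gives up.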

Here is my current working route code:

@ApplicationScoped
public class EnrollementEventRoute extends RouteBuilder {
    private EnrollmentEventRestSender eers;

    @ConfigProperty(name = "kafka.topic.academia.registration")
    String registrationTopicName;

    @ConfigProperty(name = "kafka.academia.broker")
    String kafkaBroker;

    @ConfigProperty(name = "kafka.academia.config.clientId")
    String kafkaClientId;

    @ConfigProperty(name = "kafka.academia.registration.autoOffsetReset",
            defaultValue = "latest")
    String offset;

    @ConfigProperty(name = "kafka.academia.config.groupId")
    String groupId;

    @ConfigProperty(name = "kafka.academia.config.keyDeserializer")
    String keyDeserializer;

    @ConfigProperty(name = "kafka.academia.config.valueDeserializer")
    String valueDeserializer;

    @ConfigProperty(name = "fms.registration.restservice.endpoint")
    String restEndpoint;

    @Override
    public void configure() throws Exception {
        eers = new EnrollmentEventRestSender(restEndpoint);
        from(kafka(registrationTopicName)
                .brokers(kafkaBroker)
                .clientId(kafkaClientId)
                .groupId(groupId)
                .keyDeserializer(keyDeserializer)
                .valueDeserializer(valueDeserializer)
                .autoOffsetReset(offset))
                .log("Registration Event received: ${body}")
                .process(eers);
    }
}

And then here is the code in the EnrollmentEventRestSender class:

@ApplicationScoped
public class EnrollmentEventRestSender implements Processor {
  private String restEndpoint;

    public EnrollmentEventRestSender() {  //Dummy constructor needed.

    };

    public EnrollmentEventRestSender(String url) {
      this.restEndpoint = url;
    }



    @Override
    public void process(Exchange exchange) throws Exception {
        try {
          CloseableHttpClient client = HttpClients.createDefault();
          System.out.println("Got endpoint of: " + restEndpoint);
          HttpPost httpPost = new HttpPost(restEndpoint);
          String json = (String) exchange.getIn().getBody();
          System.out.println("Got JSON in Exchange: " + json);
          StringEntity entity = new StringEntity(json);
          httpPost.setEntity(entity);
         // httpPost.setHeader("Accept", "application/json");
          httpPost.setHeader("Content-type", "text/plain; charset=utf-8");
          CloseableHttpResponse response = client.execute(httpPost);
          System.out.println("Got Response of: "
              + response.getStatusLine().getStatusCode());
          if (!(response.getStatusLine().getStatusCode()==200)) { // Something wrong
            InputStream is = response.getEntity().getContent();
            BufferedReader rd = new BufferedReader(new InputStreamReader(is));
            StringBuilder errReply = new StringBuilder();
            String responseLine = null;
            while ((responseLine = rd.readLine()) != null) {
               errReply.append(responseLine.trim());
            }
            rd.close();
            is.close();
            System.out.println(errReply);
          }
          client.close();
        }
        catch (Exception ex ) {
          ex.printStackTrace();
        }
    }

}