Camel: detect Kafka not available

2017-11-22 Thread Yacov Schondorf
I am trying to detect when Kafka is not available. I have modified the
example
https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
and added the following code right after camelContext.start():



final Collection<Endpoint> endpoints = camelContext.getEndpoints();
for (Endpoint endpoint : endpoints) {
    if (endpoint instanceof DefaultEndpoint) {
        final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
        endpoint1.setBridgeErrorHandler(true);
        final HashMap<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put("backoffMultiplier", 10);
        consumerProperties.put("backoffErrorThreshold", 5);
        endpoint1.setConsumerProperties(consumerProperties);
    }
}



I ran the main() and hoped to see the consumer stopping the attempts to
connect to Kafka after 5 tries, but this did not work. I keep getting
output messages of “Connection to node -1 could not be established. Broker
may not be available.”

Is this the right way to go? What am I doing wrong?


Thanks.


Re: Camel: detect Kafka not available

2017-11-28 Thread Yacov Schondorf
But this is exactly my point - there is no stack trace! I want there to be
a stack trace so that I can catch it using the regular error handler. This
is the purpose of the call to endpoint1.setBridgeErrorHandler(true).
However, the call does not work: no trace is printed and the polling
continues. Here is the complete code, based on
https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
with my addition between the "// change start" and "// change end here"
comments:

package org.apache.camel.example.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;

public final class MessageConsumerClient {

    private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerClient.class);

    private MessageConsumerClient() {
    }

    public static void main(String[] args) throws Exception {

        LOG.info("About to run Kafka-camel integration...");

        CamelContext camelContext = new DefaultCamelContext();

        // Add route that consumes messages from Kafka and logs them
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                PropertiesComponent pc =
                        getContext().getComponent("properties", PropertiesComponent.class);
                pc.setLocation("classpath:application.properties");

                log.info("About to start route: Kafka Server -> Log ");

                // Expected to fire when the consumer cannot reach Kafka
                onException(Exception.class).process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Exception occurred!!");
                    }
                });

                from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                        + "&maxPollRecords={{consumer.maxPollRecords}}"
                        + "&consumersCount={{consumer.consumersCount}}"
                        + "&seekTo={{consumer.seekTo}}"
                        + "&groupId={{consumer.group}}")
                        .routeId("FromKafka")
                        .log("${body}");
            }
        });

        camelContext.start();

        //
        // change start
        //
        final Collection<Endpoint> endpoints = camelContext.getEndpoints();
        for (Endpoint endpoint : endpoints) {
            if (endpoint instanceof DefaultEndpoint) {
                final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
                endpoint1.setBridgeErrorHandler(true);
                final HashMap<String, Object> consumerProperties = new HashMap<>();
                consumerProperties.put("backoffMultiplier", 10);
                consumerProperties.put("backoffErrorThreshold", 5);
                endpoint1.setConsumerProperties(consumerProperties);
            }
        }
        //
        // change end here
        //

        // let it run for 5 minutes before shutting down
        Thread.sleep(5 * 60 * 1000);

        camelContext.stop();
    }

}





2017-11-25 19:18 GMT+02:00 Claus Ibsen :

> Post the stack trace so we can see where the error is thrown.

Re: Camel: detect Kafka not available

2017-12-03 Thread Yacov Schondorf
No solution by Camel for detecting connection errors? I gave a very clear
reproducible scenario...


Re: Camel: detect Kafka not available

2018-01-07 Thread Yacov Schondorf
Just for the record, I have tried adding "bridgeErrorHandler=true" to the
route. This does not help, and Camel still tries to connect to the
non-existent Kafka broker. The final route looks like this:


from("kafka:{{consumer.topic}}?bridgeErrorHandler=true&brokers={{kafka.host}}:{{kafka.port}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}"
+ "&consumersCount={{consumer.consumersCount}}"
+ "&seekTo={{consumer.seekTo}}"
+ "&groupId={{consumer.group}}")

I was expecting an exception to be thrown, but this does not happen. This is
based on
https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
with just an additional "bridgeErrorHandler=true" added to the route.

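The behavior described in this thread is consistent with the Kafka client
retrying the connection internally instead of throwing, so the route's error
handler never sees anything. One possible workaround, sketched below in plain
Java, is to probe the broker's TCP port before starting the route (or
periodically afterwards). The class name BrokerProbe, the 2-second timeout and
the default localhost:9092 are assumptions for illustration only. A successful
connect shows that something is listening on the port, not that it is a
healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Camel or the Kafka client.
final class BrokerProbe {

    private BrokerProbe() {
    }

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not available"
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass host and port as arguments to override
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 2000));
    }
}
```

A caller could skip camelContext.start(), or stop the route, whenever
isReachable returns false.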


mocking future routes

2018-01-17 Thread Yacov Schondorf
Hi,
I know how to mock/advise existing routes. But what do I do if one of my
routes creates new routes? For example, I can send a configuration message
to a configuration route which has an activeMQ From endpoint. The endpoint
is advised and replaced with direct:a, so I send a configuration message in
my test. The route continues by creating a new route from
activeMQ:someTopic to kafka:someTopic. This route fails to create because
there is no real ssl-enabled kafka and activeMQ in the test environment. Is
there anything that can be done?


Unable to view message body

2018-01-28 Thread Yacov Schondorf
Hi,
I am trying to log the contents of a message using the log component.
Following is my route:

from("jetty://http://localhost:8099/myApi")
    .noStreamCaching()
    .to("log:myCategory?level=DEBUG&showBody=true&showHeaders=true");

Although I used noStreamCaching() in the route, and specified
showBody=true, I do not see the message body. Instead, I see:
Body: [Body is instance of org.apache.camel.StreamCache]

I run following commands from Firefox development console in order to test
the route:

var script = document.createElement('script');
script.type = 'text/javascript';
script.src = 'https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js';
document.head.appendChild(script);
// run the request once jQuery has finished loading
$.ajax({
    url: "http://localhost:8099/myApi",
    headers: {
        "Content-Type": "application/json"
    },
    method: "POST",
    dataType: "json",
    data: "{'foo': 'bar'}"
});

Any ideas what I am doing wrong?
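
A possible explanation, offered as an assumption rather than a confirmed
diagnosis: the body of an HTTP request is a stream, and a stream that has been
read to the end has nothing left for later steps, so a logger may decline to
print it. The plain-Java sketch below illustrates that read-once behavior; the
class name StreamReadOnce is hypothetical. The log component has a showStreams
option that may be worth trying here, as may converting the body to a String
before logging.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it does not use any Camel API.
final class StreamReadOnce {

    private StreamReadOnce() {
    }

    /** Reads the remaining bytes of the stream and decodes them as UTF-8. */
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream(
                "{'foo': 'bar'}".getBytes(StandardCharsets.UTF_8));
        // The first read consumes the stream; the second finds nothing left
        System.out.println("first read:  '" + readAll(body) + "'");
        System.out.println("second read: '" + readAll(body) + "'");
    }
}
```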


exceptionMessage() not working properly

2018-02-01 Thread Yacov Schondorf
I have the following global onException clause in my RouteBuilder configure
method:

onException(BadHeaderException.class)
    .handled(true)
    .transform(constant("Bad API request: " + exceptionMessage()))
    .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(400));

The route is as follows:

from(String.format("jetty://http://0.0.0.0:%d%s", port, KEEP_ALIVE_URI))
    .process(new MyRequiredHeaderProcessor())
    .log("My API called")
    .setBody(simple(KEEP_ALIVE_SUCCESS_MESSAGE));

and MyRequiredHeaderProcessor may throw BadHeaderException:

class MyRequiredHeaderProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        final Object myHeader = exchange.getIn().getHeader("MyRequiredHeader");
        if (myHeader == null) {
            throw new BadHeaderException("Missing header");
        }
        if (!myHeader.equals(someValue)) {
            throw new BadHeaderException("Incorrect header value");
        }
    }
}

When I call the jetty URL without the required header, the request is
rejected with error code 400 - as expected - BUT the error message is:
 
Bad API request: simple{${exception.message}}

Shouldn't exceptionMessage() return the text I set in
MyRequiredHeaderProcessor?


Re: exceptionMessage() not working properly

2018-02-04 Thread Yacov Schondorf
Thank you! This worked for me:

onException(BadHeaderException.class)
    .handled(true)
    .transform(simple("Bad API request: ${exception.message}"))
    .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(400));
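
The difference between the two versions comes down to when the string is
built. With constant("..." + exceptionMessage()), Java concatenates the text
with the toString() of the expression object once, while configure() runs;
with simple(...), the placeholder is resolved for each exchange. The
plain-Java sketch below mimics this with hypothetical stand-ins
(MessageExpression, exceptionMessage, constantStyle, simpleStyle); none of
them are Camel classes.

```java
import java.util.function.Function;

// Hypothetical illustration of build-time vs run-time evaluation.
final class BuildTimeVsRunTime {

    private BuildTimeVsRunTime() {
    }

    /** Stand-in for an expression object that is meant to be evaluated later. */
    static final class MessageExpression {
        String evaluate(Exception e) {
            return e.getMessage();
        }

        @Override
        public String toString() {
            return "simple{${exception.message}}";
        }
    }

    static MessageExpression exceptionMessage() {
        return new MessageExpression();
    }

    /** String concatenation calls toString() once, at configuration time. */
    static Function<Exception, String> constantStyle() {
        final String fixed = "Bad API request: " + exceptionMessage();
        return e -> fixed;
    }

    /** The expression is evaluated each time an exception is handled. */
    static Function<Exception, String> simpleStyle() {
        final MessageExpression expression = exceptionMessage();
        return e -> "Bad API request: " + expression.evaluate(e);
    }

    public static void main(String[] args) {
        Exception e = new IllegalArgumentException("Missing header");
        System.out.println(constantStyle().apply(e)); // Bad API request: simple{${exception.message}}
        System.out.println(simpleStyle().apply(e));   // Bad API request: Missing header
    }
}
```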


2018-02-02 11:33 GMT+02:00 Claus Ibsen :

> Hi
>
> The configure method in the RouteBuilder is only invoked once, to set up
> the routes.
> And constant creates a constant message.
>
> So what you need to do is use simple instead, to tell Camel to build
> a dynamic message with the exception message.
>
> See the docs at: http://camel.apache.org/simple
>
> --
> Claus Ibsen
> -
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>



-- 
Yacov Schondorf
CEO and Head of Development,
Keywords for Websites
054-5750201
http://keywords-4-websites.com <http://keywords-4-websites.com/home.jsp>

Yacov Schondorf
Musical Director, StyleBrass
054-5750201
http://www.stylebrass.co.il

best approach to perform client certificate validation and restrict access to single IP

2018-03-19 Thread Yacov Schondorf
Hello,

I have a requirement to provide a service using Camel Jetty component and
SSL. I have the SSL working fine for server certificate validation by
clients.

The route basically looks like this:
from("jetty://https://0.0.0.0:thePort/theSecureAPI").to(...)

The code that configures the SSL for the Jetty component looks like below
and works fine:

private void configureJettyComponentForSsl() {
    KeyStoreParameters ksp = new KeyStoreParameters();
    ksp.setResource(trustStorePath);
    ksp.setPassword(trustStorePassword);

    KeyManagersParameters kmp = new KeyManagersParameters();
    kmp.setKeyStore(ksp);
    kmp.setKeyPassword(keyPassword);

    SSLContextParameters scp = new SSLContextParameters();
    scp.setKeyManagers(kmp);

    JettyHttpComponent jettyComponent =
            getContext().getComponent("jetty", JettyHttpComponent.class);
    jettyComponent.setSslContextParameters(scp);
}

Now I need to add client certificate validation, and to restrict
connections to a particular IP. What would be the best approach?

Maybe using SslSocketConnectors like:

final HashMap<Integer, Connector> portToConnectorMap = new HashMap<>();
portToConnectorMap.put(thePort, what-here?? );
jettyComponent.setSslSocketConnectors(portToConnectorMap);

Or maybe I could accept the call into the route and use a processor that
rejects it if it does not come from the required IP? (How can I get
the client IP from within the route?) I would still need to perform the
client certificate validation.

Can I use a spring security filter?

Any guidance would be welcome.
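
For the IP restriction part, one option raised above is to check the caller's
address inside a processor. A minimal plain-Java sketch of the comparison
itself follows. IpAllowList and isAllowed are hypothetical names, and how the
remote address is obtained from the exchange (for example from the underlying
servlet request) is left as an assumption. Comparing parsed addresses rather
than raw strings avoids mismatches between different spellings of the same
address. Client certificate validation is a separate concern and is not
covered by this sketch.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper; not part of Camel or Jetty.
final class IpAllowList {

    private IpAllowList() {
    }

    /**
     * Returns true if remoteAddr denotes the same IP address as allowedIp.
     * Both arguments are expected to be literal IP addresses; a host name
     * would trigger a DNS lookup in InetAddress.getByName.
     */
    static boolean isAllowed(String remoteAddr, String allowedIp) {
        if (remoteAddr == null || allowedIp == null) {
            return false;
        }
        try {
            return InetAddress.getByName(remoteAddr)
                    .equals(InetAddress.getByName(allowedIp));
        } catch (UnknownHostException e) {
            // Unparseable input is treated as "not allowed"
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAllowed("192.168.1.10", "192.168.1.10")); // true
        System.out.println(isAllowed("192.168.1.11", "192.168.1.10")); // false
    }
}
```

A processor could throw an exception, or set a 403 response code, whenever
isAllowed returns false.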


Re: mocking future routes

2018-04-24 Thread Yacov Schondorf
Hi,
So sorry for the late response; only now was I able to get back to this
issue.
I don't see how the stub component can help me here. This is a real route
in production code that I need to mock.
Consider this use case:
1. My route (let's call it route 1) has a Jetty start endpoint and is a
configuration service, so an incoming call causes a configuration
change to the system. If the incoming call has some "flag" set to "true",
then a new route needs to be created (route 2). Route 2 has a Kafka
endpoint which I need to mock because the connection parameters are not
relevant at test time.
2. In my test, I advise route 1, replace the Jetty endpoint with a
start endpoint, and send a message that should trigger creation of route 2.
Creation of route 2 fails because of the wrong Kafka parameters.

If I add "stub:" in route 2 ("stub:kafka...") then the route will not work
outside of tests.

Any solution for this?
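
One way to reconcile the stub suggestion with production use might be to make
the prefix configurable, so that the code which creates route 2 prepends
"stub:" only when a test flag is set. A minimal sketch in plain Java; the
class EndpointUris, the method withOptionalStub and the system property name
endpoint.stub are all hypothetical, and the same idea could be expressed with
a property placeholder instead.

```java
// Hypothetical helper for building endpoint URIs; not part of Camel.
final class EndpointUris {

    private EndpointUris() {
    }

    /** Prepends "stub:" to the URI when stubbed is true, otherwise returns it unchanged. */
    static String withOptionalStub(String uri, boolean stubbed) {
        return stubbed ? "stub:" + uri : uri;
    }

    public static void main(String[] args) {
        // Assumed flag: run with -Dendpoint.stub=true in the test environment
        boolean stubbed = Boolean.getBoolean("endpoint.stub");
        System.out.println(withOptionalStub("kafka:someTopic", stubbed));
    }
}
```

The route-creating code would then call withOptionalStub for each endpoint
URI, and the test would set the flag before sending the configuration message.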


2018-02-01 18:58 GMT+02:00 Claus Ibsen :

> You can try when you create the new route, to add "stub:" as the
> prefix for your endpoints. Then you can use the stub component to mock
> them.
> http://camel.apache.org/stub


encoding problem on Jetty consumer side

2018-06-18 Thread Yacov Schondorf
I have a route defined from a Jetty endpoint. I am trying to read a path
parameter and print its contents:

from("jetty:http://localhost:8099/my-service?httpMethodRestrict=post")
    .process(exchange -> {
        String path = (String) exchange.getIn().getHeader("path");
        System.out.println("got path: " + path);
    });

I am calling this via curl on Windows passing Hebrew characters:

curl http://localhost:8099/my-service -X post -d "path=שדגכ"

The output I am getting is:
path=

I also tried:
curl http://localhost:8099/my-service -X post -H "Content-Type: text/html; charset=UTF-8" -d "path:שדגכ"
In this case the path is not added as a header at all, so the code above
prints null. I can see the body in the debugger using
exchange.getIn().getBody(String.class), but the result is again just
path:

Any idea how I can set UTF-8 encoding properly on the Camel side and on the
curl side?
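
One way to take the console's code page out of the picture is to send the
value percent-encoded as UTF-8, so that only ASCII characters cross the
command line. The plain-Java sketch below shows the encoded form of the Hebrew
value and that decoding it as UTF-8 restores the text. The class name
PathEncoding is hypothetical, and whether the Jetty endpoint decodes form
parameters as UTF-8 in this setup is an assumption to verify.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical demo class; it does not use any Camel API.
final class PathEncoding {

    private PathEncoding() {
    }

    /** Percent-encodes the value using UTF-8 bytes. */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Decodes a percent-encoded value, interpreting the bytes as UTF-8. */
    static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        // The four Hebrew letters from the curl example, written as escapes
        // so the source file's own encoding does not matter
        String hebrew = "\u05E9\u05D3\u05D2\u05DB";
        String encoded = encode(hebrew);
        System.out.println("path=" + encoded); // path=%D7%A9%D7%93%D7%92%D7%9B
        System.out.println(decode(encoded).equals(hebrew)); // true
    }
}
```

The encoded value can then be passed to curl as
-d "path=%D7%A9%D7%93%D7%92%D7%9B".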