Hi,

I am trying to use RabbitMQ as a source. My source consumes messages from the 
queue, but the statefun is never executed; it is not even created.

This is my main function:

1 public static void main(String[] args) throws Exception {
2
3     final var env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5     env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
6
7     env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
8     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
9     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
10     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
11
12     final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
13     statefunConfig.setFlinkJobName("test");
14     statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
15
16     final var connectionConfig = new RMQConnectionConfig.Builder()
17             .setHost("localhost")
18             .setUserName("guest")
19             .setPassword("guest")
20             .setPort(5672)
21             .setVirtualHost("test")
22             .setPrefetchCount(5000)
23             .build();
24
25     final var deserializationSchema = new TypeInformationSerializationSchema<>(
26             new ProtobufTypeInformation<>(Any.class), env.getConfig());
27     final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, true, deserializationSchema);
28
29     final var source = env
30             .addSource(rmqSource, TEST_INGRESS)
31             .setParallelism(1)
32             .map(msg -> {
33                 return RoutableMessageBuilder
34                     .builder()
35                     .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
36                     .withMessageBody(msg)
37                     .build();
38             });
39
40     StatefulFunctionDataStreamBuilder
41             .builder("test")
42             .withDataStreamAsIngress(source)
43             .withFunctionProvider(MyStatefun.TYPE, unused -> {
44                 return new MyStatefun();
45             })
46             .withEgressId(MyStatefun.EGRESS)
47             .withConfiguration(statefunConfig)
48             .build(env)
49             .getDataStreamForEgressId(MyStatefun.EGRESS)
50             .addSink(new PrintSinkFunction<>(true));
51
52     env.execute();
53
54 }

A breakpoint on line 33 shows the consumed messages. A breakpoint on line 44 
is never hit. The message is reportedly consumed but never acknowledged or 
processed. Before using RabbitMQ I used a custom SourceFunction to fake input 
data, and it worked well.

For this setup I use a local environment, but the logs do not show any 
errors. Before my current problem I had another error during message 
deserialization, and it wasn't reported either. Unfortunately I didn't manage to 
get the exception into the log/stdout. I had to use the debugger to find the 
cause of the former problem. In the current situation the debugger shows no 
thrown or caught exceptions. That's why I'm stuck.

Of course I would like to know what the problem with my code is, but I guess it 
is not obvious. Maybe someone can give me a hint on how to turn on exception 
logging, which might help me get closer to the origin of the phenomenon.

Thanks in advance,
Stephan
