> currently targeting mid February for the next major StateFun release

Thank you Gordon for sharing this information. As I could now see in the Flink 
Blog you bring out new versions frequently. I’m a newbie to Flink and I 
supposed a match of the docs of Statefun and Flink. But of course it is not a 
problem, I was just astonished.

Thanks,
Stephan


Von: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Gesendet: Dienstag, 12. Jänner 2021 10:07
An: Stephan Pelikan <stephan.peli...@phactum.at>
Cc: user@flink.apache.org
Betreff: Re: Statefun with RabbitMQ consumes message but does not run statefun

Hi,

There is no lock-step of releasing a new StateFun release when a new Flink 
release goes out. StateFun and Flink have individual releasing schemes and 
schedules.

Usually, for new major StateFun version releases, we will upgrade its Flink 
dependency to the latest available version.
We are currently targeting mid February for the next major StateFun release, 
which by then the Flink dependency will be upgraded to 1.12.x.
In the meantime, if you'd like to work against Flink 1.12.x with StateFun, you 
might have to resort to building the artifacts yourself.

Cheers,
Gordon

On Tue, Jan 12, 2021 at 3:57 PM Stephan Pelikan 
<stephan.peli...@phactum.at<mailto:stephan.peli...@phactum.at>> wrote:
I found the reason: There is a class incompatibility because I changed from
    Statefun 2.2.1 + Flink 1.11.1
to
    Statefun 2.2.1 + Flink 1.12.0

But even the newest version of Statefun 2.2.2 refers to Flink 1.11.3.

Is there a possibility to use the newest version of Flink in combination with 
the newest version of Statefun? I’m wondering why there is no Statefun version 
matching the current stable version of Flink?

Stephan


Von: Stephan Pelikan 
<stephan.peli...@phactum.at<mailto:stephan.peli...@phactum.at>>
Gesendet: Montag, 11. Jänner 2021 19:37
An: user@flink.apache.org<mailto:user@flink.apache.org>
Betreff: Statefun with RabbitMQ consumes message but does not run statefun

Hi,

I try to use RabbitMQ as a Source. My source consumes messages of the queue but 
the statefun is not execution – not even created.

This is my main function:

1 public static void main(String[] args) throws Exception {
2
3     final var env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5     env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
6
7     env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
8     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
9     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
10     
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
11
12     final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
13     statefunConfig.setFlinkJobName("test");
14     statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
15
16     final var connectionConfig = new RMQConnectionConfig.Builder()
17             .setHost("localhost")
18             .setUserName("guest")
19             .setPassword("guest")
20             .setPort(5672)
21             .setVirtualHost("test")
22             .setPrefetchCount(5000)
23             .build();
24
25     final var deserializationSchema = new 
TypeInformationSerializationSchema<>(
26             new ProtobufTypeInformation<>(Any.class), env.getConfig());
27     final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, 
true, deserializationSchema);
28
29     final var source = env
30             .addSource(rmqSource, TEST_INGRESS)
31             .setParallelism(1)
32             .map(msg -> {
33                 return RoutableMessageBuilder
34                     .builder()
35                     .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
36                     .withMessageBody(msg)
37                     .build();
38             });
39
40     StatefulFunctionDataStreamBuilder
41             .builder("test")
42             .withDataStreamAsIngress(source)
43             .withFunctionProvider(MyStatefun.TYPE, unused -> {
44                 return new MyStatefun();
45             })
46             .withEgressId(MyStatefun.EGRESS)
47             .withConfiguration(statefunConfig)
48             .build(env)
49             .getDataStreamForEgressId(MyStatefun.EGRESS)
50             .addSink(new PrintSinkFunction<>(true));
51
52     env.execute();
53
54 }

A breakpoint in line 33 shows me the messages consumed. A breakpoint in line 44 
is never called. The message is reportingly consumed but never acknowledged or 
processed. Before using RabbitMQ I used a custom SourceFunction to fake input 
data and it worked well.

To setup things I use a local environment but logging does not show up any 
errors. Before my current problem I had another error during message 
deserialization and it wasn’t reported either. Unfortunately I didn’t manage to 
get the exception in the log/stdout. I had to use the debugger to find the 
reason of the former problem. In this situation now the debugger shows no 
thrown or caught exceptions. That’s way I stuck.

Of course I would like to know what’s the problem with my code. But I guess it 
is not obviously. Maybe some can give me a hint how to turn on exception 
logging which might help to get closer to the origin of the phenomenon.

Thanks in advance,
Stephan

Reply via email to