Hi,
Why do i receiving an acknowledged messages from PulsarSource?
When i startup the below program (*1), I receiving same messages from
PulsarSource every time.
On the other hand, programs using PulsarClient will never receive acknowledged
messages.(*2)
How can i receive only unacknowledged messages with PulsarSource?
---(*1)---
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setParallelism(1);
env.enableCheckpointing(1000);
CheckpointConfig cc = env.getCheckpointConfig();
boolean ck = cc.isCheckpointingEnabled();
PulsarSource<Person> source = PulsarSource.builder()
.setServiceUrl("pulsar://192.168.1.10:6650")
.setStartCursor(StartCursor.earliest())
.setTopics("persistent://public/default/my-topic")
.setSubscriptionName("my-subscription")
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
.setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1000)
.setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_TIME, 2000L)
.setDeserializationSchema(Schema.JSON(Person.class), Person.class)
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar
Source")
.addSink(new SinkFunction<Person>() {
@Override
public void invoke(Person value,
Context context) throws Exception {
LOG.info(value.Name);
}
});
env.execute("sample");
}
----------
---(*2)---
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://192.168.1.10:6650")
.build();
Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.subscriptionName("my-subscription")
.topic("persistent://public/default/my-topic")
.subscribe();
while (true) {
Message<String> msg = consumer.receive();
System.out.println("Received message: " +
msg.getValue());
consumer.acknowledge(msg);
}
----------
Java18
Run on Eclipse
Flink 1.18.1
flink-connector-pulsar 4.1.0-1.18.0
Pulsar 3.1.2 standalone on Ubuntu VM (192.168.1.10:6650)
Best regards.
Tatsu