jiangpengcheng commented on PR #23628:
URL: https://github.com/apache/pulsar/pull/23628#issuecomment-2500747511
@lhotari OK, I thought we needed to add integration tests to the CI.
I just verified it manually with the steps below:
1. check out Pulsar at the previous version
(eddf395631811a731fe9c0284b44fd2f6efd2026)
2. update `LoggingWindowFunction` as follows:
```
package org.apache.pulsar.functions.api.examples.window;

import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.slf4j.Logger;

/**
 * A function that demonstrates how to redirect logging to a topic.
 * In this particular example, for every input string, the function
 * does some logging. If --logTopic topic is specified, these log statements
 * end up in that specified pulsar topic.
 */
public class LoggingWindowFunction implements WindowFunction<String, Void> {
    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        Logger log = context.getLogger();
        log.info("tenant: {}, namespace: {}, instanceId: {}, replicas: {}",
                context.getTenant(), context.getNamespace(),
                context.getInstanceId(), context.getNumInstances());
        for (Record<String> record : inputs) {
            log.info(record + "-window-log");
        }
        return null;
    }
}
```
3. build the `pulsar-functions/java-examples` project, which generates
`pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar`
4. check out Pulsar master (7909d2dfdb4aad8053c133ce6a00d5dddf0b9db8)
5. start Pulsar locally:
```
./bin/pulsar standalone
```
6. create the window function:
```
./bin/pulsar-admin functions create --tenant public --namespace default \
  --name window-function \
  --className org.apache.pulsar.functions.api.examples.window.LoggingWindowFunction \
  --inputs persistent://public/default/test-window-input \
  --output persistent://public/default/test-window-output \
  --log-topic persistent://public/default/test-window-logs \
  --jar pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar \
  --cpu 0.1 --window-length-count 10 --sliding-interval-count 5
```
7. produce 5 messages to the input topic:
```
./bin/pulsar-client produce -m "test-message" --value-schema string -n 5 \
  persistent://public/default/test-window-input
```
8. consume from the log topic:
```
./bin/pulsar-client consume -n 0 -s mysub --subscription-position Earliest \
  persistent://public/default/test-window-logs
```
9. the function works correctly and I got the expected messages from the log
topic:
```
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO], content:Window Config: WindowConfig(windowLengthCount=10,
windowLengthDurationMs=null, slidingIntervalCount=5,
slidingIntervalDurationMs=null, lateDataTopic=null, maxLagMs=null,
watermarkEmitIntervalMs=null, timestampExtractorClassName=null,
actualWindowFunctionClassName=org.apache.pulsar.functions.api.examples.window.LoggingWindowFunction,
processingGuarantees=ATLEAST_ONCE)
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO], content:tenant: public, namespace: default, instanceId: 0,
replicas: 1
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO],
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@32afdbf6-window-log
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO],
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@467cda10-window-log
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO],
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@3e25cfb6-window-log
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO],
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@3997d10c-window-log
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO],
content:org.apache.pulsar.functions.source.PulsarFunctionRecord@752bcda3-window-log
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO], content:[persistent://public/default/test-window-input]
[public/default/window-function] [ef1db] Prefetched messages: 0 --- Consume
throughput received: 0.08 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.08 ack/s
--- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
----- got message -----
key:[null], properties:[fqn=public/default/window-function, instance=0,
loglevel=INFO], content:[persistent://public/default/test-window-logs]
[standalone-23-1] --- Publish throughput: 0.12 msg/s --- 0.00 Mbit/s ---
Latency: med: 117.000 ms - 95pct: 117.000 ms - 99pct: 117.000 ms - 99.9pct:
117.000 ms - max: 117.000 ms --- BatchSize: med: 6.000 - 95pct: 6.000 - 99pct:
6.000 - 99.9pct: 6.000 - max: 6.000 --- MsgSize: med: 437.000 bytes - 95pct:
437.000 bytes - 99pct: 437.000 bytes - 99.9pct: 437.000 bytes - max: 437.000
bytes --- Ack received rate: 0.12 ack/s --- Failed messages: 0 --- Pending
messages: 1
2024-11-26T12:54:54,980+0000 [pulsar-timer-16-1] INFO
org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl -
[persistent://public/default/test-window-logs] [mysub] [9415c] Prefetched
messages: 0 --- Consume throughput received: 0.15 msgs/s --- 0.00 Mbit/s ---
Ack sent rate: 0.15 ack/s --- Failed messages: 0 --- batch messages: 0
---Failed acks: 0
```
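For reference, here is a rough sketch of why 5 produced messages result in exactly one window of five `-window-log` lines with `--window-length-count 10 --sliding-interval-count 5`. It is a simplified stand-alone model, not Pulsar's actual window implementation: the class `SlidingCountWindowSketch` and the method `windowSizes` are made up for illustration, and the model assumes the window fires after every `slidingIntervalCount` messages and keeps at most the last `windowLengthCount` messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a count-based sliding window.
// Assumption: the window fires every `slidingIntervalCount` messages and
// holds at most the most recent `windowLengthCount` messages.
public class SlidingCountWindowSketch {

    // Returns the number of records handed to the function on each firing.
    static List<Integer> windowSizes(int totalMessages, int windowLengthCount,
                                     int slidingIntervalCount) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> sizes = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (int i = 0; i < totalMessages; i++) {
            buffer.addLast(i);
            // Evict the oldest message once the window is over capacity.
            if (buffer.size() > windowLengthCount) {
                buffer.removeFirst();
            }
            sinceLastTrigger++;
            // Fire the window after every slidingIntervalCount messages.
            if (sinceLastTrigger == slidingIntervalCount) {
                sizes.add(buffer.size());
                sinceLastTrigger = 0;
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Settings from step 6, with the 5 messages from step 7.
        System.out.println(windowSizes(5, 10, 5));   // prints [5]
        // With 15 messages the window would fill up to its length of 10.
        System.out.println(windowSizes(15, 10, 5));  // prints [5, 10, 10]
    }
}
```

Under this model the 5 test messages trigger a single invocation of `process` with five records, which matches the five `PulsarFunctionRecord@...-window-log` entries above. Those entries show the record's object identity rather than `test-message` because the function logs `record` itself and not `record.getValue()`.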