Pulsar 2.4.2 has been released, and I was able to pull its dependencies and submit the Flink job. The job receives messages from the Pulsar topic. However, I still don't think I'm using the AggregateFunction correctly.
I added logging statements throughout my code. I can see each message reach the `add` method of the AggregateFunction I implemented, but the getResult method is never called. The "Ran dataStream. Adding sink next" line also never appears in my log, and the only log statements from the JsonConcatenator class come from the `add` method. The code is:

    DataStream<String> combinedEnvelopes = dataStream
        .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String incomingMessage) throws Exception {
                return mapToTuple(incomingMessage);
            }
        })
        .keyBy(0)
        .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
        .aggregate(new JsonConcatenator());

    Logger logger = LoggerFactory.getLogger(StreamJob.class);
    logger.info("Ran dataStream. Adding sink next");

    -------------

    private static class JsonConcatenator
            implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

        Logger logger = LoggerFactory.getLogger(SplinklerJob.class);

        // Start with an empty key (f0) and an empty concatenated payload (f1).
        @Override
        public Tuple2<String, String> createAccumulator() {
            return new Tuple2<String, String>("", "");
        }

        // Append the incoming payload to the accumulated string.
        @Override
        public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
            logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
            return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
        }

        // Wrap the accumulated payloads in brackets.
        @Override
        public String getResult(Tuple2<String, String> accumulator) {
            logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
            return "[" + accumulator.f1 + "]";
        }

        // Combine two partial accumulators.
        @Override
        public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
            logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
            return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
        }
    }

Any ideas?

Chris Miller-2 wrote
> I hit the same problem. As far as I can tell, it should be fixed in
> Pulsar 2.4.2. The release of this has already passed voting, so I hope it
> should be available in a day or two.
>
> https://github.com/apache/pulsar/pull/5068

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
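
As an aside on the accumulator logic itself (separate from the question of why getResult is never called): the `add` method as posted always prepends ", ", so the first payload is appended to an empty string with a leading separator. The following is a minimal sketch that exercises the same concatenation logic outside Flink. It assumes a hypothetical plain-Java stand-in (`AccumulatorSketch`, using `Map.Entry` in place of `Tuple2`); none of these names come from the original job, and `addGuarded` is only one possible way to avoid the leading separator.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JsonConcatenator with no Flink dependency.
// Map.Entry<String, String> plays the role of Tuple2<String, String>:
// key = f0 (message key), value = f1 (concatenated payloads).
public class AccumulatorSketch {

    static Map.Entry<String, String> createAccumulator() {
        return new SimpleEntry<>("", "");
    }

    // Same logic as the posted add(): always inserts ", " before the payload.
    static Map.Entry<String, String> addAsPosted(String key, String payload,
                                                 Map.Entry<String, String> acc) {
        return new SimpleEntry<>(key, acc.getValue() + ", " + payload);
    }

    // Assumed variant: skip the separator while the accumulator is still empty.
    static Map.Entry<String, String> addGuarded(String key, String payload,
                                                Map.Entry<String, String> acc) {
        String joined = acc.getValue().isEmpty()
                ? payload
                : acc.getValue() + ", " + payload;
        return new SimpleEntry<>(key, joined);
    }

    // Same logic as the posted getResult(): wrap the payloads in brackets.
    static String getResult(Map.Entry<String, String> acc) {
        return "[" + acc.getValue() + "]";
    }

    static String runAsPosted(List<String> payloads) {
        Map.Entry<String, String> acc = createAccumulator();
        for (String p : payloads) {
            acc = addAsPosted("key", p, acc);
        }
        return getResult(acc);
    }

    static String runGuarded(List<String> payloads) {
        Map.Entry<String, String> acc = createAccumulator();
        for (String p : payloads) {
            acc = addGuarded("key", p, acc);
        }
        return getResult(acc);
    }

    public static void main(String[] args) {
        List<String> payloads = Arrays.asList("a", "b");
        System.out.println(runAsPosted(payloads)); // prints "[, a, b]"
        System.out.println(runGuarded(payloads));  // prints "[a, b]"
    }
}
```

The same guard would presumably be needed in `merge`, since either side of a merge can still hold the empty initial accumulator.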