They released Pulsar 2.4.2, and I was able to pull its dependencies and
successfully submit the Flink job. 
It's able to receive messages from the Pulsar topic successfully. However, I
still don't think I'm using the AggregateFunction correctly.

I added logging statements everywhere in my code, and I'm able to see my
message reach the `add` method in the AggregateFunction that I implemented,
but the getResult method is never called. 

In the code below, I also never see the:
 "Ran dataStream. Adding sink next"
line appear in my log, and the only log statements from the JsonConcatenator
class come from the `add` method, as shown below. 



DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2&lt;String, String>>() {
        @Override
        public Tuple2 map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .aggregate(new JsonConcatenator());

Logger logger = LoggerFactory.getLogger(StreamJob.class);
logger.info("Ran dataStream. Adding sink next")

-------------

private static class JsonConcatenator
        implements AggregateFunction<Tuple2&lt;String, String>,
Tuple2<String, String>, String> {
    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<String, String>("","");
    }

    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value,
Tuple2<String, String> accumulator) {
        logger.info("Running Add on value.f0: " + value.f0 + " and value.f1:
" + value.f1);
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        logger.info("Running getResult on accumulator.f1: " +
accumulator.f1);
        return "[" + accumulator.f1 + "]";
    }

    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a,
Tuple2<String, String> b) {
        logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1
+ " and b.f1: " + b.f1);
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}




Any ideas? 


Chris Miller-2 wrote
> I hit the same problem, as far as I can tell it should be fixed in 
> Pulsar 2.4.2. The release of this has already passed voting so I hope it 
> should be available in a day or two.
> 
> https://github.com/apache/pulsar/pull/5068





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to