Hi Simone,

I suppose you are actually using messageStream.keyBy(…).window(…), right?
.windowAll() is not applicable to keyed streams.

A few follow-up questions:

Do you see any error messages in your logs?
What does your RowToQuery() sink do? Could it be blocking, so that back
pressure stalls the whole pipeline?
To check that, you can:
        1) check the web UI for back pressure metrics,
        2) replace your sink with a dummy one that just prints whatever it
           receives,
        3) or even put a flatMap after reading from Kafka (before the keyBy())
           that prints the elements before sending them downstream, so that
           you know whether the consumer keeps on reading.
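
To make the back pressure scenario concrete, here is a minimal plain-Java sketch. It does not use Flink at all: a bounded queue stands in for the buffers between operators, and nobody drains it, which stands in for a sink that blocks. The class name, the method name, the capacity of 1000 and the 50 ms timeout are all made up for illustration and are not taken from your job.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackPressureSketch {

    /**
     * Tries to push `total` elements into a bounded buffer that nobody drains
     * (a stand-in for a blocked sink). Returns how many elements were accepted
     * before the producer could no longer make progress.
     */
    public static int produceIntoStalledBuffer(int capacity, int total) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < total; i++) {
            try {
                // offer() with a timeout returns false if the buffer stays full.
                if (!buffer.offer(i, 50, TimeUnit.MILLISECONDS)) {
                    break; // the producer is stalled by the full buffer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: a buffer of 1000 slots and 5000 elements to send.
        int accepted = produceIntoStalledBuffer(1000, 5000);
        System.out.println("accepted before stalling: " + accepted);
    }
}
```

The producer gets exactly as far as the buffer capacity and then stops, even though it has more data to send. If something similar is happening in your job, the checks above should show the source still being healthy while the downstream operator is the one not making progress.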

Let us know what you find from the checks above.
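
One side observation on the numbers in your message: each reported total (1000, 1008, 1020) is the first multiple of N at or above 1000. The small sketch below only reproduces that arithmetic. The threshold of 1000 is an assumption read off your figures, and the class and method names are invented for the example. The pattern may hint at a limit of roughly 1000 elements somewhere in the pipeline rather than something tied to the trigger, but that reading is a guess.

```java
public class BucketTotals {

    /** Returns the smallest multiple of bucketSize that is >= threshold. */
    public static int firstMultipleAtOrAbove(int bucketSize, int threshold) {
        int buckets = (threshold + bucketSize - 1) / bucketSize; // ceiling division
        return buckets * bucketSize;
    }

    public static void main(String[] args) {
        int threshold = 1000; // assumption: taken from the reported "about 1000 rows"
        for (int n : new int[] {20, 21, 68}) {
            int total = firstMultipleAtOrAbove(n, threshold);
            System.out.println("N=" + n + " buckets=" + (total / n) + " total=" + total);
        }
    }
}
```

For N = 20, 21 and 68 this gives 50, 48 and 15 buckets, matching the counts you reported.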

Thanks,
Kostas

> On May 16, 2017, at 10:44 AM, simone <simone.povosca...@gmail.com> wrote:
> 
> Hi to all,
> 
> I have a problem with Flink and Kafka queues.
> 
> I have a Producer that puts some Rows into a data sink represented by a Kafka 
> queue, and a Consumer that reads from this sink and processes Rows in buckets of 
> N elements using a custom trigger function:
> messageStream.keyBy(0)
>         .windowAll(GlobalWindows.create())
>         .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>         .apply(new RowToQuery());
> 
> 
> The problem is that the Consumer stops consuming data once it has reached about 
> 1000 rows.
> With N = 20 the consumer processes 50 buckets for a total of 1000 elements. 
> With N = 21 the consumer processes 48 buckets for a total of 1008 elements.
> With N = 68 the consumer processes 15 buckets for a total of 1020 elements. And 
> so on... 
> The same also happens without the custom trigger function, using a simple 
> CountTrigger instead:
> 
> messageStream.keyBy(0)
>         .windowAll(GlobalWindows.create())
>         .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>         .apply(new RowToQuery());
> How is this possible? Are there any properties to set on the Consumer in order to 
> process more data?
> 
> Thanks,
> 
> Simone.
