I mean the timeout should likely happens in the sending queue of the redis lib if the concurrency number is low.
---------org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(StreamRecord<IN>)---- public void processElement(StreamRecord<IN> element) throws Exception { final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); if (timeout > 0L) { // register a timeout for this AsyncStreamRecordBufferEntry long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer( timeoutTimestamp, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { streamRecordBufferEntry.completeExceptionally( new TimeoutException("Async function call has timed out.")); } }); // Cancel the timer once we've completed the stream record buffer entry. This will remove // the register trigger task streamRecordBufferEntry.onComplete( (StreamElementQueueEntry<Collection<OUT>> value) -> { timerFuture.cancel(true); }, executor); } addAsyncBufferEntry(streamRecordBufferEntry); userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry); } ------------- The timer would be set even the entry wait for the free queue slot. It seems a bug? Because if timeout happens before the addAsyncBufferEntry returns, and it would step into asyncInvoke anyways, but the "future" would be failed afterwards immediately. 2018-01-04 21:31 GMT+08:00 Jinhua Luo <luajit...@gmail.com>: > 2018-01-04 21:04 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>: >> Memory usage should grow linearly with the number of windows you have active >> at any given time, the number of keys and the number of different window >> operations you have. > > But the memory usage is still too much, especially when the > incremental aggregation is used. > >> Regarding the async I/O writing to redis, I see that you give a capacity of >> 10000 which means that there will possibly be 10000 concurrent connections >> to Redis. This might be a bit to much so could you try reducing that to >> avoid timeouts? > > It's not related to that part. In fact, I commented the async io codes > and test, the memory usage is almost the same. > > And, on the contrary, I need to increase the concurrency number, > because I have totally millions of aggregation results to sent per > min! > If the number is low, it would trigger timeout (yes, even the timeout > value is 30 seconds, I think it's related to the single connection > model of lettuce lib).