I mean the timeout likely happens in the sending queue of the redis
lib when the concurrency number is low.
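
For context, lettuce multiplexes all commands over a single connection, so
every async call first sits in that connection's command queue. A minimal
standalone sketch of what I mean (the class name, URI and keys are
placeholders, assuming lettuce 5's io.lettuce.core API; I haven't
benchmarked this exact snippet):

-------------
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

public class SingleConnectionQueueDemo {
    public static void main(String[] args) throws Exception {
        RedisClient client = RedisClient.create("redis://localhost:6379"); // placeholder URI

        // One connection: every async command below is queued on it in order.
        StatefulRedisConnection<String, String> conn = client.connect();
        RedisAsyncCommands<String, String> async = conn.async();

        RedisFuture<String> last = null;
        for (int i = 0; i < 100_000; i++) {
            // set() returns immediately, but its reply only arrives after
            // everything queued ahead of it on this one connection.
            last = async.set("key-" + i, "value-" + i);
        }

        last.get(); // wait for the queue to drain before closing
        conn.close();
        client.shutdown();
    }
}
-------------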


---------org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(StreamRecord<IN>)----


public void processElement(StreamRecord<IN> element) throws Exception {
    final StreamRecordQueueEntry<OUT> streamRecordBufferEntry =
        new StreamRecordQueueEntry<>(element);

    if (timeout > 0L) {
        // register a timeout for this AsyncStreamRecordBufferEntry
        long timeoutTimestamp =
            timeout + getProcessingTimeService().getCurrentProcessingTime();

        final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
            timeoutTimestamp,
            new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    streamRecordBufferEntry.completeExceptionally(
                        new TimeoutException("Async function call has timed out."));
                }
            });

        // Cancel the timer once we've completed the stream record buffer entry.
        // This will remove the registered trigger task.
        streamRecordBufferEntry.onComplete(
            (StreamElementQueueEntry<Collection<OUT>> value) -> {
                timerFuture.cancel(true);
            },
            executor);
    }

    addAsyncBufferEntry(streamRecordBufferEntry);

    userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}

-------------


The timer is registered even while the entry is still waiting for a free
queue slot. Isn't that a bug? If the timeout fires before
addAsyncBufferEntry returns, the operator still steps into asyncInvoke
anyway, but the "future" has already been completed exceptionally, so it
fails immediately afterwards.
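
If that reading is right, a possible reordering (just my sketch against the
snippet above, not a tested patch; I haven't checked how it interacts with
checkpointing) would be to take the queue slot first and only arm the timer
afterwards:

-------------
// Sketch: enqueue first, then arm the timer, so an entry waiting for a
// queue slot can no longer time out before asyncInvoke even runs.
addAsyncBufferEntry(streamRecordBufferEntry);

if (timeout > 0L) {
    long timeoutTimestamp =
        timeout + getProcessingTimeService().getCurrentProcessingTime();

    final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
        timeoutTimestamp,
        timestamp -> streamRecordBufferEntry.completeExceptionally(
            new TimeoutException("Async function call has timed out.")));

    streamRecordBufferEntry.onComplete(
        value -> timerFuture.cancel(true),
        executor);
}

userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
-------------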


2018-01-04 21:31 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
> 2018-01-04 21:04 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Memory usage should grow linearly with the number of windows you have active 
>> at any given time, the number of keys and the number of different window 
>> operations you have.
>
> But the memory usage is still too high, especially when incremental
> aggregation is used.
>
>> Regarding the async I/O writing to redis, I see that you give a capacity of
>> 10000 which means that there will possibly be 10000 concurrent connections
>> to Redis. This might be a bit too much, so could you try reducing that to
>> avoid timeouts?
>
> It's not related to that part. In fact, I commented out the async I/O
> code and tested; the memory usage is almost the same.
>
> And, on the contrary, I need to increase the concurrency number,
> because I have millions of aggregation results to send per
> minute!
> If the number is low, it triggers timeouts (yes, even with a timeout
> value of 30 seconds; I think it's related to the single-connection
> model of the lettuce lib).
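
By the way, if the bottleneck really is the single-connection model of
lettuce, the workaround I'm considering (a rough sketch only: the class
name, pool size, and key/value types are made up, it assumes lettuce 5's
io.lettuce.core API, and it uses the ResultFuture-style AsyncFunction of
newer Flink releases, while in 1.4 the second argument is an AsyncCollector,
but the idea is the same) is to open a few connections in open() and
round-robin over them, so the Flink async capacity can stay moderate while
still getting parallel pipelines into redis:

-------------
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PooledRedisWriter extends RichAsyncFunction<Tuple2<String, String>, String> {

    private static final int POOL_SIZE = 8; // assumption: tune to the workload

    private transient RedisClient client;
    private transient List<StatefulRedisConnection<String, String>> pool;
    private transient int next;

    @Override
    public void open(Configuration parameters) {
        client = RedisClient.create("redis://localhost:6379"); // placeholder URI
        pool = new ArrayList<>(POOL_SIZE);
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.add(client.connect());
        }
        next = 0;
    }

    @Override
    public void asyncInvoke(Tuple2<String, String> kv, ResultFuture<String> resultFuture) {
        // Round-robin so no single connection's command queue is the bottleneck.
        StatefulRedisConnection<String, String> conn = pool.get(next);
        next = (next + 1) % POOL_SIZE;

        conn.async().set(kv.f0, kv.f1).whenComplete((ok, err) -> {
            if (err != null) {
                resultFuture.completeExceptionally(err);
            } else {
                resultFuture.complete(Collections.singleton(ok));
            }
        });
    }

    @Override
    public void close() {
        for (StatefulRedisConnection<String, String> conn : pool) {
            conn.close();
        }
        client.shutdown();
    }
}
-------------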
