xinyiZzz opened a new pull request, #42216:
URL: https://github.com/apache/doris/pull/42216
Before each get from the queue, the sink task dependency is set ready,
so if the sink task puts into the queue faster than the result fetcher
gets from it, the queue size will always stay at 10.

The sink dependency must be set ready before getting from the queue.
Otherwise, if the queue is emptied after the sink task puts into it and
before the dependency is blocked, the get will be stuck and will never
set the sink dependency ready.
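
The interleaving described above can be sketched with a small,
single-threaded model. This is only an illustration under stated
assumptions, not the Doris BE code: `ResultQueue`, `sink_put`,
`sink_block_if_full`, `run_race` and the `set_ready_before_get` switch
are hypothetical names, and the "set ready only after a successful get"
ordering is just one ordering that reproduces the stall, not a claim
about what the previous code did. The capacity of 10 is taken from the
description.

```python
from collections import deque

QUEUE_CAPACITY = 10  # queue size mentioned in the description


class ResultQueue:
    """Toy model of the result queue plus the sink task's dependency flag."""

    def __init__(self, set_ready_before_get):
        self.blocks = deque()
        self.sink_ready = True  # True: the sink task is allowed to run
        self.set_ready_before_get = set_ready_before_get

    def sink_put(self, block):
        # Step 1 of the sink task: enqueue a result block.
        self.blocks.append(block)

    def sink_block_if_full(self, size_seen_at_put):
        # Step 2 of the sink task: block its own dependency, based on the
        # queue size it observed when it put the block (possibly stale).
        if size_seen_at_put >= QUEUE_CAPACITY:
            self.sink_ready = False

    def get(self):
        if self.set_ready_before_get:
            # Ordering from the description: wake the sink first.
            self.sink_ready = True
        if not self.blocks:
            return None  # a real fetcher would wait here
        block = self.blocks.popleft()
        if not self.set_ready_before_get:
            # Hypothetical alternative: wake the sink only after a
            # successful get.
            self.sink_ready = True
        return block


def run_race(set_ready_before_get):
    """Replay the interleaving; return whether the sink ends up runnable."""
    q = ResultQueue(set_ready_before_get)
    for i in range(QUEUE_CAPACITY):
        q.sink_put(i)
    size_seen = len(q.blocks)  # the sink sees a full queue
    # The fetcher drains the queue before the sink blocks its dependency.
    while q.get() is not None:
        pass
    q.sink_block_if_full(size_seen)  # the sink blocks on stale information
    q.get()  # next fetch finds an empty queue
    return q.sink_ready


print(run_race(set_ready_before_get=False), run_race(set_ready_before_get=True))
```

In this model the sink stays blocked with an empty queue when the ready
signal is sent only after a successful get, so nothing can ever refill
the queue. When the ready signal is sent before the get, the same
interleaving leaves the sink runnable.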
Fixes the following error:
```
WARN org.apache.doris.flink.backend.BackendClient [] - Get next from Doris BE{host='', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
    at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:179) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.shaded.org.apache.thrift.transport.TTransport.readAll(TTransport.java:109) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.shaded.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.sdk.thrift.TDorisExternalService$Client.recvGetNext(TDorisExternalService.java:92) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.sdk.thrift.TDorisExternalService$Client.getNext(TDorisExternalService.java:79) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.flink.backend.BackendClient.getNext(BackendClient.java:185) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:243) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:71) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:34) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:140) ~[flink-connector-files-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_191]
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_191]
    at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_191]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_191]
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_191]
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_191]
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_191]
    at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
    ... 24 more
```