Hi,

Flink 1.13.1, Scala 2.12.4

I have an implementation of an AbstractStreamOperator whose processElement function enqueues each element to a queue that is polled from a background thread. When processing the elements in the background, I use the Output class to emit elements downstream with a timestamp.

Is there any problem with using Output from a background thread? I didn't see anything in the docs, and looking at the implementation I didn't see anything that could be potentially problematic. The reason I'm asking is that I'm seeing some weird behavior, particularly runtime exceptions in Flink SQL auto-generated code, and I'm wondering whether this may be the cause.

I was previously using a MailboxExecutor to emit elements downstream from the ASO, but that turned out to be problematic on its own, as it doesn't honor backpressure from downstream operators.

For example, I'm seeing the following exception at runtime from one of my Flink SQL apps:

flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager java.lang.IndexOutOfBoundsException: null
    at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375) ~[hunting-pipeline.jar:0.1-SNAPSHOT]
    at org.apache.flink.table.data.binary.BinarySegmentUtils.bitGet(BinarySegmentUtils.java:461) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:157) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at KeyProjection$245.apply(Unknown Source) ~[?:?]
    at KeyProjection$245.apply(Unknown Source) ~[?:?]
    at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:49) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:28) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:526) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:514) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
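
To make the threading setup described above concrete, here is a minimal, Flink-free sketch of the pattern. It is not the actual operator: `Emitter` and `QueueingOperator` are hypothetical stand-ins for Output and the AbstractStreamOperator subclass, and the thread name is made up for illustration. It only shows that `collect` ends up being called from the background thread rather than from the thread that calls `processElement`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BackgroundEmitSketch {

    /** Hypothetical stand-in for Flink's Output; NOT the real interface. */
    interface Emitter<T> {
        void collect(T value, long timestamp);
    }

    /** Hypothetical stand-in for the custom operator; NOT an AbstractStreamOperator. */
    static final class QueueingOperator<T> {
        private final BlockingQueue<T> pending = new LinkedBlockingQueue<>();
        private final Emitter<T> output;
        private final Thread worker;
        private volatile boolean running = true;

        QueueingOperator(Emitter<T> output) {
            this.output = output;
            this.worker = new Thread(this::drain, "background-emitter");
            this.worker.start();
        }

        /** Called on the caller's (task) thread: only enqueues the element. */
        void processElement(T element) {
            pending.add(element);
        }

        /** Runs on the background thread: polls the queue and emits downstream. */
        private void drain() {
            try {
                while (running || !pending.isEmpty()) {
                    T next = pending.poll(10, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        output.collect(next, System.currentTimeMillis());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Stops the worker after it has drained everything already enqueued. */
        void close() {
            running = false;
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Feeds the inputs through the operator and records "value@emittingThread". */
    static List<String> run(List<String> inputs) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        QueueingOperator<String> op = new QueueingOperator<>(
                (value, timestamp) -> seen.add(value + "@" + Thread.currentThread().getName()));
        for (String input : inputs) {
            op.processElement(input);
        }
        op.close();
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c")));
    }
}
```

Every recorded entry carries the background thread's name, so in this sketch the emit path never runs on the thread that called `processElement`. Whether the same holds safely for the real Output is exactly what I am asking.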

And this is the runtime generated code for KeyProjection:

public class KeyProjection$56 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> {

    org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(1);
    org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out);

    public KeyProjection$56(Object[] references) throws Exception {
    }

    @Override
    public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
        org.apache.flink.table.data.binary.BinaryStringData field$57;
        boolean isNull$57;

        outWriter.reset();

        isNull$57 = in1.isNullAt(0);
        field$57 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$57) {
            field$57 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
        }
        if (isNull$57) {
            outWriter.setNullAt(0);
        } else {
            outWriter.writeString(0, field$57);
        }

        outWriter.complete();
        return out;
    }
}

--
Best Regards,
Yuval Itzchakov.