Hi,

Flink 1.13.1
Scala 2.12.4

I have an implementation of an AbstractStreamOperator whose
processElement function enqueues each element onto a queue that is polled
by a background thread.

When processing the elements in the background, I use the Output class to
emit elements downstream with a timestamp.

[image: image.png]
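
To make the pattern concrete, here is a minimal plain-JDK sketch of what is
described above. It is not the actual operator: Emitter is a hypothetical
stand-in for Flink's Output, QueueingOperator is a hypothetical stand-in for
the AbstractStreamOperator subclass, and the thread name and poll timeout
are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of the pattern; none of these types are Flink classes.
final class BackgroundEmitSketch {

    // Hypothetical stand-in for Flink's Output: receives a value and a timestamp.
    interface Emitter<T> {
        void collect(T value, long timestamp);
    }

    // Hypothetical stand-in for the operator.
    static final class QueueingOperator<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private final Emitter<T> emitter;
        private final Thread worker;
        private volatile boolean running = true;

        QueueingOperator(Emitter<T> emitter) {
            this.emitter = emitter;
            this.worker = new Thread(this::drain, "background-emitter");
            this.worker.setDaemon(true);
            this.worker.start();
        }

        // Called on the caller's (task) thread: only enqueues, never emits.
        void processElement(T element) {
            queue.add(element);
        }

        // Runs on the background thread: polls the queue and emits downstream.
        private void drain() {
            try {
                while (running || !queue.isEmpty()) {
                    T next = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        emitter.collect(next, System.currentTimeMillis());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Stops the worker after it has drained everything already enqueued.
        void close() throws InterruptedException {
            running = false;
            worker.join();
        }
    }

    // Feeds the elements through the operator and returns the name of the
    // thread that performed each emission.
    static List<String> emittingThreads(List<String> elements) throws InterruptedException {
        List<String> threads = new CopyOnWriteArrayList<>();
        QueueingOperator<String> op =
                new QueueingOperator<>((value, ts) -> threads.add(Thread.currentThread().getName()));
        try {
            elements.forEach(op::processElement);
        } finally {
            op.close();
        }
        return threads;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(emittingThreads(Arrays.asList("a", "b", "c")));
    }
}
```

In this model every emission happens on the background thread rather than
on the thread that called processElement, which is exactly the situation
the question is about.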

Is there any problem with using Output from a background thread? I found
nothing about this in the docs, and nothing I saw in the implementation
looked potentially problematic.

The reason I'm asking is that I'm seeing some weird behavior, in
particular runtime exceptions thrown from Flink SQL auto-generated code,
and I'm wondering whether emitting from the background thread may be the
cause. I was previously using a MailboxExecutor to emit elements
downstream from the ASO (AbstractStreamOperator), but that turned out to be
problematic in its own way, as it doesn't honor backpressure from
downstream operators.
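
For comparison, here is a rough plain-JDK sketch of the hand-off idea
behind the earlier MailboxExecutor approach. It does not use Flink's
MailboxExecutor: a single-threaded ExecutorService named "task-thread" is an
assumed stand-in for the task's mailbox, and the method and thread names
are hypothetical. In this model the executor's queue is unbounded, so the
background thread is never slowed down when handing work over; whether that
mirrors the backpressure problem mentioned above is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of handing emissions back to a single task thread.
final class MailboxHandOffSketch {

    // Emits every element on one dedicated thread and returns "element@thread"
    // for each emission, in emission order.
    static List<String> emitViaMailbox(List<String> elements) throws Exception {
        // Assumed stand-in for the mailbox: one thread, unbounded FIFO queue.
        ExecutorService mailbox =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "task-thread"));
        List<String> emitted = new CopyOnWriteArrayList<>();

        Thread background = new Thread(() -> {
            for (String element : elements) {
                // execute() only enqueues; it does not block the background thread.
                mailbox.execute(
                        () -> emitted.add(element + "@" + Thread.currentThread().getName()));
            }
        }, "background-worker");

        background.start();
        background.join();

        // Let the task thread finish the queued emissions, then stop it.
        mailbox.shutdown();
        if (!mailbox.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("mailbox did not drain in time");
        }
        return emitted;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(emitViaMailbox(Arrays.asList("a", "b", "c")));
    }
}
```

Here all emissions run on the single task thread, so downstream code is
never entered from two threads at once, at the cost of an unbounded queue
between the two threads.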

For example, I'm seeing the following exception at runtime from one of my
Flink SQL apps:

flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager java.lang.IndexOutOfBoundsException: null
    at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375) ~[hunting-pipeline.jar:0.1-SNAPSHOT]
    at org.apache.flink.table.data.binary.BinarySegmentUtils.bitGet(BinarySegmentUtils.java:461) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:157) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at KeyProjection$245.apply(Unknown Source) ~[?:?]
    at KeyProjection$245.apply(Unknown Source) ~[?:?]
    at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:49) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:28) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:526) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:514) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]

And this is the runtime-generated code for KeyProjection:

public class KeyProjection$56 implements
    org.apache.flink.table.runtime.generated.Projection<
        org.apache.flink.table.data.RowData,
        org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out =
      new org.apache.flink.table.data.binary.BinaryRowData(1);
  org.apache.flink.table.data.writer.BinaryRowWriter outWriter =
      new org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public KeyProjection$56(Object[] references) throws Exception {
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData apply(
      org.apache.flink.table.data.RowData in1) {
    org.apache.flink.table.data.binary.BinaryStringData field$57;
    boolean isNull$57;

    outWriter.reset();

    isNull$57 = in1.isNullAt(0);
    field$57 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
    if (!isNull$57) {
      field$57 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
    }
    if (isNull$57) {
      outWriter.setNullAt(0);
    } else {
      outWriter.writeString(0, field$57);
    }
    outWriter.complete();
    return out;
  }
}
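
One thing visible in the generated class is that out and outWriter are
fields that apply resets and refills on every call, so each call returns
the same object. The sketch below illustrates only that reuse, with a
hypothetical ReusingProjection built on a StringBuilder instead of Flink's
BinaryRowData and BinaryRowWriter. Whether this has anything to do with
the exception above is not established.

```java
// Plain-JDK illustration of a projection that reuses one output object.
final class ReusedOutputSketch {

    // Hypothetical projection: like the generated class, it keeps a single
    // output buffer and overwrites it on every call.
    static final class ReusingProjection {
        private final StringBuilder out = new StringBuilder();

        StringBuilder apply(String in) {
            out.setLength(0);   // plays the role of outWriter.reset()
            out.append(in);     // plays the role of outWriter.writeString(...)
            return out;         // the same instance is returned every time
        }
    }

    // Returns: the first result as seen right after the first call, the first
    // result as seen after the second call, and whether both calls returned
    // the same instance.
    static String[] demo() {
        ReusingProjection projection = new ReusingProjection();
        StringBuilder first = projection.apply("key-a");
        String firstSnapshot = first.toString();
        StringBuilder second = projection.apply("key-b");
        return new String[] {
            firstSnapshot,
            first.toString(),
            String.valueOf(first == second)
        };
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", demo()));
    }
}
```

Any holder of the first result sees it change after the second call, which
is why an object like this is only safe when a single thread uses it at a
time.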

-- 
Best Regards,
Yuval Itzchakov.
