Hey!

I've been trying to benchmark a change I've made to Beam, and I'm interested in how encoder performance affects overall performance. I'm using Nexmark as a base, but it seems that if I set numEvents to anything above 991683, the job fails. Here are the parameters I'm using to run it:


/home/rhermes/build/beam/gradlew -p /home/rhermes/build/beam \
  -Pnexmark.runner=":runners:flink:1.10" -Pnexmark.args="
    --runner=FlinkRunner
    --streaming=true
    --manageResources=false
    --monitorJobs=true
    --debug=true
    --flinkMaster=[local]
    --query=0
    --fasterCopy=false
    --numEvents=991684
    --coderStrategy=JAVA
  " :sdks:java:testing:nexmark:run


If I change 991684 to 991683, it works. I found this limit by doing a binary search by hand. The limit is exactly the same if JAVA is replaced with AVRO. With HAND, it works fine.
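Because the run either works or fails depending on numEvents, the hand-run binary search is easy to script. A minimal sketch, assuming the outcome is monotone in numEvents (every value up to the limit works, every value above it fails). `ThresholdSearch` and `firstFailing` are hypothetical names, and the `succeeds` predicate is a placeholder for launching the Gradle command with `--numEvents=n` and checking whether it completes.

```java
import java.util.function.LongPredicate;

/** Hypothetical helper: finds the first value in [lo, hi] for which a run fails. */
final class ThresholdSearch {
  /**
   * Returns the smallest n in [lo, hi] with succeeds(n) == false, assuming succeeds is
   * true up to some threshold and false after it. Returns hi + 1 if every value succeeds.
   */
  static long firstFailing(long lo, long hi, LongPredicate succeeds) {
    long left = lo;      // invariant: every value below left succeeds
    long right = hi + 1; // invariant: right fails, or right == hi + 1
    while (left < right) {
      long mid = left + (right - left) / 2; // avoids overflow of (left + right)
      if (succeeds.test(mid)) {
        left = mid + 1;
      } else {
        right = mid;
      }
    }
    return left;
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for "run Nexmark with --numEvents=n and report success".
    LongPredicate fakeRun = n -> n <= 991_683L;
    System.out.println(firstFailing(1L, 2_000_000L, fakeRun)); // prints 991684
  }
}
```

Each probe is a full job run, so searching a range of two million values this way takes about 21 runs.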

My current guess is that the 991684th event overflows some buffer. Also, the event classes implement the KnownSize interface, whose reported sizes are not correct for coders other than the HAND coders. I don't know whether this plays a role; it's just something to note.
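To make the KnownSize point concrete: a size estimate written for a hand-rolled layout does not account for what Java serialization adds (a stream header plus a class descriptor with the class and field names). A minimal sketch, using a hypothetical `SimpleBid` class with four long fields (not Nexmark's actual Bid class, and not its actual coder), compares a KnownSize-style `sizeInBytes()` with the measured Java-serialized size.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical, simplified bid with four long fields; NOT Nexmark's actual Bid class. */
final class SimpleBid implements Serializable {
  private static final long serialVersionUID = 1L;

  final long auction;
  final long bidder;
  final long price;
  final long dateTime;

  SimpleBid(long auction, long bidder, long price, long dateTime) {
    this.auction = auction;
    this.bidder = bidder;
    this.price = price;
    this.dateTime = dateTime;
  }

  /** KnownSize-style estimate for a hypothetical coder writing the four longs back to back. */
  long sizeInBytes() {
    return 4L * Long.BYTES; // 32 bytes
  }

  /** Measured size of this object when written with standard Java serialization. */
  long javaSerializedSize() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(this);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  public static void main(String[] args) {
    SimpleBid bid = new SimpleBid(1000L, 42L, 500L, 1_600_000_000_000L);
    System.out.println("hand estimate:   " + bid.sizeInBytes() + " bytes");
    System.out.println("Java serialized: " + bid.javaSerializedSize() + " bytes");
  }
}
```

The Java-serialized size is always larger than the 32-byte estimate here, so any size-based bookkeeping that trusts `sizeInBytes()` would undercount under the JAVA strategy.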

The thrown exception ends like this:

        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        ... 4 more
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1073)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1041)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1503)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1492)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:255)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.RuntimeException: Failed to decode encoded 
key: [-84, -19, 0, 5, 117, 114, 0, 2, 91, 66, -84, -13, 23, -8, 6, 8, 84, -32, 
2, 0, 0, 120, 112, 0, 0, 0, -128, -25, 24, -44, -16, 51, 108, 70, -105, -38, 3, 
-36, -31, 123, 41, 89, 75, -6, 121, 72, -19, 41, 125, 98, 101, -40, -103, -38, 
121, 51, 89, -16, 16, 28, -123, 38, 101, -75, 13, -96, 33, 93, 64, 109, 118, -108, 
-26, 101, 47, 99, 28, 107, 83, -93, -24, 9, -48, -97, -96, 69, 106, -42, 16, 
45, 62, 54, -118, 36, 6, 66, 15, -112, 103, 95, -23, -8, -92, 101, -48, -20, 
-66, -57, -83, 40, 78, -110, -7, -1, 113, 94, 13, -9, -79, -72, 99, 55, -90, 
-13, 47, 53, -128, -39, -119, -47, 86, -91, -78, 77, -18, 17, -61, 69, -57, 
-120, 96, -101, 91, -11, -83, -56, 38, 29, -18, -115, 60, -12, 49, -96, 36]}
        ... 11 more
Caused by: java.lang.RuntimeException: Failed to decode encoded key: [-84, -19, 
0, 5, 117, 114, 0, 2, 91, 66, -84, -13, 23, -8, 6, 8, 84, -32, 2, 0, 0, 120, 
112, 0, 0, 0, -128, -25, 24, -44, -16, 51, 108, 70, -105, -38, 3, -36, -31, 
123, 41, 89, 75, -6, 121, 72, -19, 41, 125, 98, 101, -40, -103, -38, 121, 51, 
89, -16, 16, 28, -123, 38, 101, -75, 13, -96, 33, 93, 64, 109, 118, -108, -26, 
101, 47, 99, 28, 107, 83, -93, -24, 9, -48, -97, -96, 69, 106, -42, 16, 45, 62, 
54, -118, 36, 6, 66, 15, -112, 103, 95, -23, -8, -92, 101, -48, -20, -66, -57, 
-83, 40, 78, -110, -7, -1, 113, 94, 13, -9, -79, -72, 99, 55, -90, -13, 47, 53, 
-128, -39, -119, -47, 86, -91, -78, 77, -18, 17, -61, 69, -57, -120, 96, -101, 
91, -11, -83, -56, 38, 29, -18, -115, 60, -12, 49, -96, 36]
        at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.decodeKey(FlinkKeyUtils.java:66)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.getKey(FlinkStateInternals.java:182)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:177)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1501)
        ... 10 more
Caused by: java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
        at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:121)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.decodeKey(FlinkKeyUtils.java:63)
        ... 16 more
Caused by: java.io.EOFException: reached end of stream after reading 152 bytes; 13996 bytes expected
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:780)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:762)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
        ... 18 more
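For anyone digging into this, the numbers in the EOFException can be reproduced from the key bytes alone. A minimal sketch using only java.io, assuming ByteArrayCoder reads a varint length prefix before the payload (which is what the "13996 bytes expected" suggests). `readVarInt` is a hand-written re-implementation, not a call into Beam's own VarInt class, and `KeyBytesCheck` is a hypothetical name. Read that way, the first three bytes (-84, -19, 0) decode to exactly 13996, leaving 152 of the 155 bytes. The same bytes also begin with 0xACED0005, the Java serialization stream header, and ObjectInputStream reads them back as a byte[] of length 128.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.UncheckedIOException;

/** Hypothetical diagnostic: inspects the key bytes from the "Failed to decode encoded key" message. */
final class KeyBytesCheck {
  // The 155 key bytes copied from the exception message.
  static final byte[] KEY = {
    -84, -19, 0, 5, 117, 114, 0, 2, 91, 66, -84, -13, 23, -8, 6, 8, 84, -32, 2, 0, 0, 120,
    112, 0, 0, 0, -128, -25, 24, -44, -16, 51, 108, 70, -105, -38, 3, -36, -31,
    123, 41, 89, 75, -6, 121, 72, -19, 41, 125, 98, 101, -40, -103, -38, 121, 51,
    89, -16, 16, 28, -123, 38, 101, -75, 13, -96, 33, 93, 64, 109, 118, -108, -26,
    101, 47, 99, 28, 107, 83, -93, -24, 9, -48, -97, -96, 69, 106, -42, 16, 45, 62,
    54, -118, 36, 6, 66, 15, -112, 103, 95, -23, -8, -92, 101, -48, -20, -66, -57,
    -83, 40, 78, -110, -7, -1, 113, 94, 13, -9, -79, -72, 99, 55, -90, -13, 47, 53,
    -128, -39, -119, -47, 86, -91, -78, 77, -18, 17, -61, 69, -57, -120, 96, -101,
    91, -11, -83, -56, 38, 29, -18, -115, 60, -12, 49, -96, 36
  };

  /** Decodes an unsigned varint: 7 bits per byte, low bits first, high bit means "more bytes". */
  static long readVarInt(InputStream in) throws IOException {
    long result = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      if (b < 0) {
        throw new IOException("stream ended inside a varint");
      }
      result |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return result;
  }

  /** Reads KEY as [varint length][payload]; returns {declared length, bytes actually left}. */
  static long[] lengthPrefixView() {
    ByteArrayInputStream in = new ByteArrayInputStream(KEY);
    try {
      long declared = readVarInt(in);
      return new long[] {declared, in.available()};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads KEY as a Java serialization stream and returns the decoded object. */
  static Object javaDeserialize() {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(KEY))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    long[] view = lengthPrefixView();
    // prints: length prefix says 13996 bytes; only 152 remain
    System.out.println("length prefix says " + view[0] + " bytes; only " + view[1] + " remain");
    byte[] payload = (byte[]) javaDeserialize();
    // prints: Java deserialization gives a byte[] of length 128
    System.out.println("Java deserialization gives a byte[] of length " + payload.length);
  }
}
```

If this reading is right, the key may have been written with Java serialization rather than as a length-prefixed byte[], so it could be worth checking how the key is encoded versus how FlinkKeyUtils.decodeKey reads it, alongside the buffer-overflow guess. This is an inference from the bytes, not a confirmed diagnosis.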

Does anyone have any ideas as to what this might be, or how I could go about debugging it? Any help is much appreciated.

Regards
Teodor Spæren
