Hey!
I've been trying to benchmark a change I've made to Beam, and I'm
interested in how encoder performance affects overall performance. I'm using
Nexmark as a base, but it seems that if I set numEvents above
991683, the job fails. Here are the parameters I'm using to run it:
/home/rhermes/build/beam/gradlew -p /home/rhermes/build/beam
-Pnexmark.runner=":runners:flink:1.10" -Pnexmark.args="
--runner=FlinkRunner
--streaming=true
--manageResources=false
--monitorJobs=true
--debug=true
--flinkMaster=[local]
--query=0
--fasterCopy=false
--numEvents=991684
--coderStrategy=JAVA
" :sdks:java:testing:nexmark:run
If I change 991684 to 991683, it works. I arrived at this number by doing
a binary search by hand. The limit is exactly the same if JAVA is replaced
with AVRO. With HAND it works fine.
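Since the cutoff was found with a manual binary search, here is a rough sketch of how that search could be automated. It assumes the failure is monotonic in numEvents (everything at or above the cutoff fails). The `fails` predicate is a hypothetical stand-in: in practice it would wrap the Gradle Nexmark invocation above with --numEvents=n and report whether the run threw.

```java
import java.util.function.LongPredicate;

public class NumEventsBisect {
  /**
   * Returns the smallest n in (lo, hi] for which fails(n) is true.
   * Assumes fails(lo) is false, fails(hi) is true, and failure is monotonic in n.
   */
  static long firstFailing(long lo, long hi, LongPredicate fails) {
    while (hi - lo > 1) {
      long mid = lo + (hi - lo) / 2; // avoids overflow of (lo + hi)
      if (fails.test(mid)) {
        hi = mid; // invariant: fails(hi) stays true
      } else {
        lo = mid; // invariant: fails(lo) stays false
      }
    }
    return hi;
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for "run Nexmark with --numEvents=n and check for an exception".
    LongPredicate fails = n -> n > 991_683;
    System.out.println(firstFailing(1, 10_000_000, fails)); // 991684
  }
}
```

Each real probe is a full pipeline run, so the logarithmic number of probes is what makes this practical.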
My current guess is that the 991684th event overflows some buffer or
something similar. Also, the Event classes implement the KnownSize interface,
which isn't accurate for coders other than the HAND coders. I don't know
whether this plays a role; it's just something to note.
The exception that is thrown ends like this:
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1073)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1041)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1503)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1492)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:255)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.RuntimeException: Failed to decode encoded
key: [-84, -19, 0, 5, 117, 114, 0, 2, 91, 66, -84, -13, 23, -8, 6, 8, 84, -32,
2, 0, 0, 120, 112, 0, 0, 0, -128, -25, 24, -44, -16, 51, 108, 70, -105, -38, 3,
-36, -31, 123, 41, 89, 75, -6, 121, 72, -19, 41, 125, 98, 101, -40, -103, -38,
121, 51, 89, -16, 16, 28, -123, 38, 101, -75, 13, -96, 33, 93, 64, 109, 118, -108,
-26, 101, 47, 99, 28, 107, 83, -93, -24, 9, -48, -97, -96, 69, 106, -42, 16,
45, 62, 54, -118, 36, 6, 66, 15, -112, 103, 95, -23, -8, -92, 101, -48, -20,
-66, -57, -83, 40, 78, -110, -7, -1, 113, 94, 13, -9, -79, -72, 99, 55, -90,
-13, 47, 53, -128, -39, -119, -47, 86, -91, -78, 77, -18, 17, -61, 69, -57,
-120, 96, -101, 91, -11, -83, -56, 38, 29, -18, -115, 60, -12, 49, -96, 36]}
... 11 more
Caused by: java.lang.RuntimeException: Failed to decode encoded key: [-84, -19,
0, 5, 117, 114, 0, 2, 91, 66, -84, -13, 23, -8, 6, 8, 84, -32, 2, 0, 0, 120,
112, 0, 0, 0, -128, -25, 24, -44, -16, 51, 108, 70, -105, -38, 3, -36, -31,
123, 41, 89, 75, -6, 121, 72, -19, 41, 125, 98, 101, -40, -103, -38, 121, 51,
89, -16, 16, 28, -123, 38, 101, -75, 13, -96, 33, 93, 64, 109, 118, -108, -26,
101, 47, 99, 28, 107, 83, -93, -24, 9, -48, -97, -96, 69, 106, -42, 16, 45, 62,
54, -118, 36, 6, 66, 15, -112, 103, 95, -23, -8, -92, 101, -48, -20, -66, -57,
-83, 40, 78, -110, -7, -1, 113, 94, 13, -9, -79, -72, 99, 55, -90, -13, 47, 53,
-128, -39, -119, -47, 86, -91, -78, 77, -18, 17, -61, 69, -57, -120, 96, -101,
91, -11, -83, -56, 38, 29, -18, -115, 60, -12, 49, -96, 36]
at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.decodeKey(FlinkKeyUtils.java:66)
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.getKey(FlinkStateInternals.java:182)
at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:177)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1501)
... 10 more
Caused by: java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:121)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.decodeKey(FlinkKeyUtils.java:63)
... 16 more
Caused by: java.io.EOFException: reached end of stream after reading 152 bytes; 13996 bytes expected
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:780)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:762)
at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
... 18 more
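One observation that may help with debugging: the numbers in the EOFException line up exactly with the key bytes. Reading the start of the 155-byte key as the unsigned varint length prefix that ByteArrayCoder expects gives 13996, and the prefix consumes 3 bytes, leaving 152. The first four bytes (0xACED, version 5) are also the Java serialization stream header written by ObjectOutputStream, and the key deserializes as a 128-byte byte[]. The JDK-only sketch below checks these facts; the class and method names are illustrative, and this is an observation about the bytes, not a confirmed diagnosis of the cause.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamConstants;

public class KeyBytesCheck {
  // The "encoded key" bytes copied from the stack trace above (155 bytes).
  static final byte[] KEY = {
    -84, -19, 0, 5, 117, 114, 0, 2, 91, 66, -84, -13, 23, -8, 6, 8, 84, -32, 2, 0, 0, 120,
    112, 0, 0, 0, -128, -25, 24, -44, -16, 51, 108, 70, -105, -38, 3, -36, -31,
    123, 41, 89, 75, -6, 121, 72, -19, 41, 125, 98, 101, -40, -103, -38, 121, 51,
    89, -16, 16, 28, -123, 38, 101, -75, 13, -96, 33, 93, 64, 109, 118, -108, -26,
    101, 47, 99, 28, 107, 83, -93, -24, 9, -48, -97, -96, 69, 106, -42, 16, 45, 62,
    54, -118, 36, 6, 66, 15, -112, 103, 95, -23, -8, -92, 101, -48, -20, -66, -57,
    -83, 40, 78, -110, -7, -1, 113, 94, 13, -9, -79, -72, 99, 55, -90, -13, 47, 53,
    -128, -39, -119, -47, 86, -91, -78, 77, -18, 17, -61, 69, -57, -120, 96, -101,
    91, -11, -83, -56, 38, 29, -18, -115, 60, -12, 49, -96, 36
  };

  /** Reads an unsigned varint (7 bits per byte, low-order group first) from index 0.
   *  Returns {value, number of bytes consumed}. */
  static int[] readVarInt(byte[] bytes) {
    int value = 0;
    int shift = 0;
    int i = 0;
    int b;
    do {
      b = bytes[i++] & 0xFF;
      value |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0); // high bit set means another byte follows
    return new int[] {value, i};
  }

  /** True if the bytes start with the Java serialization stream header (magic 0xACED, version 5). */
  static boolean hasJavaSerializationHeader(byte[] bytes) {
    int magic = ((bytes[0] & 0xFF) << 8) | (bytes[1] & 0xFF);
    int version = ((bytes[2] & 0xFF) << 8) | (bytes[3] & 0xFF);
    return magic == (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF)
        && version == ObjectStreamConstants.STREAM_VERSION;
  }

  /** Deserializes the bytes with ObjectInputStream, expecting a serialized byte[]. */
  static byte[] deserializeByteArray(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (byte[]) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("not a serialized byte[]", e);
    }
  }

  public static void main(String[] args) {
    int[] prefix = readVarInt(KEY);
    System.out.println("length prefix read as varint: " + prefix[0]);           // 13996
    System.out.println("bytes left after prefix: " + (KEY.length - prefix[1])); // 152
    System.out.println("Java serialization header: " + hasJavaSerializationHeader(KEY)); // true
    System.out.println("deserialized byte[] length: " + deserializeByteArray(KEY).length); // 128
  }
}
```

If this reading is right, the key may have been written in one format and read back as a length-prefixed byte array, but that would need to be confirmed against the runner code.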
Does anyone have any ideas as to what this might be, or how I could go
about debugging it? Any help is much appreciated.
Regards
Teodor Spæren