Thank you, Till. I will try to run with the new binaries today. As I mentioned, the error is reproducible only on the full dataset, so coming up with sample input data may be problematic (not to mention that the real data can't be shared). I'll see if I can replicate it, but it could take a bit longer. Thank you very much for your effort.
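[Editor's note: since the real records can't be shared, one way to attempt a standalone reproduction is to generate synthetic records with the same Option-heavy shape as the simplified schema quoted later in the thread. This is only a sketch; the field values and the `SyntheticData` helper are made up for illustration, and the schema would need to be adjusted to match the real one.]

```scala
import scala.util.Random

// Simplified schema from the thread (not the real production classes).
case class Name(first: String, last: String)
case class Phone(number: String)
case class Address(addr: String, city: String, country: String)
case class Record(name: Name, phone: Option[Phone], address: Option[Address])

// Hypothetical generator for synthetic test data. The point is to
// produce a mix of Some/None fields, since the stack traces suggest the
// failure happens while deserializing values wrapped in Options.
object SyntheticData {
  private val rnd = new Random(42) // fixed seed for reproducibility

  // Roughly half the generated fields are None, mimicking sparse data.
  private def maybe[T](v: => T): Option[T] =
    if (rnd.nextBoolean()) Some(v) else None

  def record(i: Int): Record = Record(
    Name(s"first$i", s"last$i"),
    maybe(Phone(f"555$i%07d")),
    maybe(Address(s"$i Main St", "Seattle", "US"))
  )

  def dataset(n: Int): Seq[Record] = (0 until n).map(record)
}
```

Feeding a few million such records into the job (via `env.fromCollection(...)` or by writing them to CSV first) might trigger the same spill/deserialization path that only shows up on the full dataset.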
On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Timur,
>
> I've got good and not so good news. Let's start with the not so good
> news: I couldn't reproduce your problem. The good news is that I found
> a bug in the duplication logic of the OptionSerializer, and I've
> already committed a patch to the master to fix it.
>
> Thus, I wanted to ask you whether you could try out the latest master
> and check whether your problem still persists. If that's the case,
> could you send me your complete code with sample input data which
> reproduces your problem?
>
> Cheers,
> Till
>
> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Could this be caused by the disabled reference tracking in our Kryo
>> serializer? From the stack trace it looks like it's failing when
>> trying to deserialize the traits that are wrapped in Options.
>>
>> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <u...@apache.org> wrote:
>>> Hey Timur,
>>>
>>> I'm sorry about this bad experience.
>>>
>>> From what I can tell, there is nothing unusual with your code. It's
>>> probably an issue with Flink.
>>>
>>> I think we have to wait a little longer to hear what others in the
>>> community say about this.
>>>
>>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>>
>>> – Ufuk
>>>
>>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>> Still trying to resolve this serialization issue. I was able to
>>>> hack it by 'serializing' `Record` to String and then
>>>> 'deserializing' it in coGroup, but boy it's so ugly.
>>>>
>>>> So the bug is that it can't deserialize the case class that has the
>>>> following structure (slightly different and more detailed than I
>>>> stated above):
>>>> ```
>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>
>>>> case class Name(givenName: Option[String], middleName: Option[String],
>>>>   familyName: Option[String], generationSuffix: Option[String] = None)
>>>>
>>>> trait Address {
>>>>   val city: String
>>>>   val state: String
>>>>   val country: String
>>>>   val latitude: Double
>>>>   val longitude: Double
>>>>   val postalCode: String
>>>>   val zip4: String
>>>>   val digest: String
>>>> }
>>>>
>>>> case class PoBox(city: String,
>>>>                  state: String,
>>>>                  country: String,
>>>>                  latitude: Double,
>>>>                  longitude: Double,
>>>>                  postalCode: String,
>>>>                  zip4: String,
>>>>                  digest: String,
>>>>                  poBox: String
>>>> ) extends Address
>>>>
>>>> case class PostalAddress(city: String,
>>>>                          state: String,
>>>>                          country: String,
>>>>                          latitude: Double,
>>>>                          longitude: Double,
>>>>                          postalCode: String,
>>>>                          zip4: String,
>>>>                          digest: String,
>>>>                          preDir: String,
>>>>                          streetName: String,
>>>>                          streetType: String,
>>>>                          postDir: String,
>>>>                          house: String,
>>>>                          aptType: String,
>>>>                          aptNumber: String
>>>> ) extends Address
>>>> ```
>>>>
>>>> I would expect that serialization is one of Flink's cornerstones
>>>> and should be well tested, so there is a high chance of me doing
>>>> something wrong, but I can't really find anything unusual in my
>>>> code.
>>>>
>>>> Any suggestion on what to try is highly welcome.
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>> Hello Robert,
>>>>>
>>>>> I'm on 1.0.0 compiled with Scala 2.11.
>>>>> The second exception was an issue with the cluster (which I didn't
>>>>> dig into); when I restarted the cluster I was able to get past it,
>>>>> so now I have the following exception:
>>>>>
>>>>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at
>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>>>>> -> Filter (Filter at
>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))',
>>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>>> Reading Thread' terminated due to an exception: Serializer consumed more
>>>>> bytes than the record had. This indicates broken serialization. If you are
>>>>> using custom serialization types (Value or Writable), check their
>>>>> serialization methods. If you are using a Kryo-serialized type, check the
>>>>> corresponding Kryo serializer.
>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>> serialization. If you are using custom serialization types (Value or
>>>>> Writable), check their serialization methods. If you are using a
>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>   ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>> record had. This indicates broken serialization. If you are using custom
>>>>> serialization types (Value or Writable), check their serialization
>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>> Kryo serializer.
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>>> record had. This indicates broken serialization. If you are using custom
>>>>> serialization types (Value or Writable), check their serialization
>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>> Kryo serializer.
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>   at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>>>>   at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>>>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>   ... 5 more
>>>>>
>>>>> Thanks,
>>>>> Timur
>>>>>
>>>>> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>> For the second exception, can you check the logs of the failing
>>>>>> taskmanager (10.105.200.137)? I guess those logs contain some
>>>>>> details on why the TM timed out.
>>>>>>
>>>>>> Are you on 1.0.x or on 1.1-SNAPSHOT? We recently changed
>>>>>> something related to the ExecutionConfig which has led to Kryo
>>>>>> issues in the past.
>>>>>>
>>>>>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>>> Trying to use ProtobufSerializer -- the program consistently
>>>>>>> fails with the following exception:
>>>>>>>
>>>>>>> java.lang.IllegalStateException: Update task on instance
>>>>>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>>>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>>>>>   at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>>>>>   at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>>>>>   at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>>>>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>>>>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>>>>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>   at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>>>>>   at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>>>>>   at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>>>>>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>>>>   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>>>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>>>>>> after [10000 ms]
>>>>>>>   at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>>>>>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>>>>>   at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>>>>>   at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>>>>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Timur
>>>>>>>
>>>>>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm running a Flink program that is failing with the following
>>>>>>>> exception:
>>>>>>>>
>>>>>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>>>>> program execution failed: Job execution failed.
>>>>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>>>>>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>>>>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>>>>>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>>>>>   at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>>>>>   at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>>>>   at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>>>>   at scala.Option.foreach(Option.scala:257)
>>>>>>>>   at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>>>>>   at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>>>>>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>>>>>> CoGroup (CoGroup at
>>>>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>>>>>> Filter (Filter at
>>>>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))',
>>>>>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>>>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>>>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>> exception: No more bytes left.
>>>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>>>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>>>   ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: No more bytes left.
>>>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>>>>>   at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>>>>>   at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>>>>>   at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>>>>>   at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>>>>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>>>>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>
>>>>>>>> The simplified version of the code looks more or less like the
>>>>>>>> following:
>>>>>>>> ```
>>>>>>>> case class Name(first: String, last: String)
>>>>>>>> case class Phone(number: String)
>>>>>>>> case class Address(addr: String, city: String, country: String)
>>>>>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>>>>> ...
>>>>>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
>>>>>>>> ...
>>>>>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>>>>>
>>>>>>>> val helper: DataSet[(Name, String)] = ...
>>>>>>>>
>>>>>>>> val result = data.filter(_.address.isDefined)
>>>>>>>>   .coGroup(helper)
>>>>>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
>>>>>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>>>>>   .apply {resolutionFunc}
>>>>>>>>   .filter(_ != "")
>>>>>>>>
>>>>>>>> result.writeAsText(...)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> This code fails only when I run it on the full dataset; when I
>>>>>>>> split `data` into smaller chunks (`helper` always stays the
>>>>>>>> same), I'm able to complete successfully. I guess with smaller
>>>>>>>> memory requirements serialization/deserialization does not kick
>>>>>>>> in.
>>>>>>>>
>>>>>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>>>>>> ```
>>>>>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>>>>>>   classOf[ProtobufSerializer])
>>>>>>>> ```
>>>>>>>> but every run takes significant time before failing, so any
>>>>>>>> other advice is appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Timur
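[Editor's note: for reference, the "'serializing' `Record` to String" workaround mentioned earlier in the thread can be sketched as below. This is a minimal illustration with a hypothetical delimiter-based encoding, not the actual code from the job; it sidesteps the Option/Kryo deserialization path by shipping plain Strings between operators and decoding inside the coGroup function. It assumes the separator character never occurs in field values.]

```scala
// Simplified schema from the thread (not the real production classes).
case class Name(first: String, last: String)
case class Phone(number: String)
case class Address(addr: String, city: String, country: String)
case class Record(name: Name, phone: Option[Phone], address: Option[Address])

// Hypothetical String round-trip codec. None is encoded as an empty
// field, so the wire format contains no Option values at all.
object RecordCodec {
  private val Sep = "\u0001" // assumed to never appear in the data

  def encode(r: Record): String = Seq(
    r.name.first,
    r.name.last,
    r.phone.map(_.number).getOrElse(""),
    r.address.map(a => s"${a.addr}$Sep${a.city}$Sep${a.country}").getOrElse("")
  ).mkString(Sep)

  def decode(s: String): Record = {
    val f = s.split(Sep, -1) // -1 keeps trailing empty fields
    Record(
      Name(f(0), f(1)),
      if (f(2).isEmpty) None else Some(Phone(f(2))),
      if (f.length > 3 && f(3).nonEmpty) Some(Address(f(3), f(4), f(5))) else None
    )
  }
}
```

In the pipeline this would mean mapping `data` through `encode` before the `coGroup` and calling `decode` on each element inside the coGroup function, so that only `String` records cross the serialization boundary. Ugly, as noted in the thread, but it avoids the `OptionSerializer`/Kryo path entirely.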