Still trying to resolve this serialization issue. I was able to hack it by
'serializing' `Record` to String and then 'deserializing' it in coGroup,
but boy, it's so ugly.
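
A minimal sketch of what such a String round-trip workaround could look like,
assuming Java serialization plus Base64 (Java 8+ for `java.util.Base64`). The
trimmed-down types and the `RecordCodec` helper are hypothetical stand-ins, not
the actual job code:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Hypothetical, trimmed-down stand-ins for the real types in the job.
case class Name(givenName: Option[String], familyName: Option[String])
case class Phone(number: String)
trait Address { val city: String; val country: String }
case class PoBox(city: String, country: String, poBox: String) extends Address
case class Record(name: Name, phone: Option[Phone], address: Option[Address])

// Hypothetical helper: turns a Record into a plain String and back, so that
// only a String has to pass through the framework's own serializers.
object RecordCodec {
  def encode(r: Record): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    // Case classes and Option are java.io.Serializable, so plain Java
    // serialization can write the whole object graph.
    try out.writeObject(r) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  def decode(s: String): Record = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(s)))
    try in.readObject().asInstanceOf[Record] finally in.close()
  }
}
```

With this, `RecordCodec.encode` would be applied before the coGroup and
`RecordCodec.decode` inside the coGroup function. It sidesteps the failing
deserializer at the cost of extra CPU and larger records.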

So the bug is that it can't deserialize the case class that has the
structure (slightly different and more detailed than I stated above):
```
case class Record(name: Name, phone: Option[Phone], address: Option[Address])

case class Name(givenName: Option[String], middleName: Option[String],
                familyName: Option[String], generationSuffix: Option[String] = None)

trait Address{
  val city: String
  val state: String
  val country: String
  val latitude: Double
  val longitude: Double
  val postalCode: String
  val zip4: String
  val digest: String
}


case class PoBox(city: String,
                 state: String,
                 country: String,
                 latitude: Double,
                 longitude: Double,
                 postalCode: String,
                 zip4: String,
                 digest: String,
                 poBox: String
                ) extends Address

case class PostalAddress(city: String,
                         state: String,
                         country: String,
                         latitude: Double,
                         longitude: Double,
                         postalCode: String,
                         zip4: String,
                         digest: String,
                         preDir: String,
                         streetName: String,
                         streetType: String,
                         postDir: String,
                         house: String,
                         aptType: String,
                         aptNumber: String
                        ) extends Address
```

I would expect serialization to be one of Flink's cornerstones and to be
well tested, so there is a high chance that I'm doing something wrong, but I
can't really find anything unusual in my code.

Any suggestions on what to try are very welcome.

Thanks,
Timur


On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Hello Robert,
>
> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
> with the cluster (which I didn't dig into); after I restarted the cluster I
> was able to get past it, so now I have the following exception:
>
> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
> at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
> -> Filter (Filter at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
> Reading Thread' terminated due to an exception: Serializer consumed more
> bytes than the record had. This indicates broken serialization. If you are
> using custom serialization types (Value or Writable), check their
> serialization methods. If you are using a Kryo-serialized type, check the
> corresponding Kryo serializer.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception:
> Serializer consumed more bytes than the record had. This indicates broken
> serialization. If you are using custom serialization types (Value or
> Writable), check their serialization methods. If you are using a
> Kryo-serialized type, check the corresponding Kryo serializer.
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at
> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.IOException: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
> at
> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> ... 5 more
>
> Thanks,
> Timur
>
> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> For the second exception, can you check the logs of the failing
>> taskmanager (10.105.200.137)?
>> I guess these logs contain some details on why the TM timed out.
>>
>>
>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>> We recently changed something related to the ExecutionConfig which has
>> led to Kryo issues in the past.
>>
>>
>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <
>> timur.fairu...@gmail.com> wrote:
>>
>>> Trying to use ProtobufSerializer -- program consistently fails with the
>>> following exception:
>>>
>>> java.lang.IllegalStateException: Update task on instance
>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>> at
>>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> at
>>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>> at
>>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>> after [10000 ms]
>>> at
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>> at
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>
>>> Thanks,
>>> Timur
>>>
>>>
>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm running a Flink program that is failing with the following
>>>> exception:
>>>>
>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>>>                     - Error while running the command.
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> execution failed: Job execution failed.
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>> at
>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>> at
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>> at
>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>> at
>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>> at
>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>> at
>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>> at scala.Option.foreach(Option.scala:257)
>>>> at
>>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>> at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> at
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>> at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>> at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>> CoGroup (CoGroup at
>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>> Filter (Filter at
>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> No more bytes left.
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>> at
>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: No more bytes left.
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException: No more bytes left.
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> at
>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> at
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at
>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> The simplified version of the code looks more or less like the following:
>>>> ```
>>>> case class Name(first: String, last: String)
>>>> case class Phone(number: String)
>>>> case class Address(addr: String, city: String, country: String)
>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>> ...
>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) =>
>>>> String = ...
>>>> ...
>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>
>>>> val helper: DataSet[(Name, String)] = ...
>>>>
>>>> val result = data.filter(_.address.isDefined)
>>>>   .coGroup(helper)
>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name,
>>>> e.address.get.country)))
>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>   .apply {resolutionFunc}
>>>>   .filter(_ != "")
>>>>
>>>> result.writeAsText(...)
>>>> ```
>>>>
>>>> This code fails only when I run it on the full dataset. When I split
>>>> `data` into smaller chunks (`helper` always stays the same), it
>>>> completes successfully. I guess that with smaller memory requirements
>>>> serialization/deserialization does not kick in.
>>>>
>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>> ```
>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>> classOf[ProtobufSerializer])
>>>>
>>>> ```
>>>> but every run takes significant time before failing, so any other
>>>> advice is appreciated.
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>
>>>
>>
>
