Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual about your code. It's
probably an issue with Flink.

I think we have to wait a little longer to hear what others in the
community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk


On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
<timur.fairu...@gmail.com> wrote:
> Still trying to resolve this serialization issue. I was able to hack around
> it by 'serializing' `Record` to a String and then 'deserializing' it in the
> coGroup, but boy, it's so ugly.
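>
> For reference, the workaround looks roughly like this (a minimal sketch,
> reusing the names from my earlier code below; `toDelimitedString` and
> `parseRecord` are hypothetical helpers that round-trip a `Record` through a
> delimited String):
> ```
> // Ship a plain String across the shuffle instead of Record, and rebuild
> // the Records inside the coGroup function.
> val stringified: DataSet[(String, String)] = data
>   .filter(_.address.isDefined)
>   .map(r => (LegacyDigest.buildMessageDigest((r.name, r.address.get.country)),
>              toDelimitedString(r)))  // hypothetical encoder
>
> val result = stringified
>   .coGroup(helper)
>   .where(_._1)
>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>   .apply { (left, right) =>
>     resolutionFunc(left.map(t => parseRecord(t._2)), right)  // hypothetical decoder
>   }
>   .filter(_ != "")
> ```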
>
> So the bug is that Flink can't deserialize a case class with the following
> structure (slightly different and more detailed than I stated above):
> ```
> case class Record(name: Name, phone: Option[Phone], address:
> Option[Address])
>
> case class Name(givenName: Option[String], middleName: Option[String],
> familyName: Option[String], generationSuffix: Option[String] = None)
>
> trait Address {
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
>
> case class PoBox(city: String,
>                  state: String,
>                  country: String,
>                  latitude: Double,
>                  longitude: Double,
>                  postalCode: String,
>                  zip4: String,
>                  digest: String,
>                  poBox: String
>                 ) extends Address
>
> case class PostalAddress(city: String,
>                          state: String,
>                          country: String,
>                          latitude: Double,
>                          longitude: Double,
>                          postalCode: String,
>                          zip4: String,
>                          digest: String,
>                          preDir: String,
>                          streetName: String,
>                          streetType: String,
>                          postDir: String,
>                          house: String,
>                          aptType: String,
>                          aptNumber: String
>                         ) extends Address
> ```
>
> I would expect serialization to be one of Flink's cornerstones and thus well
> tested, so there is a high chance that I'm doing something wrong, but I
> can't really find anything unusual in my code.
>
> Any suggestions on what to try are highly welcome.
>
> Thanks,
> Timur
>
>
> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>>
>> Hello Robert,
>>
>> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
>> with the cluster (which I didn't dig into); after restarting the cluster I
>> was able to get past it, and now I get the following exception:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Serializer consumed more
>> bytes than the record had. This indicates broken serialization. If you are
>> using custom serialization types (Value or Writable), check their
>> serialization methods. If you are using a Kryo-serialized type, check the
>> corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at
>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> ... 5 more
>>
>> Thanks,
>> Timur
>>
>> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>>
>>> For the second exception, can you check the logs of the failing
>>> taskmanager (10.105.200.137)?
>>> I guess these logs contain some details on why the TM timed out.
>>>
>>>
>>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>>> We recently changed something related to the ExecutionConfig which has
>>> led to Kryo issues in the past.
>>>
>>>
>>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov
>>> <timur.fairu...@gmail.com> wrote:
>>>>
>>>> Trying to use ProtobufSerializer -- the program consistently fails with
>>>> the following exception:
>>>>
>>>> java.lang.IllegalStateException: Update task on instance
>>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>> at
>>>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at
>>>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> at
>>>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>>> after [10000 ms]
>>>> at
>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>> at
>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>> at
>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>> at
>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I'm at my wits' end now; any suggestions are highly appreciated.
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>>
>>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov
>>>> <timur.fairu...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I'm running a Flink program that is failing with the following
>>>>> exception:
>>>>>
>>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>>>> - Error while running the command.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>>> execution failed: Job execution failed.
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>> at
>>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>> at
>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>> at
>>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>> at scala.Option.foreach(Option.scala:257)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>> execution failed.
>>>>> at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>> at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>> at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>> at
>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>> at
>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> at
>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>>> CoGroup (CoGroup at
>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>>> Filter (Filter at
>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
>>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>> at
>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception: 
>>>>> No
>>>>> more bytes left.
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>> at
>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>> at
>>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>> ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: No more bytes left.
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>> at
>>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>> at
>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>> The simplified version of the code looks more or less like the following:
>>>>> ```
>>>>> case class Name(first: String, last: String)
>>>>> case class Phone(number: String)
>>>>> case class Address(addr: String, city: String, country: String)
>>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>> ...
>>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) =>
>>>>> String = ...
>>>>> ...
>>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>>
>>>>> val helper: DataSet[(Name, String)] = ...
>>>>>
>>>>> val result = data.filter(_.address.isDefined)
>>>>>   .coGroup(helper)
>>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name,
>>>>> e.address.get.country)))
>>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>>   .apply {resolutionFunc}
>>>>>   .filter(_ != "")
>>>>>
>>>>> result.writeAsText(...)
>>>>> ```
>>>>>
>>>>> This code fails only when I run it on the full dataset; when I split
>>>>> `data` into smaller chunks (`helper` always stays the same), it
>>>>> completes successfully. I guess with smaller memory requirements,
>>>>> serialization/deserialization does not kick in.
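>>>>>
>>>>> The chunked runs look roughly like this (a sketch; `runPipeline` is a
>>>>> hypothetical wrapper around the coGroup pipeline above):
>>>>> ```
>>>>> // Process one hash-based slice of `data` per job run; `helper` is
>>>>> // reused unchanged for every slice.
>>>>> val numChunks = 10
>>>>> for (chunk <- 0 until numChunks) {
>>>>>   val slice = data.filter(r => (r.name.hashCode & Int.MaxValue) % numChunks == chunk)
>>>>>   runPipeline(slice, helper)  // hypothetical: the pipeline above, parameterized
>>>>> }
>>>>> ```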
>>>>>
>>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>>> ```
>>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>>>   classOf[ProtobufSerializer])
>>>>> ```
>>>>> but every run takes significant time before failing, so any other
>>>>> advice is appreciated.
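>>>>>
>>>>> To avoid waiting on full cluster runs, one idea is to round-trip the
>>>>> serializer locally first (a sketch, untested; assumes flink-scala on
>>>>> the classpath and a sample `record` value):
>>>>> ```
>>>>> import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
>>>>> import org.apache.flink.api.common.ExecutionConfig
>>>>> import org.apache.flink.api.scala._
>>>>> import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
>>>>>
>>>>> // Serialize and deserialize one Record with the same serializer Flink
>>>>> // uses at runtime, then compare the result with the original.
>>>>> val serializer = createTypeInformation[Record].createSerializer(new ExecutionConfig)
>>>>> val out = new ByteArrayOutputStream()
>>>>> serializer.serialize(record, new DataOutputViewStreamWrapper(out))
>>>>> val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>>>>> assert(serializer.deserialize(in) == record)
>>>>> ```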
>>>>>
>>>>> Thanks,
>>>>> Timur
>>>>
>>>>
>>>
>>
>
