[ https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831458#comment-15831458 ]

Marc Hübner commented on FLINK-4719:
------------------------------------

I get a similar exception when spilling to disk while processing large amounts of Avro 
files. It might not be related, because at first Flink failed to use the Avro 
serializer at all. But even after enforcing the Avro serializer, it still failed to 
deserialize after spilling to disk, so I assume the problem is 
actually located in "SpillingAdaptiveSpanningRecordDeserializer". I am using 
Scala and Flink 1.1.4.
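
For reference, "enforcing the Avro serializer" means setting the ExecutionConfig flag 
that forces Avro instead of the Kryo fallback for generic types. A minimal sketch 
(the full job below uses the same call):
{code:java}
import org.apache.flink.api.scala._

// ExecutionConfig switch that makes Flink serialize generic types with Avro
// instead of falling back to Kryo; this is the "enforcing" referred to above.
val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableForceAvro()
{code}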

My code (I removed the import for the Document Avro class; the IO utils 
read from HDFS):
{code:java}
import java.io.IOException

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.solr.common.params.MapSolrParams
import org.apache.solr.update.processor.TextProfileSignature
import org.slf4j.LoggerFactory

object DeduplicationFilterJob {
  private val LOG = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    if (args.length != 1) {
      printUsage()
      System.exit(-1)
    }
    try {
      LOG.info("Loading configuration from file " + args(0))
      val conf = ParameterTool.fromPropertiesFile(args(0)).getConfiguration
      val exitVal: Int = run(conf)
      System.exit(exitVal)
    } catch {
      case e: IOException =>
        LOG.error("Could not load job configuration", e)
        printUsage()
    }
  }

  private def printUsage() = {
    println(s"Usage: java -cp [classpath] ${getClass.getName} config")
    println("config - a valid FS path to a config/properties file")
  }

  private def run(conf: Configuration): Int = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(conf)
    env.getConfig.enableForceAvro()

    if (conf.getInteger("env.parallelism", -1) > 0)
      env.setParallelism(conf.getInteger("env.parallelism", -1))

    val documents: DataSet[Document] =
      DatasetIOUtils.readDocuments(env, conf.getString("input.path", null), conf.getString("input.format", "avro"))
        .name("Read documents")

    // Compute a Solr TextProfileSignature over each document's text as the dedup key
    val signatureDocumentPairs: DataSet[(String, Document)] = documents.map((document: Document) => {
      val contentSignature = new TextProfileSignature
      contentSignature.init(new MapSolrParams(new java.util.HashMap[String, String]()))
      contentSignature.add(document.getText)
      val signature = new String(contentSignature.getSignature, "UTF-8")
      (signature, document)
    }).name("Compute document signatures")

    val deduplicatedDocuments = signatureDocumentPairs
      .groupBy(0)
      .reduceGroup(_.next()._2)
      .name("Select first document per signature group")

    DatasetIOUtils.writeDocuments(
      conf.getString("output.path", null),
      conf.getString("output.format", "avro"),
      deduplicatedDocuments)

    val executionResult: JobExecutionResult = env.execute("deduplication filter")
    // Exit code 0 on success, -1 if the job did not produce a result.
    if (executionResult != null) 0 else -1
  }

}
{code}
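
Since the Document import and the IO utils are omitted above, here is a minimal 
hypothetical stand-in for DatasetIOUtils.readDocuments, assuming the Avro-generated 
Document class and Flink's bundled AvroInputFormat (the real utils read from HDFS):
{code:java}
import org.apache.flink.api.java.io.AvroInputFormat
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path

object DatasetIOUtilsSketch {
  // Hypothetical stand-in for DatasetIOUtils.readDocuments: reads the Avro
  // files under `path` into a DataSet[Document] via Flink's AvroInputFormat.
  def readDocuments(env: ExecutionEnvironment, path: String): DataSet[Document] =
    env.createInput(new AvroInputFormat[Document](new Path(path), classOf[Document]))
}
{code}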

The stacktrace:
{code}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:805)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 795228258
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:162)
        at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.deserialize(AvroSerializer.java:123)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:155)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:73)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
{code}

> KryoSerializer random exception
> -------------------------------
>
>                 Key: FLINK-4719
>                 URL: https://issues.apache.org/jira/browse/FLINK-4719
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.1
>            Reporter: Flavio Pompermaier
>            Assignee: Nico Kruber
>              Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when 
> using POJOs in Flink jobs that read large volumes of data.
> It is usually thrown in several places, e.g. (the exceptions reported here 
> may refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>     at java.util.ArrayList.elementData(ArrayList.java:418)
>     at java.util.ArrayList.get(ArrayList.java:431)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>       at java.util.ArrayList.elementData(ArrayList.java:418)
>       at java.util.ArrayList.get(ArrayList.java:431)
>       at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>       at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>       at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
>       at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>       at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>       at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>       at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
>       at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> {code}


