I have set up a Flume-NG 1.4.0-cdh4.5.0 agent with a spooldir source to pick up a CSV file from a directory and pass it to a Spark Streaming Flume stream (Spark 0.9.0-incubating). When the file has 2 rows in it, there is no error. Once I add a third row, I get the error below. Mind you, each row has 75 columns, so each event carries 75 values, all coming from an ad-server event log.
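For context, the agent config is roughly the following (the spool directory, host, and port are placeholders, not my exact values):

agent1.sources = spool1
agent1.channels = mem1
agent1.sinks = avro1

# Spooldir source: each line of the CSV file becomes one Flume event.
agent1.sources.spool1.type = spooldir
agent1.sources.spool1.spoolDir = /var/flume/spool
agent1.sources.spool1.channels = mem1

agent1.channels.mem1.type = memory
agent1.channels.mem1.capacity = 10000

# Avro sink pointed at the host/port where the Spark Flume receiver listens.
agent1.sinks.avro1.type = avro
agent1.sinks.avro1.hostname = localhost
agent1.sinks.avro1.port = 41414
agent1.sinks.avro1.channel = mem1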
14/03/20 18:49:20 ERROR executor.Executor: Exception in task ID 1387
java.io.StreamCorruptedException: invalid stream header: 353A3030
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:800)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:297)
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:61)
    at org.apache.spark.streaming.flume.SparkFlumeEvent$$anonfun$readExternal$1.apply(FlumeInputDStream.scala:73)
    at org.apache.spark.streaming.flume.SparkFlumeEvent$$anonfun$readExternal$1.apply(FlumeInputDStream.scala:69)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at org.apache.spark.streaming.flume.SparkFlumeEvent.readExternal(FlumeInputDStream.scala:69)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1809)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:104)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
    at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
14/03/20 18:49:20 WARN scheduler.TaskSetManager: Lost TID 1387 (task 1848.0:0)
14/03/20 18:49:20 WARN scheduler.TaskSetManager: Loss was due to java.io.StreamCorruptedException
java.io.StreamCorruptedException: invalid stream header: 353A3030
    [identical stack trace repeated]
14/03/20 18:49:20 ERROR scheduler.TaskSetManager: Task 1848.0:0 failed 1 times; aborting job

If there is a way to make this work, please let me know, or explain to me what is going on.

Thanks,
Ben
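P.S. On the Spark side, the job consuming the stream is roughly the sketch below (the object name, host, and port are placeholders; the host/port match the agent's avro sink above):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Minimal driver for illustration only.
object FlumeCsvIngest {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "FlumeCsvIngest", Seconds(10))

    // One SparkFlumeEvent arrives per Flume event, i.e. per CSV row.
    val stream = FlumeUtils.createStream(ssc, "localhost", 41414)

    val rows = stream.map { e =>
      // The event body is an Avro ByteBuffer; copy the bytes out before decoding.
      val buf = e.event.getBody
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)
      new String(bytes, "UTF-8").split(",", -1) // should yield the 75 values per row
    }

    rows.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

From the trace, the failure is inside SparkFlumeEvent.readExternal while Spark deserializes the events, not in my own map code.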