That did it, thanks Matei!
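For anyone landing on this thread later: the fix is the one-line change Matei describes below. A minimal sketch against the session quoted underneath, assuming the same PurchasedImpression protobuf class and the same `inputs` RDD:

    val lineItems = inputs.map { case (_, value: BytesWritable) =>
      // copyBytes() returns a fresh array trimmed to getLength(); getBytes()
      // hands back the reused backing array, which may be longer than the record
      PurchasedImpression.parseFrom(value.copyBytes()).getBuyerData.getLineItemId
    }

The rest of the session (reduceByKey, foreach) is unchanged.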
On Mon, Sep 2, 2013 at 2:30 PM, Matei Zaharia <[email protected]> wrote:

> So I think the problem might be that BytesWritable.getBytes() can return
> an array bigger than the actual bytes used (see
> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/BytesWritable.html#getBytes()).
> It just returns a backing array that can be reused across records. Try
> using copyBytes instead:
> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/BytesWritable.html#copyBytes().
>
> Matei
>
> On Sep 2, 2013, at 9:57 AM, Gary Malouf <[email protected]> wrote:
>
> Hi Matei,
>
> I've been experimenting in the interpreter to get this to work:
>
> import org.apache.hadoop.io._;
> import com.mediacrossing.data.interfaces.PurchasedImpressionMessage.PurchasedImpression;
>
> val hdfsBase = "hdfs://nn-01:8020/";
> val data = hdfsBase + "flume/impressions/yr=2013/mo=06/d=16/logger=dn-01s1/Impr.1371340801351";
> val inputs = sc.sequenceFile[LongWritable, BytesWritable](data);
> val lineItems = inputs.map({ case (_, value: BytesWritable) =>
>   PurchasedImpression.parseFrom(value.getBytes()).getBuyerData.getLineItemId
> });
> val counts = lineItems.map(li => (li, 1)).reduceByKey(_ + _);
> counts.foreach { case (li, count) => println(li + ": " + count) }
>
> After all of that input, I get the following:
>
> 13/09/02 16:55:25 INFO spark.SparkContext: Starting job: foreach at <console>:27
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:27) with 2 output partitions (allowLocal=false)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Final stage: Stage 0 (reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24), which has no missing parents
> 13/09/02 16:55:25 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:24)
> 13/09/02 16:55:25 INFO cluster.ClusterScheduler: Adding task set 1.0 with 2 tasks
> 13/09/02 16:55:25 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: spark-02 (non-preferred, not one of dn-03, dn-02, dn-01)
> 13/09/02 16:55:25 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 2052 bytes in 110 ms
> 13/09/02 16:55:29 INFO cluster.TaskSetManager: Lost TID 0 (task 1.0:0)
> 13/09/02 16:55:29 INFO cluster.TaskSetManager: Loss was due to com.google.protobuf.InvalidProtocolBufferException
> com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
>     at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:68)
>     at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3639)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression$Builder.mergeFrom(PurchasedImpressionMessage.java:3371)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:300)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
>     at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:162)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:716)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:238)
>     at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:153)
>     at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:709)
>     at com.mediacrossing.data.interfaces.PurchasedImpressionMessage$PurchasedImpression.parseFrom(PurchasedImpressionMessage.java:3305)
>     at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
>     at $line8.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:22)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$22.hasNext(Iterator.scala:458)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>     at scala.collection.Iterator$$anon$22.foreach(Iterator.scala:451)
>     at spark.Aggregator.combineValuesByKey(Aggregator.scala:20)
>     at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
>     at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
>     at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
>     at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
>     at spark.RDD.iterator(RDD.scala:196)
>     at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
>     at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
>     at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> Thanks,
>
> Gary
>
> On Sun, Sep 1, 2013 at 1:52 PM, Gary Malouf <[email protected]> wrote:
>
>> We are using Spark 0.7.3 compiled (and running) against Hadoop
>> 2.0.0-mr1-cdh4.2.1.
>>
>> When I read a sequence file in, I have a series of key-value pairs
>> (specifically, the keys are longs and the values are byte arrays). When I
>> use the Scala-based Scoobi library to parse each byte array into a protobuf
>> message, I have no issues. However, when I try to parse the values in
>> these sequence files into the protobuf messages they were created as, I get
>> the following:
>>
>> com.google.protobuf.InvalidProtocolBufferException: Protocol message
>> contained an invalid tag (zero)
>>
>> Has anyone else experienced this before? Is there anything special that
>> must be done when reading in the sequence files?
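A note on the error itself: a protobuf field tag is encoded as (field_number << 3) | wire_type, and field number 0 is reserved, so the zero-filled slack at the end of the reused getBytes() array is exactly what triggers "Protocol message contained an invalid tag (zero)". On Hadoop releases whose BytesWritable predates copyBytes(), the same trim can be done by hand; a sketch under that assumption, with validBytes a hypothetical helper name:

    import java.util.Arrays
    import org.apache.hadoop.io.BytesWritable

    // Copy only the bytes actually written for this record, mirroring
    // what copyBytes() does on newer Hadoop releases.
    def validBytes(value: BytesWritable): Array[Byte] =
      Arrays.copyOf(value.getBytes, value.getLength)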
