[ https://issues.apache.org/jira/browse/PIG-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100087#comment-16100087 ]
Adam Szita commented on PIG-3655: --------------------------------- It seems like this is an effect of code in JoinGroupSparkConverter.java and PairRDDFunctions.scala. If the POPackage operator returns a [POStatus.STATUS_NULL|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java#L211], instead of skipping and awaiting the next tuple [as MR mode does|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java#L438] the method in Spark converter returns with null. This null will be picked up by the caller in Spark code and then [gets written into the (FS) outputstream|https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1113]. This is how we end up with NULLs in the files. What I can think of as a fix is to return with a special kind of Tuple which the record writer knows it has to skip; as it can be seen in [^PIG-3655.sparkNulls.2.patch]. Although this seems like a hack, the other option I can think of is to skip NULLs while reading in InterRecordReader - but then it's just handling the sympthoms and not the root cause. Any ideas [~rohini], [~kellyzly]? > BinStorage and InterStorage approach to record markers is broken > ---------------------------------------------------------------- > > Key: PIG-3655 > URL: https://issues.apache.org/jira/browse/PIG-3655 > Project: Pig > Issue Type: Bug > Affects Versions: 0.2.0, 0.3.0, 0.4.0, 0.5.0, 0.6.0, 0.7.0, 0.8.0, 0.8.1, > 0.9.0, 0.9.1, 0.9.2, 0.10.0, 0.11, 0.10.1, 0.12.0, 0.11.1 > Reporter: Jeff Plaisance > Assignee: Adam Szita > Fix For: 0.18.0 > > Attachments: PIG-3655.0.patch, PIG-3655.1.patch, PIG-3655.2.patch, > PIG-3655.3.patch, PIG-3655.4.patch, PIG-3655.5.patch, > PIG-3655.sparkNulls.2.patch, PIG-3655.sparkNulls.patch > > > The way that the record readers for these storage formats seek to the first > record in an input split is to find the byte sequence 1 2 3 110 for > BinStorage or 1 2 3 19-21|28-30|36-45 for InterStorage. If this sequence > occurs in the data for any reason (for example the integer 16909166 stored > big endian encodes to the byte sequence for BinStorage) other than to mark > the start of a tuple it can cause mysterious failures in pig jobs because the > record reader will try to decode garbage and fail. > For this approach of using an unlikely sequence to mark record boundaries, it > is important to reduce the probability of the sequence occuring naturally in > the data by ensuring that your record marker is sufficiently long. Hadoop > SequenceFile uses 128 bits for this and randomly generates the sequence for > each file (selecting a fixed, predetermined value opens up the possibility of > a mean person intentionally sending you that value). This makes it extremely > unlikely that collisions will occur. In the long run I think that pig should > also be doing this. > As a quick fix it might be good to save the current position in the file > before entering readDatum, and if an exception is thrown seek back to the > saved position and resume trying to find the next record marker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)