[ https://issues.apache.org/jira/browse/PIG-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100087#comment-16100087 ]

Adam Szita commented on PIG-3655:
---------------------------------

It seems this is caused by the interplay of code in JoinGroupSparkConverter.java 
and PairRDDFunctions.scala.

If the POPackage operator returns a 
[POStatus.STATUS_NULL|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java#L211],
 the method in the Spark converter returns null instead of skipping it and awaiting 
the next tuple [as MR mode 
does|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java#L438].
 This null is then picked up by the caller in the Spark code and [gets written into 
the (FS) output 
stream|https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1113].

This is how we end up with NULLs in the files.
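
Roughly, the difference between the two code paths looks like this (a simplified 
sketch only, not the actual code; method names are condensed):

{code:java}
// MR mode (PigGenericMapReduce): STATUS_NULL is simply skipped, the loop
// just moves on to the next input.
while (true) {
    Result res = pack.getNextTuple();
    if (res.returnStatus == POStatus.STATUS_NULL) {
        continue;                       // skip and await the next tuple
    }
    if (res.returnStatus == POStatus.STATUS_OK) {
        runPipeline((Tuple) res.result);
    }
    if (res.returnStatus == POStatus.STATUS_EOP) {
        break;
    }
}

// Spark path (JoinGroupSparkConverter): next() has to return something for
// every element, and STATUS_NULL currently ends up as "return null".
public Tuple next() {
    Result res = pack.getNextTuple();
    if (res.returnStatus == POStatus.STATUS_OK) {
        return (Tuple) res.result;
    }
    return null;   // picked up by PairRDDFunctions and written to the output stream
}
{code}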

What I can think of as a fix is to return a special kind of Tuple that the record 
writer knows to skip, as can be seen in [^PIG-3655.sparkNulls.2.patch] (sketched 
below). Although this seems like a hack, the other option I can think of is to skip 
NULLs while reading in InterRecordReader - but that would just handle the symptoms 
and not the root cause.
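
The idea is roughly the following (a minimal sketch only, with illustrative names; 
see the patch for the real implementation):

{code:java}
// Sketch only -- names are illustrative, not the ones used in the patch.

// A singleton sentinel that the converter returns instead of null:
public static final Tuple NULL_MARKER = TupleFactory.getInstance().newTuple();

// In JoinGroupSparkConverter, when POPackage reports STATUS_NULL:
if (res.returnStatus == POStatus.STATUS_NULL) {
    return NULL_MARKER;                 // instead of "return null"
}

// In the record writer used on the Spark store path, the sentinel is dropped:
@Override
public void write(WritableComparable key, Tuple value)
        throws IOException, InterruptedException {
    if (value == NULL_MARKER) {
        return;                         // skip, nothing is written to the file
    }
    wrappedWriter.write(key, value);
}
{code}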
Any ideas [~rohini], [~kellyzly]?

> BinStorage and InterStorage approach to record markers is broken
> ----------------------------------------------------------------
>
>                 Key: PIG-3655
>                 URL: https://issues.apache.org/jira/browse/PIG-3655
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.2.0, 0.3.0, 0.4.0, 0.5.0, 0.6.0, 0.7.0, 0.8.0, 0.8.1, 
> 0.9.0, 0.9.1, 0.9.2, 0.10.0, 0.11, 0.10.1, 0.12.0, 0.11.1
>            Reporter: Jeff Plaisance
>            Assignee: Adam Szita
>             Fix For: 0.18.0
>
>         Attachments: PIG-3655.0.patch, PIG-3655.1.patch, PIG-3655.2.patch, 
> PIG-3655.3.patch, PIG-3655.4.patch, PIG-3655.5.patch, 
> PIG-3655.sparkNulls.2.patch, PIG-3655.sparkNulls.patch
>
>
> The way that the record readers for these storage formats seek to the first 
> record in an input split is to find the byte sequence 1 2 3 110 for 
> BinStorage or 1 2 3 19-21|28-30|36-45 for InterStorage. If this sequence 
> occurs in the data for any reason (for example the integer 16909166 stored 
> big endian encodes to the byte sequence for BinStorage) other than to mark 
> the start of a tuple it can cause mysterious failures in pig jobs because the 
> record reader will try to decode garbage and fail.
> For this approach of using an unlikely sequence to mark record boundaries, it 
> is important to reduce the probability of the sequence occurring naturally in 
> the data by ensuring that your record marker is sufficiently long. Hadoop 
> SequenceFile uses 128 bits for this and randomly generates the sequence for 
> each file (selecting a fixed, predetermined value opens up the possibility of 
> a mean person intentionally sending you that value). This makes it extremely 
> unlikely that collisions will occur. In the long run I think that pig should 
> also be doing this.
> As a quick fix it might be good to save the current position in the file 
> before entering readDatum, and if an exception is thrown seek back to the 
> saved position and resume trying to find the next record marker.
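
For what it's worth, the quick fix suggested at the end of the description could 
look roughly like this (a sketch only; the actual InterRecordReader internals 
differ, and skipUntilNextMarker() is a hypothetical helper):

{code:java}
// Sketch of the "seek back and resync" idea from the description; names are
// illustrative. (The collision example: 16909166 == 0x0102036E, i.e. the
// bytes 1 2 3 110 that BinStorage uses as its record marker.)
long recordStart = in.getPos();                  // position before the record
try {
    return (Tuple) DataReaderWriter.readDatum(inData);
} catch (IOException e) {
    // A marker-like byte sequence occurred inside the data and we decoded
    // garbage: seek back and scan for the next real record marker instead
    // of failing the whole job.
    in.seek(recordStart);
    skipUntilNextMarker();                       // hypothetical helper
    return null;
}
{code}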



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
