[
https://issues.apache.org/jira/browse/PIG-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635847#comment-16635847
]
Satish Subhashrao Saley commented on PIG-5359:
----------------------------------------------
Updated patch to review board
> Reduce time spent in split serialization
> ----------------------------------------
>
> Key: PIG-5359
> URL: https://issues.apache.org/jira/browse/PIG-5359
> Project: Pig
> Issue Type: Improvement
> Reporter: Satish Subhashrao Saley
> Assignee: Satish Subhashrao Saley
> Priority: Major
>
> 1. Unnecessary serialization of splits in Tez.
> In LoaderProcessor, Pig calls
>
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java#L172]
> {code:java}
> tezOp.getLoaderInfo().setInputSplitInfo(
>     MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
> {code}
> This ends up serializing all of the splits just to print a log line; a sketch of how the splits can be generated without this serialization follows the excerpts below.
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L317]
> {code:java}
> public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
>     boolean groupSplits, boolean sortSplits, int targetTasks)
>     throws IOException, ClassNotFoundException, InterruptedException {
>   ....
>   ....
>   LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
>       + splitInfoMem.getSplitsProto().getSerializedSize());
>   return splitInfoMem;
> }
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L106]
> {code:java}
> public MRSplitsProto getSplitsProto() {
>   if (isNewSplit) {
>     try {
>       return createSplitsProto(newFormatSplits,
>           new SerializationFactory(conf));
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L152-L170]
> {code:java}
> private static MRSplitsProto createSplitsProto(
>     org.apache.hadoop.mapreduce.InputSplit[] newSplits,
>     SerializationFactory serializationFactory)
>     throws IOException, InterruptedException {
>   MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
>   for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
>     splitsBuilder.addSplits(
>         MRInputHelpers.createSplitProto(newSplit, serializationFactory));
>   }
>   return splitsBuilder.build();
> }
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L221-L259]
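> The serialization above happens only so generateInputSplitsToMem can log the serialized size; Pig does not need the proto at this point. A minimal sketch of generating the splits in memory with the plain mapreduce API, without touching MRSplitsProto (an illustration of the idea only, not the actual patch; the class and method names here are hypothetical):
> {code:java}
> import java.util.List;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.util.ReflectionUtils;
>
> public class InMemorySplitGeneration {
>
>   @SuppressWarnings({ "rawtypes", "unchecked" })
>   public static List<InputSplit> generateSplits(Configuration conf)
>       throws Exception {
>     Job job = Job.getInstance(conf);
>     // Instantiate the configured InputFormat and ask it for splits.
>     // Nothing is serialized here; the InputSplit objects stay in memory
>     // until their bytes are actually needed later in TezDagBuilder.
>     InputFormat inputFormat =
>         ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
>     return inputFormat.getSplits(job);
>   }
> }
> {code}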
> 2. In TezDagBuilder, if splitsSerializedSize > spillThreshold, the InputSplits
> already serialized into MRSplitsProto are not used by Pig; instead it serializes
> them again directly to disk via JobSplitWriter.createSplitFiles. The InputSplit
> serialization logic therefore runs twice, which is wasteful and expensive for
> splits like HCat's.
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L946-L947]
> {code:java}
> MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
> int splitsSerializedSize = splitsProto.getSerializedSize();
> {code}
> getSplitsProto creates a MRSplitsProto, which consists of a list of
> MRSplitProto; each MRSplitProto holds the serialized bytes of one InputSplit.
> If splitsSerializedSize > spillThreshold, Pig writes the splits to disk via
> {code:java}
> if (splitsSerializedSize > spillThreshold) {
>   inputPayLoad.setBoolean(
>       org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
>       false);
>   // Write splits to disk
>   Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
>   log.info("Writing input splits to " + inputSplitsDir
>       + " for vertex " + vertex.getName()
>       + " as the serialized size in memory is " + splitsSerializedSize
>       + ". Configured " + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
>       + " is " + spillThreshold);
>   inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
>       (InputSplitInfoMem) inputSplitInfo, inputSplitsDir, payloadConf, fs);
> {code}
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L960]
>
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java#L302-L314]
> Solution:
> 1. Do not serialize the splits in LoaderProcessor.java.
> 2. In TezDagBuilder.java, serialize each input split once, accumulating the
> serialized size as you go; if the total exceeds spillThreshold, write the
> splits to disk reusing the per-split serialized buffers (see the sketch below).
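> A rough sketch of point 2 (illustration only, not the actual patch; the helpers writeSerializedSplitsToDisk and sendSplitsViaEvents are placeholders, and it assumes Tez's MRInputHelpers.createSplitProto is callable as in the excerpts above): serialize each split exactly once, keep a running total of the serialized sizes, and once the total crosses spillThreshold reuse the already-built MRSplitProto objects for the disk spill instead of re-serializing the raw InputSplits.
> {code:java}
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.hadoop.io.serializer.SerializationFactory;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
> import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
>
> public class SerializeSplitsOnce {
>
>   public static void process(InputSplit[] splits,
>       SerializationFactory serializationFactory, long spillThreshold)
>       throws IOException, InterruptedException {
>     List<MRSplitProto> serializedSplits = new ArrayList<>(splits.length);
>     long totalSerializedSize = 0;
>     for (InputSplit split : splits) {
>       // Each split is serialized exactly once.
>       MRSplitProto proto =
>           MRInputHelpers.createSplitProto(split, serializationFactory);
>       serializedSplits.add(proto);
>       totalSerializedSize += proto.getSerializedSize();
>     }
>     if (totalSerializedSize > spillThreshold) {
>       // Spill path: write the already-serialized protos to the split file
>       // instead of re-serializing the raw InputSplits (placeholder helper).
>       writeSerializedSplitsToDisk(serializedSplits);
>     } else {
>       // In-memory path: ship the protos via Tez events as before
>       // (placeholder helper).
>       sendSplitsViaEvents(serializedSplits);
>     }
>   }
>
>   private static void writeSerializedSplitsToDisk(List<MRSplitProto> splits) {
>     // Placeholder for the disk-spill path.
>   }
>
>   private static void sendSplitsViaEvents(List<MRSplitProto> splits) {
>     // Placeholder for the in-memory / events path.
>   }
> }
> {code}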
>
> Thank you [~rohini] for identifying the issue.