Author: szita Date: Wed Aug 9 08:41:45 2017 New Revision: 1804497 URL: http://svn.apache.org/viewvc?rev=1804497&view=rev Log: PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1804497&r1=1804496&r2=1804497&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Aug 9 08:41:45 2017 @@ -40,6 +40,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita) + PIG-5284: Fix flakyness introduced by PIG-3655 (szita) PIG-5278: Unit test failures because of PIG-5264 (nkollar via rohini) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java?rev=1804497&r1=1804496&r2=1804497&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java Wed Aug 9 08:41:45 2017 @@ -22,16 +22,21 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.util.List; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import com.google.common.collect.Lists; + /** * Wrapper class for PigSplits in Spark mode * @@ -124,11 +129,13 @@ public interface 
SparkPigSplit extends W @Override public void readFields(DataInput is) throws IOException { + this.getConf().readFields(is); pigSplit.readFields(is); } @Override public void write(DataOutput os) throws IOException { + SparkPigSplitsUtils.writeConfigForPigSplits(this.getConf(), os); pigSplit.write(os); } @@ -242,11 +249,13 @@ public interface SparkPigSplit extends W @Override public void readFields(DataInput is) throws IOException { + this.getConf().readFields(is); pigSplit.readFields(is); } @Override public void write(DataOutput os) throws IOException { + SparkPigSplitsUtils.writeConfigForPigSplits(this.getConf(), os); pigSplit.write(os); } @@ -301,4 +310,32 @@ public interface SparkPigSplit extends W } } + public static class SparkPigSplitsUtils { + + private static final List<String> PIGSPLIT_CONFIG_KEYS = Lists.newArrayList( + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + PigConfiguration.PIG_COMPRESS_INPUT_SPLITS + ); + + /** + * Writes a subset of the originalConf into the output stream os. Only keys in PIGSPLIT_CONFIG_KEYS are + * considered for optimization purposes. During deserialization on a Spark executor we need to take care of + * setting the configuration manually because Spark only sets an empty Configuration instance on the PigSplit. + * @param originalConf + * @param os + * @throws IOException + */ + public static void writeConfigForPigSplits(Configuration originalConf, DataOutput os) throws IOException { + Configuration conf = new Configuration(false); + for (String key : PIGSPLIT_CONFIG_KEYS) { + String value = originalConf.get(key); + if (value != null) { + conf.set(key, value); + } + } + conf.write(os); + } + + } + }