svn commit: r1802676 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/impl/io/ test/org/apache/pig/data/ test/org/apache/pig/test/
Author: szita Date: Sat Jul 22 11:38:47 2017 New Revision: 1802676 URL: http://svn.apache.org/viewvc?rev=1802676&view=rev Log: PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita) Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/PigConfiguration.java pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java pig/trunk/src/org/apache/pig/impl/io/InterStorage.java pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802676&r1=1802675&r2=1802676&view=diff == --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Sat Jul 22 11:38:47 2017 @@ -38,6 +38,8 @@ OPTIMIZATIONS BUG FIXES +PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita) + PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita) PIG-4767: Partition filter not pushed down when filter clause references variable from another load path (knoguchi) Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1802676&r1=1802675&r2=1802676&view=diff == --- pig/trunk/src/org/apache/pig/PigConfiguration.java (original) +++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Jul 22 11:38:47 2017 @@ -40,6 +40,24 @@ public class PigConfiguration { */ public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes"; + +/** + * Sets the length of record markers in binary files produces by Pig between jobs + * The longer the byte sequence means less chance of collision with actual data, + * shorter sequence means less overhead + */ +public static final String PIG_INTERSTORAGE_SYNCMARKER_SIZE = "pig.interstorage.syncmarker.size"; 
+public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX = 16; +public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT = 10; +public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN = 2; + +/** + * Defines the interval (in bytes) when a sync marker should be written into the binary file + */ +public static final String PIG_INTERSTORAGE_SYNCMARKER_INTERVAL = "pig.interstorage.syncmarker.interval"; +public static final long PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT = 2000; + + /** * Boolean value used to enable or disable fetching without a mapreduce job for DUMP. True by default */ Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1802676=1802675=1802676=diff == --- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Sat Jul 22 11:38:47 2017 @@ -42,16 +42,23 @@ import org.apache.pig.data.Tuple; public class InterRecordReader extends RecordReader{ private long start; - private long pos; + private long lastDataPos; private long end; private BufferedPositionedInputStream in; private Tuple value = null; - public static final int RECORD_1 = 0x01; - public static final int RECORD_2 = 0x02; - public static final int RECORD_3 = 0x03; private DataInputStream inData = null; private static InterSedes sedes = InterSedesFactory.getInterSedesInstance(); + private byte[] syncMarker; + private long lastSyncPos = -1; + private long syncMarkerInterval; + private long dataBytesSeen = 0; + + public InterRecordReader(int syncMarkerLength, long syncMarkerInterval) { + this.syncMarker = new byte[syncMarkerLength]; + this.syncMarkerInterval = syncMarkerInterval; + } + public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; @@ -60,63 +67,131 @@ public class InterRecordReader extends R 
end = start + split.getLength(); final Path file = split.getPath(); -// open the file and seek to the start of the split +// open the file FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); -if (start != 0) { -fileIn.seek(start); + +// read the magic byte sequence serving as record marker but only if the file is not empty +if (!(start == 0 && end == 0)) { +fileIn.readFully(0, syncMarker, 0, syncMarker.length); } + +//seek
svn commit: r1802675 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
Author: szita Date: Sat Jul 22 11:28:02 2017 New Revision: 1802675 URL: http://svn.apache.org/viewvc?rev=1802675&view=rev Log: PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita) Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802675&r1=1802674&r2=1802675&view=diff == --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Sat Jul 22 11:28:02 2017 @@ -38,6 +38,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita) + PIG-4767: Partition filter not pushed down when filter clause references variable from another load path (knoguchi) PIG-5270: Typo in Pig Logging (FromAlaska49 via daijy) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1802675&r1=1802674&r2=1802675&view=diff == --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Sat Jul 22 11:28:02 2017 @@ -175,7 +175,6 @@ public class SparkLauncher extends Launc SparkPigStats sparkStats = (SparkPigStats) pigContext .getExecutionEngine().instantiatePigStats(); sparkStats.initialize(pigContext, sparkplan, jobConf); -UDFContext.getUDFContext().addJobConf(jobConf); PigStats.start(sparkStats); startSparkIfNeeded(jobConf, pigContext); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java?rev=1802675&r1=1802674&r2=1802675&view=diff == --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java Sat Jul 22 11:28:02 2017 @@ -60,7 +60,12 @@ public abstract class SparkShims impleme public static SparkShims getInstance() { if (sparkShims == null) { -String sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, ""); +String sparkVersion; +if (UDFContext.getUDFContext().isFrontend()) { +sparkVersion = SparkContext.getOrCreate().version(); +} else { +sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, ""); +} LOG.info("Initializing SparkShims for Spark version: " + sparkVersion); String sparkMajorVersion = getSparkMajorVersion(sparkVersion); try { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1802675&r1=1802674&r2=1802675&view=diff == --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Sat Jul 22 11:28:02 2017 @@ -232,8 +232,8 @@ public class SkewedJoinConverter impleme } return result; -} catch (Exception e) { -log.warn(e); +} catch (ExecException e) { +log.error(e); return null; } }
svn commit: r1802674 - /pig/trunk/CHANGES.txt
Author: szita Date: Sat Jul 22 11:18:59 2017 New Revision: 1802674 URL: http://svn.apache.org/viewvc?rev=1802674&view=rev Log: PIG-5157: Adding missing entry in CHANGES.txt Modified: pig/trunk/CHANGES.txt Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802674&r1=1802673&r2=1802674&view=diff == --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Sat Jul 22 11:18:59 2017 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-5157: Upgrade to Spark 2.0 (nkollar via liyunzhang) + PIG-5237: Fix DOT file parsing to enable DOT-based physical plan testing (YaShock via szita) PIG-5269: MapReduceLauncher and MRJobStats imports org.python.google.common.collect.Lists instead of org.google.common.collect.Lists (nkollar via szita)