svn commit: r1802676 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/impl/io/ test/org/apache/pig/data/ test/org/apache/pig/test/

Author: szita
Date: Sat Jul 22 11:38:47 2017
New Revision: 1802676

URL: http://svn.apache.org/viewvc?rev=1802676&view=rev
Log:
PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita)

Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
pig/trunk/test/org/apache/pig/test/TestFRJoin2.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 22 11:38:47 2017
@@ -38,6 +38,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita)
+
PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita)
 
PIG-4767: Partition filter not pushed down when filter clause references variable from another load path (knoguchi)

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Jul 22 11:38:47 2017
@@ -40,6 +40,24 @@ public class PigConfiguration {
  */
public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
 
+
+/**
+ * Sets the length of record markers in binary files produced by Pig between jobs.
+ * A longer byte sequence means less chance of collision with actual data;
+ * a shorter sequence means less overhead.
+ */
+public static final String PIG_INTERSTORAGE_SYNCMARKER_SIZE = "pig.interstorage.syncmarker.size";
+public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX = 16;
+public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT = 10;
+public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN = 2;
+
+/**
+ * Defines the interval (in bytes) at which a sync marker should be written into the binary file.
+ */
+public static final String PIG_INTERSTORAGE_SYNCMARKER_INTERVAL = "pig.interstorage.syncmarker.interval";
+public static final long PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT = 2000;
+
+
 /**
 * Boolean value used to enable or disable fetching without a mapreduce job for DUMP. True by default
  */
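
As a usage sketch (not part of the commit): a job could override these defaults programmatically through the Hadoop Configuration. The wrapper class, the 12-byte marker, and the 1 MB interval below are illustrative assumptions, not recommendations; the committed default interval above is 2000 bytes.

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;

public class SyncMarkerConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Marker length must stay within [SIZE_MIN=2, SIZE_MAX=16] defined above.
        conf.setInt(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, 12);
        // Ask the writer to emit a sync marker roughly once per megabyte of output.
        conf.setLong(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL, 1024L * 1024L);
    }
}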

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Sat Jul 22 11:38:47 2017
@@ -42,16 +42,23 @@ import org.apache.pig.data.Tuple;
public class InterRecordReader extends RecordReader<Text, Tuple> {
 
   private long start;
-  private long pos;
+  private long lastDataPos;
   private long end;
   private BufferedPositionedInputStream in;
   private Tuple value = null;
-  public static final int RECORD_1 = 0x01;
-  public static final int RECORD_2 = 0x02;
-  public static final int RECORD_3 = 0x03;
   private DataInputStream inData = null;
   private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
 
+  private byte[] syncMarker;
+  private long lastSyncPos = -1;
+  private long syncMarkerInterval;
+  private long dataBytesSeen = 0;
+
+  public InterRecordReader(int syncMarkerLength, long syncMarkerInterval) {
+  this.syncMarker = new byte[syncMarkerLength];
+  this.syncMarkerInterval = syncMarkerInterval;
+  }
+
   public void initialize(InputSplit genericSplit,
  TaskAttemptContext context) throws IOException {
 FileSplit split = (FileSplit) genericSplit;
@@ -60,63 +67,131 @@ public class InterRecordReader extends R
 end = start + split.getLength();
 final Path file = split.getPath();
 
-// open the file and seek to the start of the split
+// open the file
 FileSystem fs = file.getFileSystem(job);
 FSDataInputStream fileIn = fs.open(split.getPath());
-if (start != 0) {
-fileIn.seek(start);
+
+// read the magic byte sequence serving as record marker, but only if the file is not empty
+if (!(start == 0 && end == 0)) {
+fileIn.readFully(0, syncMarker, 0, syncMarker.length);
 }
+
+//seek 
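
The remainder of this hunk is cut off in the archive. To illustrate the technique the new fields above support, here is a minimal, hedged sketch of marker-based resynchronization (class and method names are illustrative, not the committed implementation): after seeking into the middle of a split, scan byte by byte until the sync marker is fully matched, then start deserializing at the following byte.

import java.io.DataInputStream;
import java.io.IOException;

public final class SyncMarkerScan {
    /**
     * Advances the stream until a full sync marker has been consumed.
     * Returns true if a marker was found, false if EOF was hit first.
     * The naive restart on mismatch is acceptable here because a random
     * marker has a vanishingly small chance of self-overlapping prefixes.
     */
    public static boolean skipToNextMarker(DataInputStream in, byte[] marker)
            throws IOException {
        int matched = 0; // number of marker bytes matched so far
        while (matched < marker.length) {
            int b = in.read();
            if (b == -1) {
                return false; // end of stream, no further records
            }
            if ((byte) b == marker[matched]) {
                matched++; // this byte extends the partial match
            } else {
                // mismatch: restart, checking whether this byte opens a new match
                matched = ((byte) b == marker[0]) ? 1 : 0;
            }
        }
        return true; // the next tuple starts immediately after the marker
    }
}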

svn commit: r1802675 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/

Author: szita
Date: Sat Jul 22 11:28:02 2017
New Revision: 1802675

URL: http://svn.apache.org/viewvc?rev=1802675&view=rev
Log:
PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita)

Modified:
pig/trunk/CHANGES.txt

pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java

pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802675&r1=1802674&r2=1802675&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 22 11:28:02 2017
@@ -38,6 +38,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita)
+
PIG-4767: Partition filter not pushed down when filter clause references variable from another load path (knoguchi)
 
 PIG-5270: Typo in Pig Logging (FromAlaska49 via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1802675&r1=1802674&r2=1802675&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Sat Jul 22 11:28:02 2017
@@ -175,7 +175,6 @@ public class SparkLauncher extends Launc
 SparkPigStats sparkStats = (SparkPigStats) pigContext
 .getExecutionEngine().instantiatePigStats();
 sparkStats.initialize(pigContext, sparkplan, jobConf);
-UDFContext.getUDFContext().addJobConf(jobConf);
 PigStats.start(sparkStats);
 
 startSparkIfNeeded(jobConf, pigContext);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java?rev=1802675&r1=1802674&r2=1802675&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkShims.java Sat Jul 22 11:28:02 2017
@@ -60,7 +60,12 @@ public abstract class SparkShims impleme
 
 public static SparkShims getInstance() {
 if (sparkShims == null) {
-String sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, "");
+String sparkVersion;
+if (UDFContext.getUDFContext().isFrontend()) {
+sparkVersion = SparkContext.getOrCreate().version();
+} else {
+sparkVersion = UDFContext.getUDFContext().getJobConf().get(SPARK_VERSION, "");
+}
LOG.info("Initializing SparkShims for Spark version: " + sparkVersion);
 String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
 try {
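
The hunk ends here. For readers, a hedged, self-contained sketch of the detection pattern this change introduces (the SPARK_VERSION key value and the wrapper class are assumptions): the frontend asks a live SparkContext for its version, while backend tasks read the version previously stored in the job configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.impl.util.UDFContext;
import org.apache.spark.SparkContext;

public final class SparkVersionLookup {
    private static final String SPARK_VERSION = "pig.spark.version"; // assumed conf key

    public static String detect() {
        if (UDFContext.getUDFContext().isFrontend()) {
            // Driver side: the SparkContext (created if absent) knows its own version.
            return SparkContext.getOrCreate().version();
        }
        // Executor side: fall back to the value serialized into the job conf.
        Configuration conf = UDFContext.getUDFContext().getJobConf();
        return conf.get(SPARK_VERSION, "");
    }
}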

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1802675&r1=1802674&r2=1802675&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Sat Jul 22 11:28:02 2017
@@ -232,8 +232,8 @@ public class SkewedJoinConverter impleme
 }
 
 return result;
-} catch (Exception e) {
-log.warn(e);
+} catch (ExecException e) {
+log.error(e);
 return null;
 }
 }




svn commit: r1802674 - /pig/trunk/CHANGES.txt

Author: szita
Date: Sat Jul 22 11:18:59 2017
New Revision: 1802674

URL: http://svn.apache.org/viewvc?rev=1802674&view=rev
Log:
PIG-5157: Adding missing entry in CHANGES.txt

Modified:
pig/trunk/CHANGES.txt

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802674&r1=1802673&r2=1802674&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 22 11:18:59 2017
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-5157: Upgrade to Spark 2.0 (nkollar via liyunzhang)
+
PIG-5237: Fix DOT file parsing to enable DOT-based physical plan testing (YaShock via szita)
 
PIG-5269: MapReduceLauncher and MRJobStats imports org.python.google.common.collect.Lists instead of org.google.common.collect.Lists (nkollar via szita)