svn commit: r1816557 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java

2017-11-28 Thread rohini
Author: rohini
Date: Tue Nov 28 15:53:22 2017
New Revision: 1816557

URL: http://svn.apache.org/viewvc?rev=1816557=rev
Log:
PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via 
rohini)

Modified:
pig/trunk/CHANGES.txt

pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1816557=1816556=1816557=diff
==
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 28 15:53:22 2017
@@ -60,6 +60,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via 
rohini)
+
 PIG-5307: NPE in TezOperDependencyParallelismEstimator (rohini)
 
 PIG-5272: BagToTuple output schema is incorrect (juen1jp via rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1816557=1816556=1816557=diff
==
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 Tue Nov 28 15:53:22 2017
@@ -115,6 +115,7 @@ public class PigProcessor extends Abstra
 
 public static String sampleVertex;
 public static Map sampleMap;
+private volatile boolean isAborted = false;
 
 public PigProcessor(ProcessorContext context) {
 super(context);
@@ -305,9 +306,12 @@ public class PigProcessor extends Abstra
 }
 
 if (!fileOutputs.isEmpty()) {
-while (!getContext().canCommit()) {
+while (!getContext().canCommit() && !isAborted) {
 Thread.sleep(100);
 }
+if (isAborted) {
+return;
+}
 for (MROutput fileOutput : fileOutputs){
 fileOutput.flush();
 if (fileOutput.isCommitRequired()) {
@@ -464,4 +468,11 @@ public class PigProcessor extends Abstra
 }
 }
 
+// TODO add @Override when we upgrade to Tez 0.9 dependency
+public void abort() {
+isAborted = true;
+LOG.warn("Aborting execution");
+abortOutput();
+}
+
 }




svn commit: r1816554 - /pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

2017-11-28 Thread szita
Author: szita
Date: Tue Nov 28 15:25:15 2017
New Revision: 1816554

URL: http://svn.apache.org/viewvc?rev=1816554=rev
Log:
PIG-5316: Initialize mapred.task.id property for PoS jobs (fix)

Modified:

pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1816554=1816553=1816554=diff
==
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Tue Nov 28 15:25:15 2017
@@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
@@ -75,6 +74,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
@@ -183,7 +183,7 @@ public class SparkLauncher extends Launc
 jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(),
 UUID.randomUUID().toString());
 jobConf.set(MRConfiguration.JOB_ID,jobGroupID);
-jobConf.set(MRConfiguration.TASK_ID, new TaskAttemptID().toString());
+jobConf.set(MRConfiguration.TASK_ID, 
HadoopShims.getNewTaskAttemptID().toString());
 
 sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
 false);




svn commit: r1816542 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

2017-11-28 Thread szita
Author: szita
Date: Tue Nov 28 13:14:51 2017
New Revision: 1816542

URL: http://svn.apache.org/viewvc?rev=1816542=rev
Log:
PIG-5316: Initialize mapred.task.id property for PoS jobs (nkollar via szita)

Modified:
pig/trunk/CHANGES.txt

pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1816542=1816541=1816542=diff
==
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 28 13:14:51 2017
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-5316: Initialize mapred.task.id property for PoS jobs (nkollar via szita)
+
 PIG-5302: Remove HttpClient dependency (nkollar via szita)
 
 PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1816542=1816541=1816542=diff
==
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Tue Nov 28 13:14:51 2017
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
@@ -182,6 +183,7 @@ public class SparkLauncher extends Launc
 jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(),
 UUID.randomUUID().toString());
 jobConf.set(MRConfiguration.JOB_ID,jobGroupID);
+jobConf.set(MRConfiguration.TASK_ID, new TaskAttemptID().toString());
 
 sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
 false);