svn commit: r1816557 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Author: rohini Date: Tue Nov 28 15:53:22 2017 New Revision: 1816557 URL: http://svn.apache.org/viewvc?rev=1816557&view=rev Log: PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via rohini) Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1816557&r1=1816556&r2=1816557&view=diff == --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue Nov 28 15:53:22 2017 @@ -60,6 +60,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via rohini) + PIG-5307: NPE in TezOperDependencyParallelismEstimator (rohini) PIG-5272: BagToTuple output schema is incorrect (juen1jp via rohini) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1816557&r1=1816556&r2=1816557&view=diff == --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Tue Nov 28 15:53:22 2017 @@ -115,6 +115,7 @@ public class PigProcessor extends Abstra public static String sampleVertex; public static Map<String, Object> sampleMap; +private volatile boolean isAborted = false; public PigProcessor(ProcessorContext context) { super(context); @@ -305,9 +306,12 @@ public class PigProcessor extends Abstra } if (!fileOutputs.isEmpty()) { -while (!getContext().canCommit()) { +while (!getContext().canCommit() && !isAborted) { Thread.sleep(100); } +if (isAborted) { +return; +} for (MROutput fileOutput : fileOutputs){ fileOutput.flush(); if (fileOutput.isCommitRequired()) { @@ -464,4 +468,11 @@ public class PigProcessor extends Abstra } } +// TODO add @Override when we upgrade to Tez 0.9 dependency +public void abort() { 
+isAborted = true; +LOG.warn("Aborting execution"); +abortOutput(); +} + }
svn commit: r1816554 - /pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Author: szita Date: Tue Nov 28 15:25:15 2017 New Revision: 1816554 URL: http://svn.apache.org/viewvc?rev=1816554&view=rev Log: PIG-5316: Initialize mapred.task.id property for PoS jobs (fix) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1816554&r1=1816553&r2=1816554&view=diff == --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Nov 28 15:25:15 2017 @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigWarning; @@ -75,6 +74,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter; @@ -183,7 +183,7 @@ public class SparkLauncher extends Launc jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(), UUID.randomUUID().toString()); jobConf.set(MRConfiguration.JOB_ID,jobGroupID); 
-jobConf.set(MRConfiguration.TASK_ID, new TaskAttemptID().toString()); +jobConf.set(MRConfiguration.TASK_ID, HadoopShims.getNewTaskAttemptID().toString()); sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);
svn commit: r1816542 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Author: szita Date: Tue Nov 28 13:14:51 2017 New Revision: 1816542 URL: http://svn.apache.org/viewvc?rev=1816542&view=rev Log: PIG-5316: Initialize mapred.task.id property for PoS jobs (nkollar via szita) Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1816542&r1=1816541&r2=1816542&view=diff == --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue Nov 28 13:14:51 2017 @@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley IMPROVEMENTS +PIG-5316: Initialize mapred.task.id property for PoS jobs (nkollar via szita) + PIG-5302: Remove HttpClient dependency (nkollar via szita) PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1816542&r1=1816541&r2=1816542&view=diff == --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Nov 28 13:14:51 2017 @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigWarning; @@ -182,6 +183,7 @@ public class SparkLauncher extends Launc jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(), UUID.randomUUID().toString()); jobConf.set(MRConfiguration.JOB_ID,jobGroupID); +jobConf.set(MRConfiguration.TASK_ID, new TaskAttemptID().toString()); sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);