spark git commit: [SPARK-18219] Move commit protocol API (internal) from sql/core to core module

2016-11-03 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 96cc1b567 -> 937af592e


[SPARK-18219] Move commit protocol API (internal) from sql/core to core module

## What changes were proposed in this pull request?
This patch moves the new commit protocol API from sql/core to the core module, so
we can use it in the RDD API in the future.

As part of this patch, I also moved the specification of the random UUID for the
write path out of the commit protocol; the caller now passes in a job ID instead.
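
For illustration, a minimal driver-side sketch of the new convention (hypothetical
caller code, not part of this patch; the output path is made up):

```scala
import java.util.UUID

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// The caller now chooses the job ID (here a random UUID) and hands it to the
// committer's (jobId, path) constructor, instead of the protocol generating
// the UUID internally.
val jobId = UUID.randomUUID().toString
val committer = new HadoopMapReduceCommitProtocol(jobId, "/tmp/example-output")
```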

## How was this patch tested?
N/A

Author: Reynold Xin 

Closes #15731 from rxin/SPARK-18219.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/937af592
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/937af592
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/937af592

Branch: refs/heads/master
Commit: 937af592e65f4dd878aafcabf8fe2cfe7fa3d9b3
Parents: 96cc1b56
Author: Reynold Xin 
Authored: Thu Nov 3 02:42:48 2016 -0700
Committer: Reynold Xin 
Committed: Thu Nov 3 02:42:48 2016 -0700

--
 .../spark/internal/io/FileCommitProtocol.scala  | 126 +
 .../io/HadoopMapReduceCommitProtocol.scala  | 111 
 .../datasources/FileCommitProtocol.scala| 257 ---
 .../datasources/FileFormatWriter.scala  |   3 +-
 .../InsertIntoHadoopFsRelationCommand.scala |   6 +-
 .../SQLHadoopMapReduceCommitProtocol.scala  |  72 ++
 .../execution/streaming/FileStreamSink.scala|   9 +-
 .../streaming/ManifestFileCommitProtocol.scala  |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   4 +-
 9 files changed, 327 insertions(+), 267 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/937af592/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala 
b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
new file mode 100644
index 000..fb80205
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.util.Utils
+
+
+/**
+ * An interface to define how a single Spark job commits its outputs. Three notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. Implementations should have a constructor with either 2 or 3 arguments:
+ *    (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
+ * 3. A committer should not be reused across multiple Spark jobs.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, the executor calls setupTask and then commitTask
+ *    (or abortTask if the task failed).
+ * 3. When all necessary tasks have completed successfully, the driver calls commitJob. If the
+ *    job failed to execute (e.g. too many failed tasks), the driver should call abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * just crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
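
To make the call sequence in the scaladoc above concrete, here is a minimal
sketch (hypothetical glue code, not part of the diff; `committer`, `jobContext`,
and `taskContext` are assumed to exist, and commitTask is assumed to return the
TaskCommitMessage that commitJob consumes):

```scala
// 1. Driver side: set up the job before any task runs.
committer.setupJob(jobContext)

// 2. Executor side, once per task: set up, write output, then commit
//    (or abort if the task failed).
val msg = try {
  committer.setupTask(taskContext)
  // ... write this task's output files ...
  committer.commitTask(taskContext)
} catch {
  case e: Exception =>
    committer.abortTask(taskContext)
    throw e
}

// 3. Driver side: commit the job with the collected task commit messages.
committer.commitJob(jobContext, Seq(msg))
```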

spark git commit: [SPARK-18219] Move commit protocol API (internal) from sql/core to core module

2016-11-03 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c4c5328f2 -> bc7f05f5f


[SPARK-18219] Move commit protocol API (internal) from sql/core to core module

## What changes were proposed in this pull request?
This patch moves the new commit protocol API from sql/core to the core module, so
we can use it in the RDD API in the future.

As part of this patch, I also moved the specification of the random UUID for the
write path out of the commit protocol; the caller now passes in a job ID instead.

## How was this patch tested?
N/A

Author: Reynold Xin 

Closes #15731 from rxin/SPARK-18219.

(cherry picked from commit 937af592e65f4dd878aafcabf8fe2cfe7fa3d9b3)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc7f05f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc7f05f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc7f05f5

Branch: refs/heads/branch-2.1
Commit: bc7f05f5f03653c623190b8178bcbe981a41c2f3
Parents: c4c5328
Author: Reynold Xin 
Authored: Thu Nov 3 02:42:48 2016 -0700
Committer: Reynold Xin 
Committed: Thu Nov 3 02:43:03 2016 -0700

--
 .../spark/internal/io/FileCommitProtocol.scala  | 126 +
 .../io/HadoopMapReduceCommitProtocol.scala  | 111 
 .../datasources/FileCommitProtocol.scala| 257 ---
 .../datasources/FileFormatWriter.scala  |   3 +-
 .../InsertIntoHadoopFsRelationCommand.scala |   6 +-
 .../SQLHadoopMapReduceCommitProtocol.scala  |  72 ++
 .../execution/streaming/FileStreamSink.scala|   9 +-
 .../streaming/ManifestFileCommitProtocol.scala  |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   4 +-
 9 files changed, 327 insertions(+), 267 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala 
b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
new file mode 100644
index 000..fb80205
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.util.Utils
+
+
+/**
+ * An interface to define how a single Spark job commits its outputs. Three notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. Implementations should have a constructor with either 2 or 3 arguments:
+ *    (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
+ * 3. A committer should not be reused across multiple Spark jobs.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, the executor calls setupTask and then commitTask
+ *    (or abortTask if the task failed).
+ * 3. When all necessary tasks have completed successfully, the driver calls commitJob. If the
+ *    job failed to execute (e.g. too many failed tasks), the driver should call abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * just crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
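
Note 2 of the scaladoc above (the 2- or 3-argument constructor convention)
suggests committers are meant to be constructed reflectively from a class name.
A hypothetical helper honoring that convention (a sketch under that assumption,
not the API added by this patch):

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// Prefer the 3-argument (jobId, path, isAppend) constructor; fall back to
// the 2-argument (jobId, path) form, per note 2 of the scaladoc.
def instantiateCommitter(
    className: String,
    jobId: String,
    path: String,
    isAppend: Boolean): FileCommitProtocol = {
  val clazz = Class.forName(className).asInstanceOf[Class[FileCommitProtocol]]
  try {
    val ctor = clazz.getConstructor(classOf[String], classOf[String], classOf[Boolean])
    ctor.newInstance(jobId, path, java.lang.Boolean.valueOf(isAppend))
  } catch {
    case _: NoSuchMethodException =>
      val ctor = clazz.getConstructor(classOf[String], classOf[String])
      ctor.newInstance(jobId, path)
  }
}
```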