[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread rxin
Github user rxin closed the pull request at:

https://github.com/apache/spark/pull/15696


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85860719
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
 ---
@@ -218,22 +210,24 @@ object WriteOutput extends Logging {
 
 final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
   val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-  f"part-r-$split%05d-$uuid$bucketString"
+  f"part-$split%05d-$uuid$bucketString"
--- End diff --

Is this still used?
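
For reference, a minimal sketch of what the renamed prefix in this hunk produces. The `bucketIdToString` helper below is a local stand-in; its `_%05d` format is an assumption, since the real `BucketingUtils` implementation is not shown in this thread.

```scala
object FilePrefixSketch {
  // Hypothetical stand-in for BucketingUtils.bucketIdToString (format assumed).
  def bucketIdToString(id: Int): String = f"_$id%05d"

  // Mirrors the "+" side of the diff: "part-", zero-padded split id, uuid,
  // then an optional bucket suffix.
  def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
    val bucketString = bucketId.map(bucketIdToString).getOrElse("")
    f"part-$split%05d-$uuid$bucketString"
  }
}
```

With these stand-ins, `FilePrefixSketch.filePrefix(3, "abc", Some(7))` yields `part-00003-abc_00007`.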





[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85859487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.internal.SQLConf
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+}
+
+
+/**
+ * An interface to define how a Spark job commits its outputs. 
Implementations must be serializable.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
+ *(or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
+ *failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Notifies the commit protocol to add a new file, and gets back the 
full path that should be
+   * used. Must be called on the executors when running tasks.
+   *
+   * A full file path consists of the following parts:
+   *  1. the base path
+   *  2. some sub-directory within the base path, used to specify 
partitioning
+   *  3. file prefix, usually some unique job id with the task id
+   *  4. bucket id
+   *  5. source specific file extension, e.g. ".snappy.parquet"
+   *
+   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 
and 5, and the rest
+   * are left to the commit protocol implementation to decide.
+   */
+  def addTaskTempFile(taskContext: TaskAttemptContext, dir: 
Option[String], ext: String): String
+
+  /**
+   * Commits a task after the writes succeed. Must be called on the 
executors when running tasks.
+   */
+  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
+
+  /**
+   * Aborts a task after the writes have failed. Must be called on the 
executors when running tasks.
+   */
+  def abortTask(taskContext: TaskAttemptContext): Unit
--- End diff --

Is this also best-effort?
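
To make the question concrete, here is a minimal sketch of the call sequence described in the class comment, using a toy in-memory protocol. `RecordingProtocol`, `LifecycleSketch`, and the simplified signatures are hypothetical; the real API takes Hadoop's `JobContext` and `TaskAttemptContext`. In this sketch `abortTask` runs only from the task's own failure handler, so it would be skipped if the executor process died first, which is one reading of why it might also be best-effort.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Toy commit message; the real TaskCommitMessage wraps an arbitrary object.
final case class TaskCommitMessage(taskId: Int, files: Seq[String])

// Records every call so the ordering can be inspected afterwards.
class RecordingProtocol {
  val log = mutable.ArrayBuffer.empty[String]
  def setupJob(): Unit = log += "setupJob"
  def setupTask(taskId: Int): Unit = log += s"setupTask($taskId)"
  def commitTask(taskId: Int): TaskCommitMessage = {
    log += s"commitTask($taskId)"
    TaskCommitMessage(taskId, Seq(s"part-$taskId"))
  }
  def abortTask(taskId: Int): Unit = log += s"abortTask($taskId)"
  def commitJob(commits: Seq[TaskCommitMessage]): Unit =
    log += s"commitJob(${commits.size})"
  def abortJob(): Unit = log += "abortJob"
}

object LifecycleSketch {
  // Runs all tasks in-process; `write` throws to simulate a failed task.
  def run(protocol: RecordingProtocol, taskIds: Seq[Int])(write: Int => Unit): Boolean = {
    protocol.setupJob()
    try {
      val commits = taskIds.map { id =>
        protocol.setupTask(id)
        try {
          write(id)
          protocol.commitTask(id)
        } catch {
          case NonFatal(e) =>
            protocol.abortTask(id) // never reached if the process is killed
            throw e
        }
      }
      protocol.commitJob(commits)
      true
    } catch {
      case NonFatal(_) =>
        protocol.abortJob()
        false
    }
  }
}
```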



[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85860514
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.internal.SQLConf
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+}
+
+
+/**
+ * An interface to define how a Spark job commits its outputs. 
Implementations must be serializable.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
+ *(or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
+ *failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Notifies the commit protocol to add a new file, and gets back the 
full path that should be
+   * used. Must be called on the executors when running tasks.
+   *
+   * A full file path consists of the following parts:
+   *  1. the base path
+   *  2. some sub-directory within the base path, used to specify 
partitioning
+   *  3. file prefix, usually some unique job id with the task id
+   *  4. bucket id
+   *  5. source specific file extension, e.g. ".snappy.parquet"
+   *
+   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 
and 5, and the rest
+   * are left to the commit protocol implementation to decide.
+   */
+  def addTaskTempFile(taskContext: TaskAttemptContext, dir: 
Option[String], ext: String): String
+
+  /**
+   * Commits a task after the writes succeed. Must be called on the 
executors when running tasks.
+   */
+  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
+
+  /**
+   * Aborts a task after the writes have failed. Must be called on the 
executors when running tasks.
+   */
+  def abortTask(taskContext: TaskAttemptContext): Unit
+}
+
+
+/**
+ * A [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter
+ * (from the newer mapreduce API, not the old mapred API).
+ *
+ * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ */
+class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean)
--- End diff 

[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85859055
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.internal.SQLConf
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+}
+
+
+/**
+ * An interface to define how a Spark job commits its outputs. 
Implementations must be serializable.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
+ *(or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
+ *failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Notifies the commit protocol to add a new file, and gets back the 
full path that should be
+   * used. Must be called on the executors when running tasks.
+   *
+   * A full file path consists of the following parts:
+   *  1. the base path
+   *  2. some sub-directory within the base path, used to specify 
partitioning
+   *  3. file prefix, usually some unique job id with the task id
+   *  4. bucket id
+   *  5. source specific file extension, e.g. ".snappy.parquet"
+   *
+   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 
and 5, and the rest
+   * are left to the commit protocol implementation to decide.
+   */
+  def addTaskTempFile(taskContext: TaskAttemptContext, dir: 
Option[String], ext: String): String
--- End diff --

s/add/new?
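
A minimal sketch of how the five path parts in the doc comment could be joined, written under the suggested `newTaskTempFile` name. The helper and its explicit `basePath` and `filePrefix` parameters are hypothetical; in the proposed API those parts are chosen by the commit protocol implementation rather than passed in by the caller.

```scala
object TempFilePathSketch {
  // basePath   = part 1, dir = part 2 (partition sub-directory, optional),
  // filePrefix = part 3, ext = parts 4 and 5 (bucket id plus file extension).
  def newTaskTempFile(
      basePath: String,
      dir: Option[String],
      filePrefix: String,
      ext: String): String = {
    val parent = dir.map(d => s"$basePath/$d").getOrElse(basePath)
    s"$parent/$filePrefix$ext"
  }
}
```

For example, a base path of `/out`, a partition directory `year=2016`, the prefix `part-00003-abc`, and the extension `_00007.snappy.parquet` give `/out/year=2016/part-00003-abc_00007.snappy.parquet`.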





[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85859041
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.internal.SQLConf
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+}
+
+
+/**
+ * An interface to define how a Spark job commits its outputs. 
Implementations must be serializable.
--- End diff --

add: since the same committer instance set up on the driver will be used for 
tasks.
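
A small sketch of why this matters: an instance created on the driver has to survive a serialization round trip before an executor can use it. `SketchCommitProtocol` and `roundTrip` are hypothetical names, and plain Java serialization is used here only as an illustration of the requirement.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical protocol that holds only serializable state.
class SketchCommitProtocol(val jobId: String, val path: String)
  extends java.io.Serializable

object SerializationSketch {
  // Serializes a value to bytes and reads it back, roughly what happens when
  // a driver-side object is shipped to a task.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A protocol that held a non-serializable field, such as a raw Hadoop `OutputCommitter`, would fail in `writeObject` with a `NotSerializableException` unless that field were marked `@transient` and rebuilt on the executor.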





[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85859292
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.internal.SQLConf
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+}
+
+
+/**
+ * An interface to define how a Spark job commits its outputs. 
Implementations must be serializable.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
+ *(or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
+ *failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Notifies the commit protocol to add a new file, and gets back the 
full path that should be
+   * used. Must be called on the executors when running tasks.
--- End diff --

add: Note that the returned temp file may have an arbitrary path. The 
commit protocol only promises that the file will be at the location specified 
by the arguments after job commit.
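
The suggested note can be illustrated with a toy protocol that hands out staging paths and only places files at their requested location on job commit. Everything here is hypothetical, including the `_temporary/<jobId>` staging layout and the in-memory `moved` list that stands in for file system renames.

```scala
import scala.collection.mutable

class StagingProtocolSketch(basePath: String, jobId: String) {
  // Maps each temp path handed out to the destination promised to the caller.
  private val pending = mutable.LinkedHashMap.empty[String, String]
  // Simulated renames (temp path, destination) performed at job commit.
  val moved = mutable.ArrayBuffer.empty[(String, String)]

  // Returns a path under a staging directory, not the requested location.
  def newTaskTempFile(dir: Option[String], fileName: String): String = {
    val relative = dir.map(d => s"$d/$fileName").getOrElse(fileName)
    val temp = s"$basePath/_temporary/$jobId/$relative"
    pending(temp) = s"$basePath/$relative"
    temp
  }

  // Only after this call are files at the location the arguments specified.
  def commitJob(): Unit = {
    pending.foreach { case (temp, dest) => moved += ((temp, dest)) }
    pending.clear()
  }
}
```

Callers should therefore treat the returned string as opaque and write to it directly, without assuming anything about its parent directory.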





[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85848357
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
---
@@ -141,15 +139,14 @@ class SimpleTextOutputWriter(
   }
 }
 
-class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
-  extends TextOutputFormat[NullWritable, Text] {
+class AppendingTextOutputFormat(path: String) extends 
TextOutputFormat[NullWritable, Text] {
 
   val numberFormat = NumberFormat.getInstance()
   numberFormat.setMinimumIntegerDigits(5)
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
-new Path(stagingDir, fileNamePrefix)
+new Path(path)
--- End diff --

(This is now specified explicitly in OutputWriterFactory.getFileExtension)
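
A rough sketch of the arrangement described here, with toy stand-ins: the writer factory supplies the extension, the caller builds the complete path, and the output format returns that path unchanged while ignoring the `extension` argument. Only the name `getFileExtension` comes from this thread; its parameterless signature and the other names are assumptions.

```scala
// Hypothetical stand-in for the factory that owns the file extension.
trait WriterFactorySketch {
  def getFileExtension: String
}

// Hypothetical stand-in for the output format in the diff.
class WorkFileSketch(path: String) {
  // `extension` is ignored because `path` already ends with the extension.
  def getDefaultWorkFile(extension: String): String = path
}

object ExtensionSketch {
  // The caller appends the factory-provided extension exactly once.
  def fullPath(prefix: String, factory: WriterFactorySketch): String =
    prefix + factory.getFileExtension
}
```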





[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85848150
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
---
@@ -141,15 +139,14 @@ class SimpleTextOutputWriter(
   }
 }
 
-class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
-  extends TextOutputFormat[NullWritable, Text] {
+class AppendingTextOutputFormat(path: String) extends 
TextOutputFormat[NullWritable, Text] {
 
   val numberFormat = NumberFormat.getInstance()
   numberFormat.setMinimumIntegerDigits(5)
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
-new Path(stagingDir, fileNamePrefix)
+new Path(path)
--- End diff --

Nope -- no more extension coming from Hadoop.






[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15696#discussion_r85847969
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
---
@@ -141,15 +139,14 @@ class SimpleTextOutputWriter(
   }
 }
 
-class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
-  extends TextOutputFormat[NullWritable, Text] {
+class AppendingTextOutputFormat(path: String) extends 
TextOutputFormat[NullWritable, Text] {
 
   val numberFormat = NumberFormat.getInstance()
   numberFormat.setMinimumIntegerDigits(5)
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
-new Path(stagingDir, fileNamePrefix)
+new Path(path)
--- End diff --

`+ extension`?





[GitHub] spark pull request #15696: [SPARK-18024][SQL] Introduce an internal commit p...

2016-10-31 Thread rxin
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/15696

[SPARK-18024][SQL] Introduce an internal commit protocol API - WIP

## What changes were proposed in this pull request?
This is a work-in-progress pull request to introduce a commit protocol API 
that can be used by both streaming and batch to unify the write path.

## How was this patch tested?
TBD

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark SPARK-18024

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15696


commit 72c4294bb401ff3795363d3c0bb436bb56844630
Author: Reynold Xin 
Date:   2016-10-31T17:56:49Z

WIP - commit API

commit 2a613516dd469bca5ed4d7b0f17f678e9e70e267
Author: Reynold Xin 
Date:   2016-10-31T17:57:18Z

Add commit protocol itself



