[GitHub] spark issue #19497: [SPARK-21549][CORE] Respect OutputFormats with no/invali...

2017-10-17 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19497
  
I guess one aspect of `saveAsNewAPIHadoopFile` is that it calls
`jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path)`, and
`Configuration.set(String key, String value)` has a check for a null key or value.

If handling of paths is to be done in the committer,
`saveAsNewAPIHadoopFile` should really be looking at the path and calling
`jobConfiguration.unset("mapreduce.output.fileoutputformat.outputdir")` if
path == null.

Looking at how Hadoop's FileOutputFormat implementations work, they can
handle a null/undefined output dir property, *but not an empty one*:

```java
public static Path getOutputPath(JobContext job) {
  String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
  return name == null ? null : new Path(name);
}
```

This implies that `saveAsNewAPIHadoopFile("")` might want to unset the config
option too, offloading the problem of what happens on an empty path to the
committer. Though I'd recommend checking what meaningful exceptions
actually get raised in this situation when the committer is the normal
FileOutputFormat/FileOutputCommitter setup.
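As a minimal sketch of that guard (the helper name is made up; the real change
would live wherever Spark copies the path into the job configuration):

```scala
import org.apache.hadoop.conf.Configuration

object OutputDirGuard {
  private val Outdir = "mapreduce.output.fileoutputformat.outputdir"

  /** Set the output dir property, or clear it when no usable path is given. */
  def configureOutputDir(conf: Configuration, path: String): Unit = {
    if (path == null || path.isEmpty) {
      // Leaving the property unset means FileOutputFormat.getOutputPath()
      // returns null instead of tripping over an empty string.
      conf.unset(Outdir)
    } else {
      conf.set(Outdir, path)
    }
  }
}
```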


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL] data source v2 write path

2017-10-17 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r145096772
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+fs.listStatus(dataPath).filterNot { status =>
+  val name = status.getPath.getName
+  name.startsWith("_") || name.startsWith(".")
+}.map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+protected val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  val dest = new Path(finalPath, file.getName)
+  if(!fs.rename(file, dest)) {
+throw new IOException(s"failed to rename($file, $dest)")
+  }
+}
+  } finally {
+fs.delete(jobPath, true)
+  }
+}
+
+override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  val jobPath = new Path(new Path(path, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  fs.delete(jobPath, true)
+}
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+extends Writer(path, conf) with SupportsWriteInte

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144948292
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  fs.rename(file, new Path(finalPath, file.getName))
+}
+  } finally {
+fs.delete(jobPath, true)
+  }
+}
+
+override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  val jobPath = new Path(new Path(path, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  fs.delete(jobPath, true)
+}
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+extends DataSourceV2Writer with SupportsWriteInternalRow {
+
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date())
+
+override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+  new InternalRowCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))

[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-16 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
thanks for the review everyone!


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144823664
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  fs.rename(file, new Path(finalPath, file.getName))
+}
+  } finally {
+fs.delete(jobPath, true)
+  }
+}
+
+override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  val jobPath = new Path(new Path(path, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  fs.delete(jobPath, true)
+}
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+extends DataSourceV2Writer with SupportsWriteInternalRow {
+
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date())
+
+override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+  new InternalRowCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144823298
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  fs.rename(file, new Path(finalPath, file.getName))
+}
+  } finally {
+fs.delete(jobPath, true)
+  }
+}
+
+override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  val jobPath = new Path(new Path(path, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  fs.delete(jobPath, true)
+}
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+extends DataSourceV2Writer with SupportsWriteInternalRow {
+
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date())
+
+override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+  new InternalRowCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144822800
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  fs.rename(file, new Path(finalPath, file.getName))
--- End diff --

Treat rename() returning false as a failure. It's an ugly mess: HDFS, say,
returns false for "missing source file", and other filesystems do it for minor
no-ops (src == dest). Because of HDFS, I'd have something like

```scala
val dest = new Path(finalPath, file.getName)
if (!fs.rename(file, dest)) {
  throw new IOException(s"failed to rename($file, $dest)")
}
```

That then helps us write the spec and tests for HADOOP-11452, and make public a
rename which throws exceptions on failures.
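A minimal sketch of the kind of rename-that-throws wrapper alluded to here (the
helper and its diagnostics are hypothetical; HADOOP-11452 is about making
something like this part of the public FileSystem contract):

```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object RenameHelper {
  /** Rename src to dest, turning FileSystem.rename()'s false return into an exception. */
  def renameOrThrow(fs: FileSystem, src: Path, dest: Path): Unit = {
    if (!fs.rename(src, dest)) {
      // rename() returns false for several distinct problems; a little
      // diagnosis makes the eventual exception more useful.
      if (!fs.exists(src)) {
        throw new IOException(s"rename failed: source $src does not exist")
      }
      throw new IOException(s"failed to rename($src, $dest)")
    }
  }
}
```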


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144822829
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  fs.rename(file, new Path(finalPath, file.getName))
+}
+  } finally {
+fs.delete(jobPath, true)
+  }
+}
+
+override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  val jobPath = new Path(new Path(path, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  fs.delete(jobPath, true)
+}
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+extends DataSourceV2Writer with SupportsWriteInternalRow {
+
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date())
+
+override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+  new InternalRowCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144821527
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
--- End diff --

Consider also filtering names that start with ".", in case those get picked up
on the local FS.
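A sketch of the combined filter, reusing `fs` and `dataPath` from the diff
above; this is essentially what the later revision of the file (quoted earlier
in this digest) ends up doing:

```scala
// Skip metadata files (_SUCCESS, _temporary) as well as hidden files such as
// the .crc checksums the local filesystem's checksumming client writes.
val visibleFiles = fs.listStatus(dataPath).filterNot { status =>
  val name = status.getPath.getName
  name.startsWith("_") || name.startsWith(".")
}
```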


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144821389
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to 
`target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with 
WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends 
DataSourceV2Reader {
+override def readSchema(): StructType = schema
+
+override def createReadTasks(): JList[ReadTask[Row]] = {
+  val dataPath = new Path(path)
+  val fs = dataPath.getFileSystem(conf)
+  if (fs.exists(dataPath)) {
+
fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+  val serializableConf = new SerializableConfiguration(conf)
+  new SimpleCSVReadTask(f.getPath.toUri.toString, 
serializableConf): ReadTask[Row]
+}.toList.asJava
+  } else {
+Collections.emptyList()
+  }
+}
+  }
+
+  class Writer(path: String, conf: Configuration) extends 
DataSourceV2Writer {
+// We can't get the real spark job id here, so we use a timestamp and 
random UUID to simulate
+// a unique job id.
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date()) +
+  "-" + UUID.randomUUID()
+
+override def createWriterFactory(): DataWriterFactory[Row] = {
+  new SimpleCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))
+}
+
+override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  val finalPath = new Path(path)
+  val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  try {
+for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+  fs.rename(file, new Path(finalPath, file.getName))
+}
+  } finally {
+fs.delete(jobPath, true)
+  }
+}
+
+override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  val jobPath = new Path(new Path(path, "_temporary"), jobId)
+  val fs = jobPath.getFileSystem(conf)
+  fs.delete(jobPath, true)
+}
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+extends DataSourceV2Writer with SupportsWriteInternalRow {
+
+private val jobId = new SimpleDateFormat("MMddHHmmss", 
Locale.US).format(new Date())
+
+override def createInternalRowWriterFactory(): 
DataWriterFactory[InternalRow] = {
+  new InternalRowCSVDataWriterFactory(path, jobId, new 
SerializableConfiguration(conf))

[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19487
  
The more I see of the committer internals, the less confident I am about
understanding any of it.
If your committer isn't writing stuff out, it doesn't need any value of
mapred.output.dir at all, does it? If it does use it, it'll handle an
invalid entry in setupJob/setupTask by throwing an exception there. So the goal
of the layers above should be to make sure the committer gets to validate its
own inputs.

Hadoop trunk adds a new
[PathOutputCommitter](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java)
class for committers: it's the useful getters of `FileOutputCommitter` pulled
up, allowing other committers to give frameworks like Spark the info they need
without looking into properties like mapred.output.dir. Have a look at that
class, and if there is something extra you want pulled up, let me know before
Hadoop 3.0 ships and I'll see what I can do.
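For illustration, a commit protocol that trusts the committer for its paths
might look something like this sketch; it assumes the linked Hadoop-trunk class
exposes a `getWorkPath()` getter, as `FileOutputCommitter` does:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

object CommitterPaths {
  /** Ask a path-aware committer where task output should go, else fall back. */
  def taskWorkPath(committer: OutputCommitter, fallback: Option[Path]): Option[Path] = {
    committer match {
      case poc: PathOutputCommitter =>
        // The committer has already validated its own output/work directories.
        Option(poc.getWorkPath)
      case _ =>
        // A committer that writes nothing legitimately has no output dir.
        fallback
    }
  }
}
```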





---




[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
Done. Not writing 0-byte files will offer a significant speedup against
object stores, where a single getFileStatus() call can take hundreds of
milliseconds. I look forward to it.


---




[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19448
  
> But, if I were working on a Spark distribution at a vendor, this is 
something I would definitely include because it's such a useful feature.

I concur :)


---




[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19487
  
"" can come in via configuration files; I'd treat that the same as null. 
Things which aren't valid URIs though, that's something you want to bounce
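A one-line sketch of that normalization (the helper name is illustrative):

```scala
// Treat null, empty, or whitespace-only paths from configuration files as "not set".
def normalizedPath(raw: String): Option[String] =
  Option(raw).map(_.trim).filter(_.nonEmpty)
```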


---




[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19487
  
Looking a bit more at this. I see it handles whitespace-only strings as well as
empty ones, and also other forms of invalid URI which Path can't handle today
(multiple colons except with file:// on Windows, etc.). And as long as you
don't call `absPathStagingDir` you don't get an exception there.

Do you actually want the code to downgrade if an invalid URI is passed in,
that is: only skip if the path is empty/null, but not for other things like a
path of "::"? There you may want to reject it. In which case you'd change the
`hasValidPath` query to just look at the string, and reinstate the test for the
path with an invalid non-empty URI.

BTW, "test:" is possibly a valid path, meaning "the home directory in the
scheme 'test'".
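To illustrate the distinction being suggested (skip when there's no path,
reject when the path is malformed), a hedged sketch; `hasValidPath` itself
belongs to the PR and may look nothing like this:

```scala
import java.net.URI

import scala.util.Try

sealed trait PathCheck
case object NoPath extends PathCheck                  // null/empty: skip the staging-dir logic
case class BadPath(reason: String) extends PathCheck  // malformed, e.g. "::": reject loudly
case class GoodPath(uri: URI) extends PathCheck

def classifyOutputPath(path: String): PathCheck =
  if (path == null || path.trim.isEmpty) {
    NoPath
  } else {
    Try(new URI(path)).map(GoodPath).getOrElse(BadPath(s"not a valid URI: '$path'"))
  }
```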



---




[GitHub] spark pull request #19487: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-10-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19487#discussion_r144545827
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -60,15 +71,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
*/
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + 
jobId)
--- End diff --

What if path is null when passed in here? I'd actually expect an
IllegalArgumentException to be raised, at least from looking at
Path.checkPathArg().


---




[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
The latest PR update pulls in @dongjoon-hyun's new test; to avoid a merge
conflict in the Insert suite I've rebased against master.

1. Everything handles missing files on output.
2. There's only one logInfo at the end of the execute call, so if many
empty files are created, the logs aren't too noisy.
3. There is now some implicit counting of how many files were missing (`=
submittedFiles - numFiles`); this isn't aggregated and reported though. A
sketch of that bookkeeping is below.
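A rough sketch of that counting, with illustrative names rather than the actual
BasicWriteTaskStatsTracker members:

```scala
// Track files handed to the writer vs. files actually visible at the end of
// the task; the difference is the number of "missing" (never created or not
// yet visible) files.
class FileVisibilityCounter {
  private var submittedFiles = 0
  private var numFiles = 0

  def fileSubmitted(): Unit = submittedFiles += 1
  def fileFound(): Unit = numFiles += 1

  /** Queried once from getFinalStats() so a mismatch can be logged or reported. */
  def missingFiles: Int = submittedFiles - numFiles
}
```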


---




[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19448
  
PS: for people who are interested in dynamic committers,
[MAPREDUCE-6823](https://issues.apache.org/jira/browse/MAPREDUCE-6823) is
something to look at. It allows you to switch committers under pretty much
everything *other than Parquet*... this patch helps make Parquet manageable too.


---




[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19448
  
Thanks for reviewing this/getting it in. Personally, I had it in the
"improvement" category rather than bug fix. If it weren't for that line in the
docs, there'd be no ambiguity about improvement vs. fix, and there is always a
lower-risk way to fix a doc/code mismatch: change the docs.

But I'm grateful for it being in; with the backport to branch-2, Ryan should
be able to use it semi-immediately.


---




[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
Noted :)
@dongjoon-hyun: is the issue with ORC that if there's nothing to write, it
doesn't generate a file (so avoiding the problem where you sometimes get 0-byte
ORC files and things downstream fail)?

If so, the warning message which @gatorsmile has proposed is potentially
going to mislead people into worrying about a problem which isn't there, and
the numFiles metric is going to mislead too.

I'm starting to worry about how noisy the log would be, both there and when
working with S3 while it's playing delayed-visibility games (rarer).

1. What if this patch just logged at debug: less noise, but still something
there if people are trying to debug a mismatch?
2. If there's no file found, numFiles doesn't get incremented.
3. I count the number of files actually submitted.
4. And in `getFinalStats()`, log at info if there is a mismatch.

This would line things up in future for actually returning the list of
expected vs. actual files as a metric where it could be reported.


---




[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...

2017-10-13 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18979#discussion_r144505454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
 ---
@@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: 
Configuration)
   private def getFileSize(filePath: String): Long = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
-    fs.getFileStatus(path).getLen()
+    try {
+      fs.getFileStatus(path).getLen()
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores
+        logInfo(s"File $path is not yet visible", e)
--- End diff --

say "Reported file size in job summary may be invalid"?


---




[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...

2017-10-13 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19487
  
LGTM. I'm going to put out today a slight re-roll of my PathOutputCommitter
protocol, which is one layer above FileOutputCommitter: it lets people write
committers without output & work paths, yet avoid going near the complexity
that is FileOutputFormat. See
[PathOutputCommitProtocol](https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/com/hortonworks/spark/cloud/commit/PathOutputCommitProtocol.scala);
if I get my changes into Hadoop 3 then this committer will work for all
versions of Hadoop 3.x, even though the S3A stuff is targeting Hadoop 3.1.


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144381367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

Yes, there is that. Options: do something complicated with a static field to
only print it once, or log at debug so people only see the message if they are
trying to track things down. A sketch of the "print once" option is below.
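For what it's worth, the "static field, print once" option could be as small as
this sketch (not code from the patch):

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.slf4j.LoggerFactory

object SummaryCompatibilityWarning {
  private val log = LoggerFactory.getLogger(getClass)
  // Flipped on the first warning, so a job creating many tasks logs it once per JVM.
  private val warned = new AtomicBoolean(false)

  def warnOnce(committerClass: Class[_]): Unit = {
    if (warned.compareAndSet(false, true)) {
      log.warn(s"Committer ${committerClass.getName} is not a ParquetOutputCommitter; " +
        "no job summaries will be written")
    }
  }
}
```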


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144375059
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,11 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+  || classOf[ParquetOutputCommitter].isAssignableFrom(committerClass),
+  s"Committer $committerClass is not a ParquetOutputCommitter and 
cannot create job summaries."
++ " Set Parquet option " + ParquetOutputFormat.ENABLE_JOB_SUMMARY + " 
to false.")
--- End diff --

I'd thought about that; it didn't look any better or worse. Will change it to a
log message.


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144239543
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

There's another option, which is "log at warn and continue". If someone has
changed the committer, they get the consequences. That could also permit
someone with a modified committer to generate schema summaries if they
chose/permitted it.

It'd simplify this patch, though the tests would need tweaking... I'd change
the SQLConf text for the committer key to say "if the committer isn't a
ParquetOutputCommitter then don't expect summaries". A sketch of that
downgrade path is below.
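A sketch of that downgrade, reusing the names from the diff above (whether a
logWarning is in scope at that point in ParquetFileFormat is an assumption
here):

```scala
// Instead of require()/throw: keep the user's committer, drop the summaries.
if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
    && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
  logWarning(s"Committer $committerClass is not a ParquetOutputCommitter; " +
    "Parquet job summaries will not be generated")
  conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
}
```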


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144238941
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, 
ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = 
classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with 
unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+super.beforeAll()
+spark = SparkSession.builder()
+  .master("local-cluster[2,1,1024]")
+  .appName("testing")
+  .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+if (spark != null) {
+  spark.stop()
+  spark = null
+}
+super.afterAll()
--- End diff --

good point


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144065810
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+  && 
!classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+  // output summary is requested, but the class is not a Parquet 
Committer
+  throw new RuntimeException(s"Committer $committerClass is not a 
ParquetOutputCommitter" +
+s" and cannot create job summaries.")
--- End diff --

Aah, in the move to require() everything is going back onto a single line, so
it's now moot.


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144065074
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+  && 
!classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+  // output summary is requested, but the class is not a Parquet 
Committer
+  throw new RuntimeException(s"Committer $committerClass is not a 
ParquetOutputCommitter" +
+s" and cannot create job summaries.")
--- End diff --

aah



---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144065041
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, 
ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = 
classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with 
unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+super.beforeAll()
+spark = SparkSession.builder()
+  .master("local-cluster[2,1,1024]")
+  .appName("testing")
+  .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+spark.stop()
+spark = null
--- End diff --

Done, plus I will add a check for spark == null so that if a failure happens
during setup, the exception doesn't get lost in teardown.


---




[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-11 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
@viirya: the new data writer API will allow for a broader set of stats to be
propagated back from workers. When you are working with object stores, useful
stats to get back are the throttle count & retry count, as they can be the
cause of why things are slow... and if it is due to throttling, throwing more
workers at the job will actually slow things down. They'd be the ones to look
at first.
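Purely as an illustration of the idea (these names are hypothetical, not the
DataSourceV2 stats API):

```scala
// Per-task write statistics that also carry store-health signals.
case class ObjectStoreWriteStats(
    numFiles: Int,
    numBytes: Long,
    throttleEvents: Long,  // e.g. S3 503/SlowDown responses seen while writing
    retryCount: Long)      // operations that had to be retried

// Aggregated on the driver, a high throttle count says "slow because throttled",
// in which case adding more workers only makes it worse.
def totalThrottleEvents(stats: Seq[ObjectStoreWriteStats]): Long =
  stats.map(_.throttleEvents).sum
```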


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r143992362
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, 
ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = 
classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with 
unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+super.beforeAll()
+spark = SparkSession.builder()
+  .master("local-cluster[2,1,1024]")
+  .appName("testing")
+  .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+spark.stop()
+spark = null
+  }
+
+  test("alternative output committer, merge schema") {
+intercept[RuntimeException] {
+  val stat = writeDataFrame(MarkingFileOutput.COMMITTER, true, true)
+  logError(s"Created marker file $stat")
+}
+  }
+
+  test("alternative output committer, no merge schema") {
+writeDataFrame(MarkingFileOutput.COMMITTER, false, true)
--- End diff --

OK


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r143992319
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+  && 
!classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+  // output summary is requested, but the class is not a Parquet 
Committer
+  throw new RuntimeException(s"Committer $committerClass is not a 
ParquetOutputCommitter" +
+s" and cannot create job summaries.")
--- End diff --

Depends on the policy for "what to do if it's not a Parquet committer 
*and* the option for job summaries is set". It could just mean "you don't get 
summaries", which works for me :). May want to log at info though?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r143992018
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+  && 
!classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+  // output summary is requested, but the class is not a Parquet 
Committer
+  throw new RuntimeException(s"Committer $committerClass is not a 
ParquetOutputCommitter" +
--- End diff --

will do


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-10 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
Has anyone had a look at this recently? 

The problem still exists, and while downstream filesystems can address it if 
they recognise the use case & lie about the values, they would still be returning 
invalid values to the caller: Spark would be reporting the wrong values. At least 
with this PR, Spark gets to decide for itself how to react.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-09 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19269
  
+1 for the ability to return statistics: the remote stores have lots of 
information which committers may return


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-09 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r143530841
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, 
ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", 
"long");
+
+  class Reader implements DataSourceV2Reader {
+private String path;
+
+Reader(String path) {
+  this.path = path;
+}
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List> createReadTasks() {
+  return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+}
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask, 
DataReader {
+private String path;
+private volatile Iterator lines;
+private volatile String currentLine;
+
+JavaSimpleCSVReadTask(Iterator lines) {
+  this.lines = lines;
+}
+
+JavaSimpleCSVReadTask(String path) {
+  this.path = path;
+}
+
+@Override
+public DataReader createReader() {
+  assert path != null;
+  try {
+if (Files.exists(Paths.get(path))) {
+  return new 
JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+} else {
+  return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+}
+  } catch (IOException e) {
+throw new RuntimeException(e);
--- End diff --

I see. I'd just declare `testXYZ throws Throwable` and not worry about what 
gets raised internally: saves on try/catch translation logic


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-06 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19448
  
+ @rdblue 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-06 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/19448

[SPARK-22217] [SQL] ParquetFileFormat to support arbitrary OutputCommitters

## What changes were proposed in this pull request?

`ParquetFileFormat` to relax its requirement on the output committer class from 
`org.apache.parquet.hadoop.ParquetOutputCommitter` (or a subclass thereof, and 
implicitly Hadoop `FileOutputCommitter`) to any committer implementing 
`org.apache.hadoop.mapreduce.OutputCommitter`.

This enables output committers which don't write to the filesystem the way 
`FileOutputCommitter` does to save Parquet data from a DataFrame: at present 
you cannot do this.

Because a committer which isn't a subclass of `ParquetOutputCommitter` cannot 
create summary files, the format checks whether the context has requested summary 
metadata via `parquet.enable.summary-metadata`. If that is set and the committer 
class isn't a Parquet committer, it raises a RuntimeException with an error message.

(It could downgrade, of course, but raising an exception makes it clear 
there won't be a summary. It also makes the behaviour testable.)
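
For illustration (my example, not part of the patch), pointing a Parquet write at a non-Parquet committer would look roughly like this, assuming the usual `spark.sql.parquet.output.committer.class` option and the Parquet `parquet.enable.summary-metadata` key; the committer class name here is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("custom-parquet-committer")
  .getOrCreate()

// Use an arbitrary OutputCommitter implementation for Parquet writes...
spark.conf.set("spark.sql.parquet.output.committer.class",
  "com.example.CloudOutputCommitter")
// ...and keep summaries off, since a non-Parquet committer can't create them.
spark.sparkContext.hadoopConfiguration
  .setBoolean("parquet.enable.summary-metadata", false)

spark.range(10).write.parquet("/tmp/parquet-committer-demo")
```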

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new 
committer, `MarkingFileOutputCommitter` which extends `FileOutputCommitter` and 
writes a marker file in the destination directory. The presence of the marker 
file can be used to verify the new committer was used. The tests then try the 
combinations of Parquet committer summary/no-summary and marking committer 
summary/no-summary. 

| committer | summary | outcome |
|---|-|-|
| parquet   | true| success |
| parquet   | false   | success |
| marking   | false   | success with marker |
| marking   | true| exception |

All tests are happy.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark 
cloud/SPARK-22217-committer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19448.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19448


commit e6fdbdcf4118283abd22f7b14586ed742d225657
Author: Steve Loughran 
Date:   2017-07-12T10:42:51Z

SPARK-22217 tuning ParquetOutputCommitter to support any committer class, 
provided saveSummaries is disabled. With Tests

Change-Id: I19872dc1c095068ed5a61985d53cb7258bd9a9bb




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-10-03 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19294
  
@szhem that null path support in `FileOutputCommitter` came with the App 
Master recovery work of 
[MAPREDUCE-3711](https://issues.apache.org/jira/browse/MAPREDUCE-3711); it's 
trying to minimise the amount of HDFS IO done during the recovery process.

I don't think that's a codepath Spark goes near; in the normal execution 
paths, `FileOutputFormat` & `FileOutputCommitter` will need output paths.

(Disclaimer: the more I read of that code, the less I understand it. Do not 
treat my opinions as normative in any way.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19368: [SPARK-22146] FileNotFoundException while reading ORC fi...

2017-10-02 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19368
  
Looking @ this, things would be a lot less brittle if there wasn't a round 
trip Path -> String -> Path. I'm thinking of Windows paths here in particular. 
Other than tests, which pass in a string (possibly from `File.getAbsolutePath`), 
everything appears to be down-converting the path and then up again.

Is there any fundamental reason why Path isn't being passed around?
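
A sketch of what I mean (simplified; the real call sites are the ORC listing/reading paths):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Current pattern: the Path is flattened to a String and re-parsed later,
// so every special character has to survive two rounds of parsing.
def collectFileNames(statuses: Seq[FileStatus]): Seq[String] =
  statuses.map(_.getPath.toString)

def reopen(name: String): Path = new Path(name) // second parse, second chance to mangle

// Less brittle: keep the Path (or the whole FileStatus) end to end, so the
// only parsing is the one the filesystem listing already did.
def collectPaths(statuses: Seq[FileStatus]): Seq[Path] =
  statuses.map(_.getPath)
```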


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19269
  
One other thing that would be good now and invaluable in future is for the 
`DataWriter.commit()` call to return a `Map[String, Long]` of statistics 
alongside the message sent to the committer. The spec should say these 
statistics "MUST be specific to the writer/its thread", so that aggregating the 
stats across all tasks produces valid output.

What does this do? It lets the writers provide real statistics about the cost 
of operations. If you look at the changes to `FileFormatWriter` to collect 
stats, it's just listing the written file after `close()` and returning file size 
as its sole metric. We are moving towards instrumenting more of the Hadoop output 
streams with statistics collection, which, once there's an API for getting at the 
values, would allow the driver to aggregate stats from the writer for the 
writes and the commits. Examples: bytes written, files written, records 
written, # of 503/throttle events sent back from S3, # of network failures and 
retried operations, ...etc. Once the writers start collecting this data, 
there's motivation for the layers underneath to collect more and publish what 
they get. As an example, here's [the data collected by 
`S3AOutputStream`](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java#L765), 
exposed via `OutputStream.toString()` as well as fed to Hadoop 
metrics. As well as bytes written, it tracks blocks PUT, retries on completion 
operations, and how many times a block PUT failed and had to be retried. That 
means that a job can have results like "it took X seconds and wrote Y bytes but 
it had to repost Z bytes of data, which made things slow".
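
To make that concrete, here is a hypothetical shape for it (the `statistics` field is the addition I'm suggesting; none of this is in the PR):

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical commit message carrying per-writer statistics. Because each
// map is specific to one writer/thread, the driver can simply sum them.
case class StatsCommitMessage(
    taskId: Long,
    statistics: Map[String, Long]) extends WriterCommitMessage

// Driver-side aggregation: merge the maps by adding values key by key.
def aggregate(messages: Seq[StatsCommitMessage]): Map[String, Long] =
  messages.map(_.statistics).foldLeft(Map.empty[String, Long]) { (acc, m) =>
    m.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0L) + v) }
  }
```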

There's a side issue: what is the proposed re-entrancy policy in 
the writers?

Is the expectation that `DataWriter.write()` will always be called by a single 
thread, and therefore there is no need to implement thread-safe writes to the output 
stream, or is there a risk that more than one thread may write sequentially (preventing 
thread-local storage for collecting stats) or even simultaneously? (If so, the 
example is in trouble, as the java.io APIs say there is no need to support re-entrancy, 
even if HDFS does.) Again, this is the kind of thing where some specification 
can highlight the policy; otherwise people will code against the 
implementation, which is precisely why HDFS DFSOutputStreams are stuck doing 
thread-safe writes (see HBase).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142005126
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: 
LogicalPlan)
+  extends RunnableCommand {
+  override def children: Seq[LogicalPlan] = query :: Nil
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): 
Seq[Row] = {
+assert(children.length == 1)
+
+val writeTask = writer match {
+  case w: SupportsWriteInternalRow => 
w.createInternalRowWriterFactory()
+  case _ => new 
RowToInternalRowDataWriteFactory(writer.createWriterFactory(), query.schema)
+}
+
+val rdd = children.head.execute()
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+logInfo(s"Start processing data source writer: $writer")
+
+try {
+  sparkSession.sparkContext.runJob(
+rdd,
+(context: TaskContext, iter: Iterator[InternalRow]) =>
+  DataWritingSparkTask.run(writeTask, context, iter),
+rdd.partitions.indices,
+(index, message: WriterCommitMessage) => messages(index) = message
+  )
+
+  writer.commit(messages)
+  logInfo(s"Data source writer $writer committed.")
+} catch {
+  case cause: Throwable =>
+writer.abort()
+logError(s"Data source writer $writer aborted.")
+throw new SparkException("Writing job aborted.", cause)
+
+}
+
+Nil
+  }
+}
+
+object DataWritingSparkTask extends Logging {
+  def run(
+  writeTask: DataWriteFactory[InternalRow],
+  context: TaskContext,
+  iter: Iterator[InternalRow]): WriterCommitMessage = {
+val dataWriter =
+  writeTask.createWriter(context.stageId(), context.partitionId(), 
context.attemptNumber())
+
+// write the data and commit this writer.
+Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+  iter.foreach(dataWriter.write)
+  dataWriter.commit()
--- End diff --

good to log something here, at least @ debug
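
e.g. something like this (names taken from the quoted diff; the message wording is mine):

```scala
      logDebug(s"Writer for partition ${context.partitionId()} is committing.")
      dataWriter.commit()
      logDebug(s"Writer for partition ${context.partitionId()} committed.")
```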


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142005072
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: 
LogicalPlan)
+  extends RunnableCommand {
+  override def children: Seq[LogicalPlan] = query :: Nil
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): 
Seq[Row] = {
+assert(children.length == 1)
+
+val writeTask = writer match {
+  case w: SupportsWriteInternalRow => 
w.createInternalRowWriterFactory()
+  case _ => new 
RowToInternalRowDataWriteFactory(writer.createWriterFactory(), query.schema)
+}
+
+val rdd = children.head.execute()
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+logInfo(s"Start processing data source writer: $writer")
--- End diff --

maybe add the # of partitions to the log; it helps provide a hint of how long it's 
going to take. If a job hangs, this'll be the last entry in the log, so it's 
good to be informative
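
e.g. (a sketch; `writer` and `rdd` are the values in the quoted diff):

```scala
    logInfo(s"Start processing data source writer: $writer. " +
      s"The input RDD has ${rdd.partitions.length} partition(s).")
```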


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142004971
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, 
ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", 
"long");
+
+  class Reader implements DataSourceV2Reader {
+private String path;
+
+Reader(String path) {
+  this.path = path;
+}
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List> createReadTasks() {
+  return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+}
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask, 
DataReader {
+private String path;
+private volatile Iterator lines;
+private volatile String currentLine;
+
+JavaSimpleCSVReadTask(Iterator lines) {
+  this.lines = lines;
+}
+
+JavaSimpleCSVReadTask(String path) {
+  this.path = path;
+}
+
+@Override
+public DataReader createReader() {
+  assert path != null;
+  try {
+if (Files.exists(Paths.get(path))) {
+  return new 
JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+} else {
+  return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+}
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}
+
+@Override
+public boolean next() {
+  if (lines.hasNext()) {
+currentLine = lines.next();
+return true;
+  } else {
+return false;
+  }
+}
+
+@Override
+public Row get() {
+  String[] values = currentLine.split(",");
+  assert values.length == 2;
+  long l1 = Long.valueOf(values[0]);
+  long l2 = Long.valueOf(values[1]);
+  return new GenericRow(new Object[] {l1, l2});
+}
+
+@Override
+public void close() throws IOException {
+
+}
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+return new Reader(options.get("path").get());
+  }
+
+
+  class Writer implements DataSourceV2Writer {
+private String path;
+
+Writer(String path) {
+  this.path = path;
+}
+
+@Override
+public DataWriteFacto

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142004889
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, 
ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", 
"long");
+
+  class Reader implements DataSourceV2Reader {
+private String path;
+
+Reader(String path) {
+  this.path = path;
+}
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List> createReadTasks() {
+  return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+}
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask, 
DataReader {
+private String path;
+private volatile Iterator lines;
+private volatile String currentLine;
+
+JavaSimpleCSVReadTask(Iterator lines) {
+  this.lines = lines;
+}
+
+JavaSimpleCSVReadTask(String path) {
+  this.path = path;
+}
+
+@Override
+public DataReader createReader() {
+  assert path != null;
+  try {
+if (Files.exists(Paths.get(path))) {
+  return new 
JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+} else {
+  return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+}
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}
+
+@Override
+public boolean next() {
+  if (lines.hasNext()) {
+currentLine = lines.next();
+return true;
+  } else {
+return false;
+  }
+}
+
+@Override
+public Row get() {
+  String[] values = currentLine.split(",");
+  assert values.length == 2;
+  long l1 = Long.valueOf(values[0]);
+  long l2 = Long.valueOf(values[1]);
+  return new GenericRow(new Object[] {l1, l2});
+}
+
+@Override
+public void close() throws IOException {
+
+}
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+return new Reader(options.get("path").get());
+  }
+
+
+  class Writer implements DataSourceV2Writer {
+private String path;
+
+Writer(String path) {
+  this.path = path;
+}
+
+@Override
+public DataWriteFacto

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142004831
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, 
ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", 
"long");
+
+  class Reader implements DataSourceV2Reader {
+private String path;
+
+Reader(String path) {
+  this.path = path;
+}
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List> createReadTasks() {
+  return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+}
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask, 
DataReader {
+private String path;
+private volatile Iterator lines;
+private volatile String currentLine;
+
+JavaSimpleCSVReadTask(Iterator lines) {
+  this.lines = lines;
+}
+
+JavaSimpleCSVReadTask(String path) {
+  this.path = path;
+}
+
+@Override
+public DataReader createReader() {
+  assert path != null;
+  try {
+if (Files.exists(Paths.get(path))) {
+  return new 
JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+} else {
+  return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+}
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+}
+
+@Override
+public boolean next() {
+  if (lines.hasNext()) {
+currentLine = lines.next();
+return true;
+  } else {
+return false;
+  }
+}
+
+@Override
+public Row get() {
+  String[] values = currentLine.split(",");
+  assert values.length == 2;
+  long l1 = Long.valueOf(values[0]);
+  long l2 = Long.valueOf(values[1]);
+  return new GenericRow(new Object[] {l1, l2});
+}
+
+@Override
+public void close() throws IOException {
+
+}
+  }
+
+  @Override
+  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+return new Reader(options.get("path").get());
+  }
+
+
+  class Writer implements DataSourceV2Writer {
+private String path;
+
+Writer(String path) {
+  this.path = path;
+}
+
+@Override
+public DataWriteFacto

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142004814
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, 
ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", 
"long");
+
+  class Reader implements DataSourceV2Reader {
+private String path;
+
+Reader(String path) {
+  this.path = path;
+}
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List> createReadTasks() {
+  return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+}
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask, 
DataReader {
+private String path;
+private volatile Iterator lines;
+private volatile String currentLine;
+
+JavaSimpleCSVReadTask(Iterator lines) {
+  this.lines = lines;
+}
+
+JavaSimpleCSVReadTask(String path) {
+  this.path = path;
+}
+
+@Override
+public DataReader createReader() {
+  assert path != null;
+  try {
+if (Files.exists(Paths.get(path))) {
+  return new 
JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+} else {
+  return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+}
+  } catch (IOException e) {
+throw new RuntimeException(e);
--- End diff --

Why the translation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142004778
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: 
LogicalPlan)
+  extends RunnableCommand {
+  override def children: Seq[LogicalPlan] = query :: Nil
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): 
Seq[Row] = {
+assert(children.length == 1)
+
+val writeTask = writer match {
+  case w: SupportsWriteInternalRow => 
w.createInternalRowWriterFactory()
+  case _ => new 
RowToInternalRowDataWriteFactory(writer.createWriterFactory(), query.schema)
+}
+
+val rdd = children.head.execute()
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+logInfo(s"Start processing data source writer: $writer")
+
+try {
+  sparkSession.sparkContext.runJob(
+rdd,
+(context: TaskContext, iter: Iterator[InternalRow]) =>
+  DataWritingSparkTask.run(writeTask, context, iter),
+rdd.partitions.indices,
+(index, message: WriterCommitMessage) => messages(index) = message
+  )
+
+  writer.commit(messages)
+  logInfo(s"Data source writer $writer committed.")
+} catch {
+  case cause: Throwable =>
+writer.abort()
--- End diff --

this may raise an exception too...better to use 
`Utils.tryWithSafeFinallyAndFailureCallbacks()`
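
The shape I have in mind, so that a failure inside `abort()` doesn't mask the original cause (a sketch against the quoted diff; whether it goes through `Utils.tryWithSafeFinallyAndFailureCallbacks()` or a plain nested try/catch is a detail):

```scala
    try {
      writer.commit(messages)
      logInfo(s"Data source writer $writer committed.")
    } catch {
      case cause: Throwable =>
        logError(s"Data source writer $writer is aborting.", cause)
        try {
          writer.abort()
        } catch {
          case t: Throwable =>
            // don't let a failed abort hide the original failure
            cause.addSuppressed(t)
            logError(s"Data source writer $writer failed to abort.", t)
        }
        throw new SparkException("Writing job aborted.", cause)
    }
```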


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19269
  

People may know that I'm busy with some S3 committers which work with 
Hadoop MapReduce & Spark, with an import of Ryan's committer into the Hadoop 
codebase. This includes changes to s3a to support that, an alternative design 
which relies on a consistent S3, a Spark committer (not in the Spark codebase 
itself) to handle it there, plus the tests & documentation of how commits 
actually work. For that, the conclusion I've reached is: nobody really 
knows what's going on, and it's a miracle things work at all.

FWIW, I think I now have what is the [closest thing to 
documentation](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md)
 of what goes on at the Hadoop FS Committer layer, based on code, tests, 
stepping through operations in an IDE, and other archaeology.

Based on that experience, I think a key deliverable ought to be some 
specification of what a committer does. I know there are bits in the javadocs 
like "Implementations should handle this case correctly", but without a 
definition of "correct" it's hard to point at an implementation and say "you've 
got it wrong".

I would propose supplementing the code with 

1. A rigorous specification, including possible workflows. Scala & RFC 2119 
keywords, maybe.
1. Tests against that, including attempts to simulate the failure modes, 
all the orders of execution which aren't expected to be valid, etc.

Some general questions related to this:

* If a writer doesn't support speculation, can it say so? I know 
speculation and failure recovery are related, but task retry after failure is 
linearized, whereas speculation can happen in parallel. 
* Is it possible to instantiate a second writer and say "abort the output 
of the other writer" (on a different host?). This allows for cleanup of a 
task's work after the failure of the entire executor. If it's not possible, 
then the job commit must be required to clean up. Maybe: pass to the job commit 
information about failing tasks, so it has more of an idea what to do. (Hadoop 
MapReduce example:  AM calls abortTask() for all failing containers before 
instantiating a new one and retrying)
* MAY the intermediate output of a task be observable to others?
* MAY the committed output of a task be observable to others? If so, what does 
this mean for readers? Is it something which a writer may wish to declare/warn 
callers about?
* What if `DataWriter.commit()` just doesn't return/the executor fails 
during that commit process? Is that a failure of the entire job vs task? (FWIW 
MR algorithm 1 handles this, algorithm 2 doesn't).
* What if `writer.abort()` raises an exception? (Not unusual if the cause of the 
commit failure is a network/auth problem.)
* What if `writer.abort()` is called before any other operation on that 
writer? Better be a no-op.
* What if `DataSourceV2Writer.commit()` fails? Can it be retried? (Easiest 
to say no, obviously). 
* If a `DataSourceV2Writer.commit()` fails, can 
`DataSourceV2Writer.abort()` then be called?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r142004644
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
+   * also means all the data are written successfully and all data writers 
are committed.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted.
+   */
+  void abort();
--- End diff --

Knowing what's being committed can provide a bit more information as to 
what is going on, and could be appreciated for that. The biggest issue I fear 
is loss of the driver itself, so *no* abort() call is made. A strategy for 
cleaning up from that would be good, even though it's primarily one of bringing 
up a new writer and saying "clean up this mess a previous instance left". 
Looking at the S3 example, my strategy there is/would be: identify all pending 
commits, abort them, then clean up the dest dir.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2017-09-28 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/17745


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-24 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140658582
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -130,17 +135,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, 
path: String)
 val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, 
String]])
   .foldLeft(Map[String, String]())(_ ++ _)
 logDebug(s"Committing files staged for absolute locations 
$filesToMove")
-val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-for ((src, dst) <- filesToMove) {
-  fs.rename(new Path(src), new Path(dst))
+if (hasAbsPathFiles) {
+  val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+  for ((src, dst) <- filesToMove) {
+fs.rename(new Path(src), new Path(dst))
+  }
+  fs.delete(absPathStagingDir, true)
 }
-fs.delete(absPathStagingDir, true)
--- End diff --

Can do; now that you've got a little mock committer in, someone can just extend 
it to optionally throw an IOE in abort().
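
Something along these lines, assuming it extends the suite's old-API `FakeOutputCommitter` (the flag name is mine):

```scala
import java.io.IOException

import org.apache.hadoop.mapred.JobContext

// A committer whose abort path can be made to fail, for testing that the
// commit protocol still cleans up even when abortJob() throws.
class FailingAbortOutputCommitter extends FakeOutputCommitter {
  override def abortJob(context: JobContext, runState: Int): Unit = {
    if (FailingAbortOutputCommitter.failAbort) {
      throw new IOException("simulated abort failure")
    }
    super.abortJob(context, runState)
  }
}

object FailingAbortOutputCommitter {
  @volatile var failAbort: Boolean = false
}
```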


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17743: [SPARK-20448][DOCS] Document how FileInputDStream works ...

2017-09-22 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17743
  
People don't realise how much object stores aren't file systems until they 
discover all their assumptions are broken.

Once you know how they work, you can set up a workflow which is more 
efficient and reliable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-09-22 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/19294
  
As I play with commit logic all the way through the stack, I can't help 
thinking everyone's lives would be better if we tagged the MRv1 commit APIs as 
deprecated in Hadoop 3, and uses of the commit protocols went fully onto the v2 
committers: one codepath to get confused by, half as much complexity. 

The issue with the custom stuff is inevitably Hive related, isn't it? It's 
always liked to scatter data around a filesystem and pretend it's a single 
dataset.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140188088
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -130,17 +135,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, 
path: String)
 val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, 
String]])
   .foldLeft(Map[String, String]())(_ ++ _)
 logDebug(s"Committing files staged for absolute locations 
$filesToMove")
-val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-for ((src, dst) <- filesToMove) {
-  fs.rename(new Path(src), new Path(dst))
+if (hasAbsPathFiles) {
+  val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+  for ((src, dst) <- filesToMove) {
+fs.rename(new Path(src), new Path(dst))
+  }
+  fs.delete(absPathStagingDir, true)
 }
-fs.delete(absPathStagingDir, true)
--- End diff --

Given the changes being made here, it seems a good place to add the 
suggestion of [SPARK-20045](https://issues.apache.org/jira/browse/SPARK-20045) 
and make that abort() call resilient to failures, by doing that delete even if 
the Hadoop committer raised an IOE.
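
i.e. something like this in the protocol's `abortJob()` (a sketch of the SPARK-20045 idea, not committed code; `committer` and `absPathStagingDir` are the existing fields of `HadoopMapReduceCommitProtocol`):

```scala
  override def abortJob(jobContext: JobContext): Unit = {
    try {
      committer.abortJob(jobContext, JobStatus.State.FAILED)
    } catch {
      case e: IOException =>
        // best effort: log and fall through so the staging dir still gets cleaned up
        logWarning("Exception while aborting job", e)
    } finally {
      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
      fs.delete(absPathStagingDir, true)
    }
  }
```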


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17745: [SPARK-17159][Streaming] optimise check for new files in...

2017-09-21 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17745
  
Due to lack of support/interest, moved to 
https://github.com/hortonworks-spark/cloud-integration


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...

2017-09-21 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/17747


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140008216
  
--- Diff: 
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
 assert(FakeWriterWithCallback.exception.getMessage contains "failed to 
write")
   }
 
+  test("saveAsNewAPIHadoopDataset should use current working directory " +
+"for files to be committed to an absolute output location when empty 
output path specified") {
+val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+job.setOutputKeyClass(classOf[Integer])
+job.setOutputValueClass(classOf[Integer])
+job.setOutputFormatClass(classOf[NewFakeFormat])
+val jobConfiguration = job.getConfiguration
+
+val fs = FileSystem.get(jobConfiguration)
+fs.setWorkingDirectory(new 
Path(getClass.getResource(".").toExternalForm))
+try {
+  // just test that the job does not fail with
+  // java.lang.IllegalArgumentException: Can not create a Path from a 
null string
+  pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+} finally {
+  // close to prevent filesystem caching across different tests
+  fs.close()
+}
+  }
+
+  test("saveAsHadoopDataset should use current working directory " +
+"for files to be committed to an absolute output location when empty 
output path specified") {
+val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+val conf = new JobConf()
+conf.setOutputKeyClass(classOf[Integer])
+conf.setOutputValueClass(classOf[Integer])
+conf.setOutputFormat(classOf[FakeOutputFormat])
+conf.setOutputCommitter(classOf[FakeOutputCommitter])
+
+val fs = FileSystem.get(conf)
+fs.setWorkingDirectory(new 
Path(getClass.getResource(".").toExternalForm))
+try {
+  FakeOutputCommitter.ran = false
+  pairs.saveAsHadoopDataset(conf)
+} finally {
+  // close to prevent filesystem caching across different tests
+  fs.close()
--- End diff --

again, you don't need this.


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140008084
  
--- Diff: 
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
 assert(FakeWriterWithCallback.exception.getMessage contains "failed to 
write")
   }
 
+  test("saveAsNewAPIHadoopDataset should use current working directory " +
+"for files to be committed to an absolute output location when empty 
output path specified") {
+val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+job.setOutputKeyClass(classOf[Integer])
+job.setOutputValueClass(classOf[Integer])
+job.setOutputFormatClass(classOf[NewFakeFormat])
+val jobConfiguration = job.getConfiguration
+
+val fs = FileSystem.get(jobConfiguration)
+fs.setWorkingDirectory(new 
Path(getClass.getResource(".").toExternalForm))
+try {
+  // just test that the job does not fail with
+  // java.lang.IllegalArgumentException: Can not create a Path from a 
null string
+  pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+} finally {
+  // close to prevent filesystem caching across different tests
+  fs.close()
--- End diff --

Avoid this. Either use FileSystem.newInstance() or skip the close. Given you 
aren't playing with low-level FS options, it's faster and more efficient to reuse 
the cached instance.
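
For reference, a generic sketch of the `FileSystem.newInstance()` pattern, for 
cases where a test genuinely needs its own filesystem object that it can close 
without disturbing the shared cache (identifiers are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
// newInstance() hands back a private instance: closing it does not invalidate
// the object other code gets from FileSystem.get(conf).
val fs = FileSystem.newInstance(conf)
try {
  // ... work against fs ...
} finally {
  fs.close()
}
```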


---




[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to han...

2017-08-30 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18111
  
thx


---



[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to han...

2017-08-29 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18111
  
I believe this patch implements the original design goal: if a committer 
doesn't have a working path supplied by `getWorkingPath()` then it downgrades. 

It might be worthwhile doing a larger fault-injecting committer at some 
point. I have, indirectly, the ability to inject faults, but it's a long way from 
the Spark codebase, when really a thin shim is all that's needed. In 
particular, a fault-injecting ParquetOutputCommitter would test a big chunk 
of the codebase whose error-handling codepaths are more likely to be exercised in 
production than by Jenkins. Are people interested (separate JIRA, obviously)?
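
A rough sketch of the kind of thin shim I mean: a wrapper around any Hadoop 
`OutputCommitter` that randomly throws IOEs at commit time. Everything here (class 
name, probability knob) is illustrative, not existing Spark or Hadoop code; a real 
version would be wired up to the Parquet/FileOutputCommitter used by the job:

```scala
import java.io.IOException

import scala.util.Random

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

/**
 * Hypothetical thin shim: delegates to a real committer and injects
 * IOExceptions at commit time with a configurable probability.
 */
class FaultInjectingCommitter(
    delegate: OutputCommitter,
    failureProbability: Double = 0.1) extends OutputCommitter {

  private val rng = new Random()

  private def maybeFail(op: String): Unit = {
    if (rng.nextDouble() < failureProbability) {
      throw new IOException(s"Injected failure in $op")
    }
  }

  override def setupJob(jobContext: JobContext): Unit =
    delegate.setupJob(jobContext)

  override def setupTask(taskContext: TaskAttemptContext): Unit =
    delegate.setupTask(taskContext)

  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean =
    delegate.needsTaskCommit(taskContext)

  override def commitTask(taskContext: TaskAttemptContext): Unit = {
    maybeFail("commitTask") // exercises the task-commit error handling paths
    delegate.commitTask(taskContext)
  }

  override def abortTask(taskContext: TaskAttemptContext): Unit =
    delegate.abortTask(taskContext)

  override def commitJob(jobContext: JobContext): Unit = {
    maybeFail("commitJob") // exercises the job-commit error handling paths
    delegate.commitJob(jobContext)
  }
}
```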


---



[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-08-22 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
Related to this, updated spec on [Hadoop output stream, Syncable and 
StreamCapabilities](https://github.com/steveloughran/hadoop/blob/s3/HADOOP-13327-outputstream-trunk/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md).

As the doc notes, object stores != filesystems, and while a lot can be done 
to preserve the metaphor on input, it's on output where CRUD inconsistencies 
surface, along with questions like "does a 0-byte file get created in create()?", 
"when is data written?", etc.


---



[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...

2017-08-22 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18979#discussion_r134460176
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
 ---
@@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: 
Configuration)
   private def getFileSize(filePath: String): Long = {
 val path = new Path(filePath)
 val fs = path.getFileSystem(hadoopConf)
-fs.getFileStatus(path).getLen()
+try {
+  fs.getFileStatus(path).getLen()
+} catch {
+  case e: FileNotFoundException =>
+// may arise against eventually consistent object stores
+logInfo(s"File $path is not yet visible", e)
+0
--- End diff --

I could add a comment in the docs somewhere to state that metrics in the cloud 
may not be consistent.


---



[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs

2017-08-21 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17342
  
@Chopinxb no worries; the hard part is thinking how to fix this. I don't 
see it being possible to do reliably except through an explicit download. 
Hadoop 2.8+ has moved off commons-logging so this problem *may* have gone away. 
However, there are too many dependencies to be confident that will hold


---



[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-08-18 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
@adrian-ionescu wrote
> is there a need for calling getFinalStats() more than once?

No. As long as everyone is aware of it, it won't be an issue.


---



[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-08-18 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
> To mimic S3-like behavior, you can overwrite the file system 
spark.hadoop.fs.$scheme.impl"

@gatorsmile: you will be able to do something better soon, as S3A is adding 
an inconsistent AWS client into the `hadoop-aws` JAR, which you can then enable to 
guarantee consistency delays and inject intermittent faults into the system 
(throttling, transient network events). All it will take is a config option to 
switch to this client, plus the chaos-monkey-esque probabilities and delays. 
This is what I'm already using, and you will be able to as well. That is, no need 
to switch clients: just set 
`spark.hadoop.fs.s3a.s3.client.factory.impl=org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory`
 and wait for the stack traces.

The S3A FS itself [needs to do 
more](https://issues.apache.org/jira/browse/HADOOP-14531) to handle throttling 
& failures (retry, add failure metrics so throttling & error rates can be 
measured). Knowing throttling rates is important as it will help identify perf 
problems due to bad distribution of work across a bucket, excess use of KMS key 
lookups..., things that surface in support calls.

This patch restores Spark 2.3 to the behaviour it had in Spark 2.2: a brief 
delay between object creation and visibility does not cause the task to fail.

---



[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-08-18 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
Currently *nobody should be using s3a:// as the temp file destination*, 
which is the same as saying "nobody should be using s3a:// as the direct 
destination of work", not without a special committer (Netflix, IBM's stocator, 
...) or without something to give S3 list consistency. Because today, task 
commit relies on a list & rename of all files in the task attempt dir, and if 
you don't get list consistency, you can miss out on files. If you ever hear 
anyone complaining "it takes too long to commit to s3" then they are using it 
this way. Tell them to use a consistency layer or to stop it :)


---



[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...

2017-08-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18979#discussion_r133919035
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.nio.charset.Charset
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
+
+/**
+ * Test how BasicWriteTaskStatsTracker handles files.
+ */
+class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
+
+  private val tempDir = Utils.createTempDir()
+  private val tempDirPath = new Path(tempDir.toURI)
+  private val conf = new Configuration()
+  private val localfs = tempDirPath.getFileSystem(conf)
+  private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII"))
+  private val data2 = "012".getBytes(Charset.forName("US-ASCII"))
+  private val len1 = data1.length
+  private val len2 = data2.length
+
+  /**
+   * In teardown delete the temp dir.
+   */
+  protected override def afterAll(): Unit = {
+Utils.deleteRecursively(tempDir)
+  }
+
+  /**
+   * Assert that the stats match that expected.
+   * @param tracker tracker to check
+   * @param files number of files expected
+   * @param bytes total number of bytes expected
+   */
+  private def assertStats(
+  tracker: BasicWriteTaskStatsTracker,
+  files: Int,
+  bytes: Int): Unit = {
+val stats = finalStatus(tracker)
+assert(files === stats.numFiles, "Wrong number of files")
+assert(bytes === stats.numBytes, "Wrong byte count of file size")
+  }
+
+  private def finalStatus(tracker: BasicWriteTaskStatsTracker): 
BasicWriteTaskStats = {
+tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+  }
+
+  test("No files in run") {
+val tracker = new BasicWriteTaskStatsTracker(conf)
+assertStats(tracker, 0, 0)
+  }
+
+  test("Missing File") {
+val missing = new Path(tempDirPath, "missing")
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(missing.toString)
+assertStats(tracker, 1, 0)
+  }
+
+  test("Empty filename is forwarded") {
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile("")
+intercept[IllegalArgumentException] {
+  finalStatus(tracker)
+}
+  }
+
+  test("Null filename is only picked up in final status") {
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(null)
+intercept[IllegalArgumentException] {
+  finalStatus(tracker)
+}
+  }
+
+  test("0 byte file") {
+val file = new Path(tempDirPath, "file0")
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(file.toString)
+touch(file)
+assertStats(tracker, 1, 0)
--- End diff --

I'm assuming that the file will *eventually* come into existence; that its 
absence straight after collection is simply a transient create inconsistency of 
the endpoint, like a brief caching of negative HEAD/GET requests (which AWS S3 
does do as part of its DoS defences). The files will be there later.

One option: count the # of missing files and include that in the report. It 
shouldn't be a non-zero metric most of the time though: never on a "real" FS or 
consistent object store, rarely on an inconsistent one.
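
A minimal sketch of that option, assuming it sits inside `BasicWriteTaskStatsTracker` 
next to the `getFileSize` shown in the diff (so `hadoopConf` and `logInfo` come from 
the enclosing class); the `missingFiles` counter is illustrative, not part of this patch:

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.Path

// Inside BasicWriteTaskStatsTracker.
private var missingFiles: Int = 0

private def getFileSize(filePath: String): Long = {
  val path = new Path(filePath)
  val fs = path.getFileSystem(hadoopConf)
  try {
    fs.getFileStatus(path).getLen
  } catch {
    case e: FileNotFoundException =>
      // May arise against eventually consistent object stores; record it so the
      // final report can flag that the byte count is a lower bound.
      missingFiles += 1
      logInfo(s"File $path is not yet visible", e)
      0L
  }
}
```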


---

[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...

2017-08-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18979#discussion_r133918269
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
 ---
@@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: 
Configuration)
   private def getFileSize(filePath: String): Long = {
 val path = new Path(filePath)
 val fs = path.getFileSystem(hadoopConf)
-fs.getFileStatus(path).getLen()
+try {
+  fs.getFileStatus(path).getLen()
+} catch {
+  case e: FileNotFoundException =>
+// may arise against eventually consistent object stores
+logInfo(s"File $path is not yet visible", e)
+0
--- End diff --

The problem is: what can be picked up if the file isn't yet reported as 
present by the endpoint? Adding a bool to say "results are unreliable" could be 
used as a warning.

One thing to consider long term is: if Hadoop FS output streams added a 
simple String -> Long map of statistics, could they be picked up by committers 
& then aggregated in job reports? Hadoop filesystems have statistics (simple 
ones in Hadoop <= 2.7, an arbitrary map of String -> Long in 2.8 (with 
[standard key names across 
filesystems](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java)),
 and certainly today S3ABlockOutputStream [collects stats on individual 
streams](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java#L756).
 If that was made visible and collected, you could get a lot more detail on 
what is going on. Thoughts?
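
Purely as a hedged sketch of what "made visible and collected" might look like on 
Hadoop 2.8+, using the `StorageStatistics` API linked above (the helper name is 
made up):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs.FileSystem

// Snapshot a filesystem's storage statistics into a plain String -> Long map,
// which a committer could attach to its task/job report.
def snapshotStats(fs: FileSystem): Map[String, Long] = {
  val out = mutable.Map[String, Long]()
  fs.getStorageStatistics.getLongStatistics.asScala.foreach { s =>
    out(s.getName) = s.getValue
  }
  out.toMap
}
```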


---



[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...

2017-08-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18979#discussion_r133913173
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.nio.charset.Charset
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
+
+/**
+ * Test how BasicWriteTaskStatsTracker handles files.
+ */
+class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
+
+  private val tempDir = Utils.createTempDir()
+  private val tempDirPath = new Path(tempDir.toURI)
+  private val conf = new Configuration()
+  private val localfs = tempDirPath.getFileSystem(conf)
+  private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII"))
+  private val data2 = "012".getBytes(Charset.forName("US-ASCII"))
+  private val len1 = data1.length
+  private val len2 = data2.length
+
+  /**
+   * In teardown delete the temp dir.
+   */
+  protected override def afterAll(): Unit = {
+Utils.deleteRecursively(tempDir)
+  }
+
+  /**
+   * Assert that the stats match that expected.
+   * @param tracker tracker to check
+   * @param files number of files expected
+   * @param bytes total number of bytes expected
+   */
+  private def assertStats(
+  tracker: BasicWriteTaskStatsTracker,
+  files: Int,
+  bytes: Int): Unit = {
+val stats = finalStatus(tracker)
+assert(files === stats.numFiles, "Wrong number of files")
+assert(bytes === stats.numBytes, "Wrong byte count of file size")
+  }
+
+  private def finalStatus(tracker: BasicWriteTaskStatsTracker): 
BasicWriteTaskStats = {
+tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+  }
+
+  test("No files in run") {
+val tracker = new BasicWriteTaskStatsTracker(conf)
+assertStats(tracker, 0, 0)
+  }
+
+  test("Missing File") {
+val missing = new Path(tempDirPath, "missing")
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(missing.toString)
+assertStats(tracker, 1, 0)
+  }
+
+  test("Empty filename is forwarded") {
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile("")
+intercept[IllegalArgumentException] {
+  finalStatus(tracker)
+}
+  }
+
+  test("Null filename is only picked up in final status") {
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(null)
+intercept[IllegalArgumentException] {
+  finalStatus(tracker)
+}
+  }
+
+  test("0 byte file") {
+val file = new Path(tempDirPath, "file0")
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(file.toString)
+touch(file)
+assertStats(tracker, 1, 0)
+  }
+
+  test("File with data") {
+val file = new Path(tempDirPath, "file-with-data")
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(file.toString)
+write1(file)
+assertStats(tracker, 1, len1)
+  }
+
+  test("Open file") {
+val file = new Path(tempDirPath, "file-open")
+val tracker = new BasicWriteTaskStatsTracker(conf)
+tracker.newFile(file.toString)
+val stream = localfs.create(file, true)
+try {
+  assertStats(tracker, 1, 0)
+  stream.write(data1)
+  stream.flush()
+  assert(1 === finalStatus(tracker).numFiles, "Wrong number of file

[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...

2017-08-17 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/18979

[SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics 
collection fails if a new file isn't yet visible

## What changes were proposed in this pull request?

Change `BasicWriteTaskStatsTracker.getFileSize()` to catch 
`FileNotFoundException`, log at info and then return 0 as the file size.

This ensures that if a newly created file isn't visible because the store 
doesn't always have create consistency, the metric collection doesn't cause the 
task to fail.

## How was this patch tested?

New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only 
checks the resilience to missing files, but also verifies the existing logic as to 
how file statistics are gathered.

Note that in the current implementation:

1. If you call `Tracker.getFinalStats()` more than once, the file size 
count will increase by the size of the last file. This could be fixed by clearing 
the filename field inside `getFinalStats()` itself.

2. If you pass an empty or null string to `Tracker.newFile(path)` then an 
IllegalArgumentException is raised, but only in `getFinalStats()`, rather than 
in `newFile`. There's a test for this behaviour in the new suite, as it 
verifies that only FNFEs get swallowed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark 
cloud/SPARK-21762-missing-files-in-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18979.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18979


commit 8ad28b9bcd6a56b963ab57a5b4937d10f492de33
Author: Steve Loughran 
Date:   2017-08-17T19:35:35Z

SPARK-21762 handle FNFE events in BasicWriteStatsTracker; add a suite of 
tests for various file states.

Change-Id: I3269cb901a38b33e399ebef10b2dbcd51ccf9b75

commit 2a113fde1653743a3543df8ada395f320b826a3e
Author: Steve Loughran 
Date:   2017-08-17T20:01:50Z

SPARK-21762 add tests for "" and null filenames

Change-Id: I38ac11c808849e2fd91f4931f4cb5cdfad43e2af




---



[GitHub] spark pull request #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol...

2017-08-17 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18111#discussion_r133751724
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -73,7 +73,10 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
 
 val stagingDir: String = committer match {
   // For FileOutputCommitter it has its own staging path called "work 
path".
-  case f: FileOutputCommitter => 
Option(f.getWorkPath.toString).getOrElse(path)
+  case f: FileOutputCommitter =>
+val workPath = f.getWorkPath
+require(workPath != null, s"Committer has no workpath $f")
+Option(workPath.toString).getOrElse(path)
--- End diff --

{{workPath.toString()}} triggers an NPE, which was the reason for the stack 
trace.

Now, what's this code trying to do? Find a directory to put stuff in. If the 
committer is a subclass of `FileOutputCommitter`, then its getWorkPath method can 
be called, which *should* return a non-null path. 

If the requirement of the code is "get the workpath, or return `path` if it's 
null", maybe the code should really be

```scala
Option(f.getWorkPath).getOrElse(path).toString
```
That'd return the workPath if non-null, falling back to the `path` 
variable, and then call `toString` on the returned object. I'm not sure off the 
top of my head if Scala type inference likes that though. It may be less 
elegant and more reliable to have

```scala
val workPath = f.getWorkPath
if (workPath != null) workPath.toString else path
```
plus maybe a bonus paranoid check for workPath being `""`.

I think that would actually get closer to the original goal of the code: 
always return a path, even if the committer doesn't have one.




---



[GitHub] spark issue #17743: [SPARK-20448][DOCS] Document how FileInputDStream works ...

2017-08-17 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17743
  
Just reread this; still looks correct. Review comments welcome


---



[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs

2017-08-10 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17342
  
Created: [SPARK-21697](https://issues.apache.org/jira/browse/SPARK-21697) 
with the stack trace attached


---



[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs

2017-08-09 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17342
  
I'm going to recommend you file a SPARK bug on issues.apache.org and an 
HDFS issue linked to it, "NPE in BlockReaderFactory log init". It looks like the 
creation of the LOG for BlockReader is triggering introspection, which is 
triggering the BlockReaderFactory to do something before it's fully initialized, and 
then possibly NPE-ing as the LOG field is null.




---



[GitHub] spark issue #14601: [SPARK-13979][Core] Killed executor is re spawned withou...

2017-08-06 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/14601
  
I know this hasn't been updated, but it is still important. I can take it 
on if all it needs is a test case


---



[GitHub] spark issue #18628: [SPARK-18061][ThriftServer] Add spnego auth support for ...

2017-08-03 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18628
  
Thanks for making sure this is consistent with other uses of 
Configuration.get(); consistency is critical here


---



[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...

2017-08-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18668#discussion_r131095350
  
--- Diff: 
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 ---
@@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
   private val prompt = "spark-sql"
   private val continuedPrompt = "".padTo(prompt.length, ' ')
   private var transport: TSocket = _
+  private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
--- End diff --

good point. I see `spark.hive` in some of my configs


---



[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...

2017-08-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18668#discussion_r131094720
  
--- Diff: docs/configuration.md ---
@@ -2335,5 +2335,61 @@ The location of these configuration files varies 
across Hadoop versions, but
 a common location is inside of `/etc/hadoop/conf`. Some tools create
 configurations on-the-fly, but offer a mechanisms to download copies of 
them.
 
-To make these files visible to Spark, set `HADOOP_CONF_DIR` in 
`$SPARK_HOME/spark-env.sh`
+To make these files visible to Spark, set `HADOOP_CONF_DIR` in 
`$SPARK_HOME/conf/spark-env.sh`
 to a location containing the configuration files.
+
+# Custom Hadoop/Hive Configuration
+
+If your Spark applications interacting with Hadoop, Hive, or both, there 
are probably Hadoop/Hive
+configuration files in Spark's class path.
+
+Multiple running applications might require different Hadoop/Hive client 
side configurations.
+You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, 
`hive-site.xml` in
+Spark's class path for each application, but it is not very convenient and 
these
+files are best to be shared with common properties to avoid hard-coding 
certain configurations.
+
+The better choice is to use spark hadoop properties in the form of 
`spark.hadoop.*`. 
+They can be considered as same as normal spark properties which can be set 
in `$SPARK_HOME/conf/spark-defalut.conf`
+
+In some cases, you may want to avoid hard-coding certain configurations in 
a `SparkConf`. For
+instance, Spark allows you to simply create an empty conf and set 
spark/spark hadoop properties.
+
+{% highlight scala %}
+val conf = new SparkConf().set("spark.hadoop.abc.def","xyz")
+val sc = new SparkContext(conf)
+{% endhighlight %}
+
+Also, you can modify or add configurations at runtime:
+{% highlight bash %}
+./bin/spark-submit \ 
+  --name "My app" \ 
+  --master local[4] \  
+  --conf spark.eventLog.enabled=false \ 
+  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps" \ 
+  --conf spark.hadoop.abc.def=xyz \ 
+  myApp.jar
+{% endhighlight %}
+
+## Typical Hadoop/Hive Configurations
+
+
+
+  spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
+  1
+  
+The file output committer algorithm version, valid algorithm version 
number: 1 or 2.
+Version 2 may have better performance, but version 1 may handle 
failures better in certain situations,
+as per https://issues.apache.org/jira/browse/MAPREDUCE-4815";>MAPREDUCE-4815.
+  
+
+
+
+  spark.hadoop.fs.hdfs.impl.disable.cache
--- End diff --

This is a pretty dangerous one to point people at, especially since it's 
fixed in future Hadoop versions & backported to some distros, and the cost of 
creating a new HDFS client on every worker can get very expensive if you have a 
Spark process with many threads, all fielding work from the same user (thread 
pools, IPC connections, etc.).


---



[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...

2017-08-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18668#discussion_r131093892
  
--- Diff: docs/configuration.md ---
@@ -2335,5 +2335,61 @@ The location of these configuration files varies 
across Hadoop versions, but
 a common location is inside of `/etc/hadoop/conf`. Some tools create
 configurations on-the-fly, but offer a mechanisms to download copies of 
them.
 
-To make these files visible to Spark, set `HADOOP_CONF_DIR` in 
`$SPARK_HOME/spark-env.sh`
+To make these files visible to Spark, set `HADOOP_CONF_DIR` in 
`$SPARK_HOME/conf/spark-env.sh`
 to a location containing the configuration files.
+
+# Custom Hadoop/Hive Configuration
+
+If your Spark applications interacting with Hadoop, Hive, or both, there 
are probably Hadoop/Hive
+configuration files in Spark's class path.
+
+Multiple running applications might require different Hadoop/Hive client 
side configurations.
+You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, 
`hive-site.xml` in
+Spark's class path for each application, but it is not very convenient and 
these
+files are best to be shared with common properties to avoid hard-coding 
certain configurations.
--- End diff --

"best shared"

You can'd do that anyway on a production Spark on Yarn cluster as if you 
did., lots of other things would break. How about

```
In a Spark cluster running on YARN, these configuration files are set 
cluster-wide, and cannot safely be changed by the application.
```


---



[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...

2017-08-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18668#discussion_r131093320
  
--- Diff: docs/configuration.md ---
@@ -2335,5 +2335,61 @@ The location of these configuration files varies 
across Hadoop versions, but
 a common location is inside of `/etc/hadoop/conf`. Some tools create
 configurations on-the-fly, but offer a mechanisms to download copies of 
them.
 
-To make these files visible to Spark, set `HADOOP_CONF_DIR` in 
`$SPARK_HOME/spark-env.sh`
+To make these files visible to Spark, set `HADOOP_CONF_DIR` in 
`$SPARK_HOME/conf/spark-env.sh`
 to a location containing the configuration files.
+
+# Custom Hadoop/Hive Configuration
+
+If your Spark applications interacting with Hadoop, Hive, or both, there 
are probably Hadoop/Hive
--- End diff --

s/applications interacting/application is interacting/


---



[GitHub] spark pull request #18628: [SPARK-18061][ThriftServer] Add spnego auth suppo...

2017-08-01 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18628#discussion_r130598803
  
--- Diff: 
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
 ---
@@ -57,6 +59,19 @@ private[hive] class SparkSQLCLIService(hiveServer: 
HiveServer2, sqlContext: SQLC
 case e @ (_: IOException | _: LoginException) =>
   throw new ServiceException("Unable to login to kerberos with 
given principal/keytab", e)
   }
+
+  // Try creating spnego UGI if it is configured.
+  val principal = 
hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL)
+  val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB)
--- End diff --

`HiveConf.getVar()` doesn't trim leading/trailing whitespace from the results 
the way {{Configuration.getTrimmed()}} does. Check other uses of the confvar to see if 
there is any trimming of whitespace before and after their use: if so, you 
need to copy that.
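
A minimal sketch of doing that trimming at the call site, assuming the same 
`hiveConf` and `ConfVars` as in the diff above:

```scala
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

// Trim the raw values so stray whitespace in hive-site.xml can't break the check.
val principal = Option(hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL))
  .map(_.trim).getOrElse("")
val keyTabFile = Option(hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB))
  .map(_.trim).getOrElse("")
```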


---



[GitHub] spark pull request #18628: [SPARK-18061][ThriftServer] Add spnego auth suppo...

2017-08-01 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18628#discussion_r130598230
  
--- Diff: 
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
 ---
@@ -57,6 +59,19 @@ private[hive] class SparkSQLCLIService(hiveServer: 
HiveServer2, sqlContext: SQLC
 case e @ (_: IOException | _: LoginException) =>
   throw new ServiceException("Unable to login to kerberos with 
given principal/keytab", e)
   }
+
+  // Try creating spnego UGI if it is configured.
+  val principal = 
hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL)
+  val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB)
+  if (principal.nonEmpty && keyTabFile.nonEmpty) {
+try {
+  httpUGI = 
HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf)
+  setSuperField(this, "httpUGI", httpUGI)
+} catch {
+  case e: IOException =>
+throw new ServiceException("Unable to login to spnego with 
given principal/keytab", e)
+}
--- End diff --

I'd recommend including as much diagnostic information as possible in the 
exception string, including the principal & keytab and the value of e.toString 
in the message:
```scala
throw new ServiceException(
  s"Unable to login to SPNEGO with principal `$principal` and keytab `$keyTabFile`: $e", e)
```

The scenario to plan for is "nothing works and all you have to go on is 
that string from the raised exception". 



---



[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...

2017-07-12 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17747
  
I know, I just have too many open JIRAs to try and manage


---



[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...

2017-07-12 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17747
  
Pushing up a new patch, rebased to work with master.

It's getting boring all round for this patch: me having to do a merge, 
retest, repush. How about finalising the review so we can be done with it?


---



[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to fai...

2017-07-04 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18111
  
Is there anything else which needs to be done here, or is it a matter of 
finding the right reviewer?


---



[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...

2017-06-23 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17747
  
Mima test failure was about a new method in hist server
```
[info] spark-mllib: found 0 potential binary incompatibilities while 
checking against org.apache.spark:spark-mllib_2.11:2.0.0  (filtered 167)
[info] spark-core: found 1 potential binary incompatibilities while 
checking against org.apache.spark:spark-core_2.11:2.0.0  (filtered 1041)
[error]  * method 
cacheMetrics()org.apache.spark.deploy.history.CacheMetrics in class 
org.apache.spark.deploy.history.HistoryServer does not have a correspondent in 
current version
[error]filter with: ProblemFil
```

I'll make sure the scope for that is as restricted as possible; it should 
only be accessed for testing


---



[GitHub] spark issue #9518: [SPARK-11574][Core] Add metrics StatsD sink

2017-06-22 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/9518
  
BTW, here are some ongoing Hadoop JIRAs related to its shipping statsd sink: 
[HADOOP-12360](https://issues.apache.org/jira/browse/HADOOP-12360?focusedCommentId=16034826&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16034826),
 with other improvements in 
[HADOOP-13048](https://issues.apache.org/jira/browse/HADOOP-13048). I think there 
have been some complaints about how failures to publish are handled (the speed at 
which it gives up, etc.). Worth looking at the JIRAs to see if there are any 
lessons there.


---



[GitHub] spark pull request #9518: [SPARK-11574][Core] Add metrics StatsD sink

2017-06-22 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9518#discussion_r123483576
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.io.IOException
+import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.SortedMap
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.codahale.metrics._
+import org.apache.hadoop.net.NetUtils
+
+import org.apache.spark.Logging
+
+/**
+ * @see https://github.com/etsy/statsd/blob/master/docs/metric_types.md";>
+ *StatsD metric types
+ */
+private[spark] sealed trait StatsdMetricType {
+  val COUNTER = "c"
+  val GAUGE = "g"
+  val TIMER = "ms"
+  val Set = "s"
+}
+
+private[spark] class StatsdReporter(
+registry: MetricRegistry,
+host: String = "127.0.0.1",
+port: Int = 8125,
+prefix: String = "",
+filter: MetricFilter = MetricFilter.ALL,
+rateUnit: TimeUnit = TimeUnit.SECONDS,
+durationUnit: TimeUnit = TimeUnit.MILLISECONDS)
+  extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, 
durationUnit)
+  with StatsdMetricType with Logging {
+
+  private val address = new InetSocketAddress(host, port)
+  private val whitespace = "[\\s]+".r
+
+  override def report(
+  gauges: SortedMap[String, Gauge[_]],
+  counters: SortedMap[String, Counter],
+  histograms: SortedMap[String, Histogram],
+  meters: SortedMap[String, Meter],
+  timers: SortedMap[String, Timer]): Unit =
+Try(new DatagramSocket) match {
+  case Failure(ioe: IOException) => logWarning("StatsD datagram socket 
construction failed",
+NetUtils.wrapException(host, port, "0.0.0.0", 0, ioe))
--- End diff --

Preferable to use {{NetUtils.getLocalHostname()}} for the source address; 
you'll appreciate it when diagnosing problems on a large cluster.


---



[GitHub] spark issue #14601: [SPARK-13979][Core] Killed executor is re spawned withou...

2017-06-21 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/14601
  
Testing should not be too hard. Here's my *untested* attempt

```scala
val sconf = new SparkConf(false)
sconf.set("fs.example.value", "true")
val conf = new Configuration(false)
new SparkHadoopUtil().appendS3AndSparkHadoopConfigurations(sconf, conf)
assert(conf.getBoolean("fs.example.value", false))
```
 



---



[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...

2017-06-15 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/17747
  
I'm going to go with your suggestion and go via the metricServer to get at 
the state of counters and gauges; this is actually better in that it will 
verify that all metrics are making their way through. This is just some 
background work, so expect a short delay before it gets updated.


---



[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...

2017-06-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17747#discussion_r122295075
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -164,6 +169,16 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
+  /**
--- End diff --

cut it; just noting that something is actually returned


---



[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...

2017-06-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17747#discussion_r122294995
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -129,6 +131,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
+  /** filesystem metrics: visible for test access */
--- End diff --

There are some which do; `appui.load.count`, for example.





[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...

2017-06-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17747#discussion_r122279400
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -110,6 +117,14 @@ class HistoryServer(
 appCache.getSparkUI(appKey)
   }
 
+  val historyMetrics = new HistoryMetrics(this, "history.server")
+
+  /**
+   * Provider metrics are None until the provider is started, and only after that
+   * point if the provider returns any.
+   */
+  var providerMetrics: Option[Source] = None
--- End diff --

Not easily, would be the summary. Making it `private[history]` would lock
down access, though.





[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...

2017-06-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17747#discussion_r122256050
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryMetricSource.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, Gauge, Metric, MetricFilter, MetricRegistry, Timer}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.util.Clock
+
+/**
+ * An abstract implementation of the metrics [[Source]] trait with some common operations for
+ * retrieving entries; the `toString()` operation dumps all counters and gauges.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+    metrics.foreach { case (name, metric) =>
+      metricRegistry.register(fullname(name), metric)
+    }
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+    MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by machines.
+   */
+  override def toString: String = {
+    val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+    def robustAppend(s : => Long) = {
+      try {
+        sb.append(s)
+      } catch {
+        case e: Exception =>
+          sb.append(s"(exception: $e)")
+      }
+    }
+
+    sb.append("  Counters\n")
+
+    metricRegistry.getCounters.asScala.foreach { case (name, counter) =>
+      sb.append("").append(name).append(" = ")
+        .append(counter.getCount).append('\n')
+    }
+    sb.append("  Gauges\n")
+    metricRegistry.getGauges.asScala.foreach { case (name, gauge) =>
+      sb.append("").append(name).append(" = ")
+      try {
+        sb.append(gauge.getValue)
--- End diff --

Yes, I'm trying to work out why I *didn't* use that method given it was there.
I've widened it to a generic type and used it. It's probably overkill, but as
anything can be a Gauge, it's possible that one will NPE or similar, and that
just ruins logging & debugging.
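
A sketch of what that widening could amount to (the exact signature in the patch may differ):

```scala
// Accept any lazily evaluated value rather than just a Long, so a Gauge whose
// getValue() throws cannot break the whole metrics dump.
def robustAppend(sb: StringBuilder)(value: => Any): Unit = {
  try {
    sb.append(value)
  } catch {
    case e: Exception => sb.append(s"(exception: $e)")
  }
}
```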





[GitHub] spark issue #18247: [SPARK-13933][BUILD] Update hadoop-2.7 profile's curator...

2017-06-14 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18247
  
...just caught this. No, no issues with it. A retrospective non-binding +1.





[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to fai...

2017-05-26 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18111
  
Not really. I thought about how I could do it, but essentially you need to do
things underneath the commit protocol, either in the Hadoop codebase (me) or in
a test which somehow misconfigures things. It's most likely to surface in custom
subclasses of `FileOutputCommitter`, so a test would probably consist of a run
with a committer set to always return null on `getWorkPath`, verifying that the
changed assertion went from an NPE to an `IllegalArgumentException`.

That's a lot of complexity for a test of what is essentially making explicit a
requirement of the code: namely, that `getWorkPath.toString()` requires
`getWorkPath` to be non-null.

I've included the before/after stack traces in the JIRA.
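
For reference, the committer such a test would need is tiny; something like this
(untested sketch, `NullWorkPathCommitter` is a hypothetical name):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Behaves like a misconfigured committer: no work path is ever available.
class NullWorkPathCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def getWorkPath(): Path = null
}
```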





[GitHub] spark pull request #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol...

2017-05-25 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/18111#discussion_r118530138
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -73,7 +73,10 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
     val stagingDir: String = committer match {
       // For FileOutputCommitter it has its own staging path called "work path".
-      case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path)
+      case f: FileOutputCommitter =>
+        val workPath = f.getWorkPath
+        require(workPath != null, s"Committer has no workpath $f")
+        Option(workPath.toString).getOrElse(path)
--- End diff --

If this were intended to be resilient, it'd have to become
`Option(workPath).getOrElse(path).toString()`, though that would actually hide
the problem I'd managed to create: handing a job committer in to a task.





[GitHub] spark pull request #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol...

2017-05-25 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/18111

[SPARK-20886][CORE] HadoopMapReduceCommitProtocol to fail meaningfully if FileOutputCommitter.getWorkPath==null

## What changes were proposed in this pull request?

Handles the situation where `FileOutputCommitter.getWorkPath()` returns `null`
with a `require()` call whose message explains the problem and includes the
`toString` value of the committer for better diagnostics.

The situation occurs if the committer being passed in is a job committer, not
a task committer; that is, it was initialised with a `JobContext` rather than a
`TaskAttemptContext`.

The existing code does an `Option(workPath.toString).getOrElse(path)`, which
*may* be an attempt to handle the null path case. If so, it doesn't work,
because it's the `.toString()` call which is failing. If people do think that
code should be resilient to null work paths, that line could be changed (see
the sketch below). However, that may hide the underlying problem: the committer
is misconfigured.
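
For illustration only, a null-tolerant variant of that logic might look like the
following (a sketch, not what this patch does; the helper name is hypothetical):

```scala
import org.apache.hadoop.fs.Path

// Fall back to the job's output path when the committer has no work path.
def stagingDirFor(workPath: Path, path: String): String =
  Option(workPath).map(_.toString).getOrElse(path)
```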

It may be a rare occurrence today, but it becomes more likely with modified
subclasses of `FileOutputCommitter`, and is also possible with some ongoing work
of mine in Hadoop to better support committing to cloud storage infrastructures.

## How was this patch tested?

Manually. The before & after stack traces are on the JIRA.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark cloud/SPARK-20886-committer-NPE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18111


commit 02eb7bf0ee6b81841f22e3c46d822eaebb28e85c
Author: Steve Loughran 
Date:   2017-05-25T15:46:50Z

SPARK-20886 HadoopMapReduceCommitProtocol to fail with message if 
FileOutputCommitter.getWorkPath==null
Add a requirement.
The existing code does an Option.getWorkpath.toString() which *may* be an 
attempt to handle the null path case. If so, it isn't, because its the 
.toString() which is failing.

Change-Id: Idddf9813761e7008425542f96903bce12bedd978







[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-05-25 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/9571




