[GitHub] spark issue #19497: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19497

I guess one aspect of `saveAsNewAPIHadoopFile` is that it calls `jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path)`, and `Configuration.set(String key, String value)` checks for a null key or value. If the handling of paths is to be done in the committer, `saveAsNewAPIHadoopFile` should really be looking at `path` and calling `jobConfiguration.unset("mapreduce.output.fileoutputformat.outputdir")` if `path == null`.

Looking at how Hadoop's `FileOutputFormat` implementations work, they can handle a null/undefined output dir property, *but not an empty one*, because `new Path("")` throws an `IllegalArgumentException`:

```java
public static Path getOutputPath(JobContext job) {
  String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
  return name == null ? null : new Path(name);
}
```

This implies that `saveAsNewAPIHadoopFile("")` might want to unset the config option too, offloading the problem of what happens on an empty path to the committer. Though I'd recommend checking what meaningful exceptions actually get raised in this situation when the committer is the normal `FileOutputFormat`/`FileOutputCommitter` setup.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
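A minimal sketch of the suggested handling: set the output-dir key only for a non-empty path, otherwise unset it so a `getOutputPath`-style lookup sees "undefined" rather than an empty string. It assumes a `scala.collection.mutable.Map` as a stand-in for Hadoop's `Configuration` so it runs without Hadoop on the classpath; `OutputDirSketch`, `configureOutputDir` and `outputPath` are hypothetical names, not Spark or Hadoop APIs.

```scala
import scala.collection.mutable

// Hypothetical sketch: a mutable.Map stands in for Hadoop's Configuration.
object OutputDirSketch {
  // Same key string as FileOutputFormat.OUTDIR.
  val OutDir = "mapreduce.output.fileoutputformat.outputdir"

  // Set the key only for a usable path; for null or "" remove it instead,
  // leaving the committer to decide what a missing output path means.
  def configureOutputDir(conf: mutable.Map[String, String], path: String): Unit = {
    if (path == null || path.isEmpty) {
      conf -= OutDir
    } else {
      conf(OutDir) = path
    }
  }

  // Mirrors getOutputPath(): an unset key yields None rather than a path.
  def outputPath(conf: mutable.Map[String, String]): Option[String] = conf.get(OutDir)

  def main(args: Array[String]): Unit = {
    val conf = mutable.Map[String, String]()
    configureOutputDir(conf, "hdfs://nn/out")
    println(outputPath(conf)) // Some(hdfs://nn/out)
    configureOutputDir(conf, "")
    println(outputPath(conf)) // None
  }
}
```

Unsetting rather than setting an empty string means the setter never sees a null value, and the "no path" case reaches the committer as an absent key, which `getOutputPath` already handles.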
[GitHub] spark pull request #19269: [SPARK-22026][SQL] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r145096772 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.text.SimpleDateFormat +import java.util.{Collections, Date, List => JList, Locale, Optional, UUID} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path} + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.SerializableConfiguration + +/** + * A HDFS based transactional writable data source. + * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`. 
+ * Each job moves files from `target/_temporary/jobId/` to `target`. + */ +class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport { + + private val schema = new StructType().add("i", "long").add("j", "long") + + class Reader(path: String, conf: Configuration) extends DataSourceV2Reader { +override def readSchema(): StructType = schema + +override def createReadTasks(): JList[ReadTask[Row]] = { + val dataPath = new Path(path) + val fs = dataPath.getFileSystem(conf) + if (fs.exists(dataPath)) { +fs.listStatus(dataPath).filterNot { status => + val name = status.getPath.getName + name.startsWith("_") || name.startsWith(".") +}.map { f => + val serializableConf = new SerializableConfiguration(conf) + new SimpleCSVReadTask(f.getPath.toUri.toString, serializableConf): ReadTask[Row] +}.toList.asJava + } else { +Collections.emptyList() + } +} + } + + class Writer(path: String, conf: Configuration) extends DataSourceV2Writer { +// We can't get the real spark job id here, so we use a timestamp and random UUID to simulate +// a unique job id. 
+protected val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US).format(new Date()) + + "-" + UUID.randomUUID() + +override def createWriterFactory(): DataWriterFactory[Row] = { + new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf)) +} + +override def commit(messages: Array[WriterCommitMessage]): Unit = { + val finalPath = new Path(path) + val jobPath = new Path(new Path(finalPath, "_temporary"), jobId) + val fs = jobPath.getFileSystem(conf) + try { +for (file <- fs.listStatus(jobPath).map(_.getPath)) { + val dest = new Path(finalPath, file.getName) + if(!fs.rename(file, dest)) { +throw new IOException(s"failed to rename($file, $dest)") + } +} + } finally { +fs.delete(jobPath, true) + } +} + +override def abort(messages: Array[WriterCommitMessage]): Unit = { + val jobPath = new Path(new Path(path, "_temporary"), jobId) + val fs = jobPath.getFileSystem(conf) + fs.delete(jobPath, true) +} + } + + class InternalRowWriter(path: String, conf: Configuration) +extends Writer(path, conf) with SupportsWriteInte
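The doc comment in the diff describes a simple two-phase layout: each task writes under `target/_temporary/<jobId>/`, job commit moves those files into `target` and then deletes the job directory, and abort just deletes it. A minimal sketch of the same protocol on the local filesystem, assuming `java.io.File` in place of Hadoop's `FileSystem`/`Path`; `JobCommitSketch`, `jobDir`, `commitJob` and `abortJob` are hypothetical names, not part of the data source v2 API.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

// Hypothetical local-filesystem model of the layout in the diff.
object JobCommitSketch {
  // target/_temporary/<jobId>
  def jobDir(target: File, jobId: String): File =
    new File(new File(target, "_temporary"), jobId)

  // Move every task file into target, failing loudly on a rename that returns false,
  // and always clean up the job directory afterwards.
  def commitJob(target: File, jobId: String): Unit = {
    val dir = jobDir(target, jobId)
    try {
      for (file <- Option(dir.listFiles()).getOrElse(Array.empty[File])) {
        val dest = new File(target, file.getName)
        if (!file.renameTo(dest)) throw new IOException(s"failed to rename($file, $dest)")
      }
    } finally {
      deleteRecursively(dir)
    }
  }

  // Abort discards all task output for this job.
  def abortJob(target: File, jobId: String): Unit = deleteRecursively(jobDir(target, jobId))

  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively)) // null for plain files
    f.delete()
  }

  def main(args: Array[String]): Unit = {
    val target = Files.createTempDirectory("target").toFile
    val dir = jobDir(target, "job1")
    dir.mkdirs()
    new File(dir, "job1-0-0").createNewFile()
    commitJob(target, "job1")
    println(new File(target, "job1-0-0").exists()) // true
    println(dir.exists())                            // false
  }
}
```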
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144948292 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala --- @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import java.io.{BufferedReader, InputStreamReader} +import java.text.SimpleDateFormat +import java.util.{Collections, Date, List => JList, Locale, Optional, UUID} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.SerializableConfiguration + +/** + * A HDFS based transactional writable data source. + * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`. 
+ * Each job moves files from `target/_temporary/jobId/` to `target`. + */ +class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport { + + private val schema = new StructType().add("i", "long").add("j", "long") + + class Reader(path: String, conf: Configuration) extends DataSourceV2Reader { +override def readSchema(): StructType = schema + +override def createReadTasks(): JList[ReadTask[Row]] = { + val dataPath = new Path(path) + val fs = dataPath.getFileSystem(conf) + if (fs.exists(dataPath)) { + fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f => + val serializableConf = new SerializableConfiguration(conf) + new SimpleCSVReadTask(f.getPath.toUri.toString, serializableConf): ReadTask[Row] +}.toList.asJava + } else { +Collections.emptyList() + } +} + } + + class Writer(path: String, conf: Configuration) extends DataSourceV2Writer { +// We can't get the real spark job id here, so we use a timestamp and random UUID to simulate +// a unique job id. 
+private val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US).format(new Date()) + + "-" + UUID.randomUUID() + +override def createWriterFactory(): DataWriterFactory[Row] = { + new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf)) +} + +override def commit(messages: Array[WriterCommitMessage]): Unit = { + val finalPath = new Path(path) + val jobPath = new Path(new Path(finalPath, "_temporary"), jobId) + val fs = jobPath.getFileSystem(conf) + try { +for (file <- fs.listStatus(jobPath).map(_.getPath)) { + fs.rename(file, new Path(finalPath, file.getName)) +} + } finally { +fs.delete(jobPath, true) + } +} + +override def abort(messages: Array[WriterCommitMessage]): Unit = { + val jobPath = new Path(new Path(path, "_temporary"), jobId) + val fs = jobPath.getFileSystem(conf) + fs.delete(jobPath, true) +} + } + + class InternalRowWriter(path: String, conf: Configuration) +extends DataSourceV2Writer with SupportsWriteInternalRow { + +private val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US).format(new Date()) + +override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = { + new InternalRowCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Thanks for the review, everyone!
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144823664 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144823298 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144822800 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala --- +override def commit(messages: Array[WriterCommitMessage]): Unit = { + val finalPath = new Path(path) + val jobPath = new Path(new Path(finalPath, "_temporary"), jobId) + val fs = jobPath.getFileSystem(conf) + try { +for (file <- fs.listStatus(jobPath).map(_.getPath)) { + fs.rename(file, new Path(finalPath, file.getName)) --- End diff --

Treat `rename` returning `false` as a failure. It's an ugly mess: HDFS, say, returns `false` for "missing source file", while other filesystems do it for minor no-ops (src == dest). Because of HDFS, I'd have something like

```scala
val dest = new Path(finalPath, file.getName)
if (!fs.rename(file, dest)) {
  throw new IOException(s"failed to rename($file, $dest)")
}
```

That also helps us write the spec and tests for HADOOP-11452, and make public a rename which throws exceptions on failures.
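A runnable sketch of the same rename-or-fail pattern, assuming `java.io.File.renameTo` as a stand-in for Hadoop's `FileSystem.rename` (both report failure by returning `false` rather than throwing). `RenameSketch.renameOrFail` is a hypothetical helper for illustration, not the public API that HADOOP-11452 proposes.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object RenameSketch {
  // Hypothetical helper: convert a boolean rename result into an exception.
  def renameOrFail(src: File, dest: File): Unit = {
    if (!src.renameTo(dest)) {
      throw new IOException(s"failed to rename($src, $dest)")
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("rename-sketch").toFile
    val src = new File(dir, "part-0")
    src.createNewFile()
    val dest = new File(dir, "final-0")
    renameOrFail(src, dest) // succeeds: the source exists
    println(dest.exists())  // true
    try {
      renameOrFail(src, dest) // the source is gone, so renameTo returns false
    } catch {
      case e: IOException => println(e.getMessage)
    }
  }
}
```

Turning the boolean into an exception means a failed job commit surfaces immediately instead of silently leaving output behind under `_temporary`.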
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144822829 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144821527 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala --- + class Reader(path: String, conf: Configuration) extends DataSourceV2Reader { +override def readSchema(): StructType = schema + +override def createReadTasks(): JList[ReadTask[Row]] = { + val dataPath = new Path(path) + val fs = dataPath.getFileSystem(conf) + if (fs.exists(dataPath)) { + fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f => --- End diff --

Consider also filtering out names that start with `.`, in case hidden files get picked up on the local FS.
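A sketch of the suggested filter as a standalone predicate that skips both `_`-prefixed and `.`-prefixed names. `HiddenFileFilter.isDataFile` is a hypothetical name, and the sample file names are illustrative only.

```scala
object HiddenFileFilter {
  // Hypothetical helper: names starting with "_" (e.g. _SUCCESS, _temporary)
  // or "." (hidden files) are not treated as data files.
  def isDataFile(name: String): Boolean =
    !(name.startsWith("_") || name.startsWith("."))

  def main(args: Array[String]): Unit = {
    val names = Seq("part-00000", "_SUCCESS", "_temporary", ".part-00000.crc", "part-00001")
    println(names.filter(isDataFile)) // List(part-00000, part-00001)
  }
}
```

In the reader this would replace the single `startsWith("_")` check inside the `filter` over `fs.listStatus(dataPath)`, applied to `status.getPath.getName`.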
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144821389

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala ---
```diff
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A HDFS based transactional writable data source.
+ * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/jobId/` to `target`.
+ */
+class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
+
+  private val schema = new StructType().add("i", "long").add("j", "long")
+
+  class Reader(path: String, conf: Configuration) extends DataSourceV2Reader {
+    override def readSchema(): StructType = schema
+
+    override def createReadTasks(): JList[ReadTask[Row]] = {
+      val dataPath = new Path(path)
+      val fs = dataPath.getFileSystem(conf)
+      if (fs.exists(dataPath)) {
+        fs.listStatus(dataPath).filter(!_.getPath.getName.startsWith("_")).map { f =>
+          val serializableConf = new SerializableConfiguration(conf)
+          new SimpleCSVReadTask(f.getPath.toUri.toString, serializableConf): ReadTask[Row]
+        }.toList.asJava
+      } else {
+        Collections.emptyList()
+      }
+    }
+  }
+
+  class Writer(path: String, conf: Configuration) extends DataSourceV2Writer {
+    // We can't get the real spark job id here, so we use a timestamp and random UUID to simulate
+    // a unique job id.
+    private val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US).format(new Date())
+      "-" + UUID.randomUUID()
+
+    override def createWriterFactory(): DataWriterFactory[Row] = {
+      new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
+    }
+
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {
+      val finalPath = new Path(path)
+      val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
+      val fs = jobPath.getFileSystem(conf)
+      try {
+        for (file <- fs.listStatus(jobPath).map(_.getPath)) {
+          fs.rename(file, new Path(finalPath, file.getName))
+        }
+      } finally {
+        fs.delete(jobPath, true)
+      }
+    }
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {
+      val jobPath = new Path(new Path(path, "_temporary"), jobId)
+      val fs = jobPath.getFileSystem(conf)
+      fs.delete(jobPath, true)
+    }
+  }
+
+  class InternalRowWriter(path: String, conf: Configuration)
+    extends DataSourceV2Writer with SupportsWriteInternalRow {
+
+    private val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US).format(new Date())
+
+    override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+      new InternalRowCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
```
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19487 The more I see of the committer internals, the less confident I am about understanding any of it. If your committer isn't writing stuff out, it doesn't need any value of `mapred.output.dir` at all, does it? If it does use it, it'll handle an invalid entry in `setupJob`/`setupTask` by throwing an exception there. So the goal of the code above it should be to make sure the committer gets to validate its own inputs. Hadoop trunk adds a new [PathOutputCommitter](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java) class for committers: it's the useful getters of `FileOutputCommitter` pulled up into a superclass, allowing other committers to give things like Spark the info they need without looking into properties like `mapred.output.dir`. Have a look at that class, and if there is something extra you want pulled up, let me know before Hadoop 3.0 ships and I'll see what I can do.
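To make the idea concrete: rather than reading `mapred.output.dir` from the job configuration, a caller can ask the committer for its paths. A committer that writes nothing can then simply report none. The following minimal Java sketch shows that shape. `SketchPathOutputCommitter`, `NoOutputCommitter`, `DirectoryCommitter` and `CommitterQueries` are hypothetical names, and plain `String`s stand in for Hadoop `Path`s. This is not the real Hadoop `PathOutputCommitter` API.

```java
import java.util.Optional;

// Hypothetical, simplified model; NOT the Hadoop API. Strings stand in for Hadoop Paths
// so that the sketch is self-contained.
abstract class SketchPathOutputCommitter {
  /** Final destination of the job's output, if this committer has one. */
  abstract Optional<String> getOutputPath();

  /** Where task attempts write before commit, if anywhere. */
  abstract Optional<String> getWorkPath();
}

/** A committer that never writes to a filesystem, so it has no paths at all. */
final class NoOutputCommitter extends SketchPathOutputCommitter {
  @Override
  Optional<String> getOutputPath() {
    return Optional.empty();
  }

  @Override
  Optional<String> getWorkPath() {
    return Optional.empty();
  }
}

/** A directory-based committer in the style of FileOutputCommitter. */
final class DirectoryCommitter extends SketchPathOutputCommitter {
  private final String outputPath;

  DirectoryCommitter(String outputPath) {
    this.outputPath = outputPath;
  }

  @Override
  Optional<String> getOutputPath() {
    return Optional.of(outputPath);
  }

  @Override
  Optional<String> getWorkPath() {
    return Optional.of(outputPath + "/_temporary");
  }
}

final class CommitterQueries {
  /** Ask the committer, instead of reading mapred.output.dir from the job configuration. */
  static String describe(SketchPathOutputCommitter committer) {
    return committer.getOutputPath().map(p -> "writes to " + p).orElse("no output path");
  }
}
```

The caller never needs to know which configuration property, if any, the committer consults.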
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Done. Not writing 0-byte files will offer a significant speedup against object stores, where a single call to `getFileStatus()` can take hundreds of milliseconds. I look forward to it.
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19448

> But, if I were working on a Spark distribution at a vendor, this is something I would definitely include because it's such a useful feature.

I concur :)
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19487 An empty string (`""`) can come in via configuration files; I'd treat that the same as null. Things which aren't valid URIs, though, are something you want to bounce.
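Here is a minimal sketch of the "treat `""` like null, bounce anything unparseable" policy. It assumes a hypothetical helper, `OutputDirCheck`, and uses `java.net.URI` as a stand-in parser. Hadoop's `Path` applies its own, different parsing rules, so this is an illustration of the policy rather than Spark or Hadoop code.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not Spark or Hadoop code: classifies the value of an
// output-directory property such as mapreduce.output.fileoutputformat.outputdir.
final class OutputDirCheck {
  enum Kind { UNSET, VALID }

  static Kind classify(String outputDir) {
    // null and "" are treated the same: no output directory was configured.
    if (outputDir == null || outputDir.isEmpty()) {
      return Kind.UNSET;
    }
    try {
      // java.net.URI is only a stand-in parser here.
      new URI(outputDir);
      return Kind.VALID;
    } catch (URISyntaxException e) {
      // A non-empty but unparseable value is rejected rather than silently ignored.
      throw new IllegalArgumentException("Invalid output directory: " + outputDir, e);
    }
  }
}
```

`java.net.URI` is stricter than Hadoop's `Path` in places. For example, it rejects `"test:"`, which may be a legitimate Hadoop path. A real check should therefore use the same parser the committer itself will use.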
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19487 Looking a bit more at this: I see it handles `""`/empty paths, and also other forms of invalid URI which `Path` can't handle today (multiple colons except with `file://` on Windows, etc.). And as long as you don't call `absPathStagingDir`, you don't get an exception there. Do you actually want the code to downgrade if an invalid URI is passed in? The alternative is to skip only if the path is empty/null, but not for other things like a path of `"::"`, which you may want to reject. In that case you'd change the `hasValidPath` query to just look at the string, and reinstate the test for a path with an invalid, non-empty URI. BTW, `"test:"` is possibly a valid path: "the home directory in the scheme `test`".
[GitHub] spark pull request #19487: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19487#discussion_r144545827

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
```diff
@@ -60,15 +71,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
    */
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
```
--- End diff --

What if `path` is null here when passed in? I'd actually expect an `IllegalArgumentException` to be raised, at least from looking at `Path.checkPathArg()`.
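The point about `absPathStagingDir` is that the failure is deferred. The staging path is only built when the method is called, and building a path from a null or empty string is what throws. The following minimal Java sketch shows that behaviour. It uses hypothetical names: `StagingDirs` is invented, and `checkPathArg` here is a local imitation of the Hadoop argument check, not the Hadoop method itself. Strings stand in for Hadoop `Path`s.

```java
import java.util.function.Supplier;

// Hypothetical sketch, not Spark's HadoopMapReduceCommitProtocol.
final class StagingDirs {
  // Imitates the null/empty checks Hadoop's Path constructor applies to its string arguments.
  static String checkPathArg(String path) {
    if (path == null) {
      throw new IllegalArgumentException("Can not create a Path from a null string");
    }
    if (path.isEmpty()) {
      throw new IllegalArgumentException("Can not create a Path from an empty string");
    }
    return path;
  }

  // Like a Scala `def`, the staging dir is computed only when asked for, so a commit
  // protocol created with a null path does not fail unless something needs this directory.
  static Supplier<String> absPathStagingDir(String path, String jobId) {
    return () -> checkPathArg(path) + "/_temporary-" + jobId;
  }
}
```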
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 The latest PR update pulls in @dongjoon-hyun's new test; to avoid a merge conflict in the Insert suite, I've rebased against master.

1. Everything handles missing files on output.
2. There's only one `logInfo` at the end of the execute call, so if many empty files are created, the logs aren't too noisy.
3. There is now some implicit counting of how many files were missing (`submittedFiles - numFiles`); this isn't aggregated and reported, though.
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19448 PS: for people who are interested in dynamic committers, [MAPREDUCE-6823](https://issues.apache.org/jira/browse/MAPREDUCE-6823) is something to look at. It allows you to switch committers under pretty much everything *other than Parquet*... this patch helps make Parquet manageable too.
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19448 Thanks for reviewing this and getting it in. Personally, I had it in the "improvement" category rather than "bug fix". If it weren't for that line in the docs, there'd be no ambiguity about improvement vs. fix, and there is always a lower-risk way to fix a doc/code mismatch: change the docs. But I'm grateful for it being in; with the backport to branch-2, Ryan should be able to use it semi-immediately.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Noted :) @dongjoon-hyun: is the issue with ORC that if there's nothing to write, it doesn't generate a file (so avoiding the problem where you sometimes get 0-byte ORC files and things downstream fail)? If so, the warning message which @gatorsmile has proposed is potentially going to mislead people into worrying about a problem which isn't there, and the numFiles metric is going to mislead too. I'm starting to worry about how noisy the log would be, both there and when working with S3 while it's exhibiting delayed visibility (rarer).

1. What if this patch just logged at debug: less noise, but still something there if people are trying to debug a mismatch?
1. If there's no file found, numFiles doesn't get incremented.
1. I count the number of files actually submitted.
1. And in `getFinalStats()`, log at info if there is a mismatch.

This would line things up for, in future, returning the list of expected vs. actual files as a metric where it could be reported.
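The four points above can be sketched as a small tracker. It logs quietly for each missing file, counts in `numFiles` only the files that were found, keeps a separate count of submitted files, and emits one info-level message at the end when the two counts disagree. This is a hypothetical Java model (`SketchWriteStatsTracker`), not Spark's `BasicWriteTaskStatsTracker`. A `Predicate` stands in for the `getFileStatus()` probe, and the `java.util.logging` levels `FINE` and `INFO` stand in for debug and info.

```java
import java.util.function.Predicate;
import java.util.logging.Logger;

// Hypothetical model of the proposal, not Spark code.
final class SketchWriteStatsTracker {
  private static final Logger LOG = Logger.getLogger(SketchWriteStatsTracker.class.getName());

  private final Predicate<String> fileExists; // stand-in for a getFileStatus() probe
  private int submittedFiles = 0;            // every file a writer was opened for
  private int numFiles = 0;                  // only files actually found at close time

  SketchWriteStatsTracker(Predicate<String> fileExists) {
    this.fileExists = fileExists;
  }

  void closeFile(String path) {
    submittedFiles++;
    if (fileExists.test(path)) {
      numFiles++;
    } else {
      // Quiet, per-file: the file may legitimately be absent (e.g. nothing written)
      // or not yet visible in an object store.
      LOG.fine("File " + path + " not found");
    }
  }

  int getNumFiles() {
    return numFiles;
  }

  int missingFiles() {
    return submittedFiles - numFiles;
  }

  // One louder message at the end, only if the counts disagree.
  String getFinalStats() {
    if (missingFiles() > 0) {
      LOG.info("Expected " + submittedFiles + " files, but only " + numFiles + " were found");
    }
    return "numFiles=" + numFiles + ", missing=" + missingFiles();
  }
}
```

Keeping both counters is what would later allow expected vs. actual files to be reported as a metric.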
[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18979#discussion_r144505454

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala ---
```diff
@@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   private def getFileSize(filePath: String): Long = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
-    fs.getFileStatus(path).getLen()
+    try {
+      fs.getFileStatus(path).getLen()
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores
+        logInfo(s"File $path is not yet visible", e)
```
--- End diff --

Say "Reported file size in job summary may be invalid"?
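For reference, here is the shape of the catch-and-continue lookup, with the suggested wording in the message, as a hypothetical Java sketch. `SizeLookup` stands in for `fs.getFileStatus(path).getLen()`. The method returns an empty `OptionalLong` instead of failing when the file isn't visible yet. How the caller then reports the missing size is outside this sketch.

```java
import java.io.FileNotFoundException;
import java.util.OptionalLong;

// Hypothetical sketch, not Spark code.
final class FileSizes {
  @FunctionalInterface
  interface SizeLookup {
    // Stand-in for fs.getFileStatus(path).getLen(), which may throw FileNotFoundException.
    long sizeOf(String path) throws FileNotFoundException;
  }

  // Returns the size if the file is visible, otherwise an empty result plus a message
  // telling the reader what the consequence is, not just what happened.
  static OptionalLong getFileSize(SizeLookup lookup, String path) {
    try {
      return OptionalLong.of(lookup.sizeOf(path));
    } catch (FileNotFoundException e) {
      System.err.println("File " + path + " is not yet visible;"
          + " reported file size in job summary may be invalid");
      return OptionalLong.empty();
    }
  }
}
```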
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19487 LGTM. Today I'm going to push out a slightly updated version of my PathOutputCommitter class, which sits one layer above FileOutputCommitter: it lets people write committers without output and work paths, yet avoid going near the complexity that is FileOutputFormat. See [PathOutputCommitter](https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/com/hortonworks/spark/cloud/commit/PathOutputCommitProtocol.scala); if I get my changes into Hadoop 3, then this committer will work for all versions of Hadoop 3.x, even though the S3A work is targeting Hadoop 3.1.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144381367

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
```
--- End diff --

Yes, there is that. Options: do something complicated with a static field so the message is only printed once, or log at debug so people only see the message if they are trying to track things down.
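The "static field so it's only printed once" option could look like the minimal Java sketch below. The name `WarnOnce` is hypothetical. An `AtomicBoolean` lets exactly one caller win, even if several threads reach the check at the same time.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

// Hypothetical sketch of a log-once guard.
final class WarnOnce {
  private static final Logger LOG = Logger.getLogger(WarnOnce.class.getName());

  // Shared by every caller in this JVM; compareAndSet lets exactly one caller log.
  private static final AtomicBoolean WARNED = new AtomicBoolean(false);

  /** Logs the message the first time it is called; returns true if it logged. */
  static boolean warnOnce(String message) {
    if (WARNED.compareAndSet(false, true)) {
      LOG.warning(message);
      return true;
    }
    return false;
  }
}
```

The flag is per JVM, so the driver and each executor would still log once each. It also can't be reset, which makes it awkward in tests. That awkwardness is part of why logging at debug is the simpler option.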
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144375059

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,11 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      || classOf[ParquetOutputCommitter].isAssignableFrom(committerClass),
+      s"Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries." +
+        " Set Parquet option " + ParquetOutputFormat.ENABLE_JOB_SUMMARY + " to false.")
```
--- End diff --

I'd thought about that; it didn't look any better or worse. Will change it for the log message.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144239543

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
```
--- End diff --

There's another option, which is "log at warn and continue". If someone has changed the committer, they get the consequences. That could also permit someone with a modified committer to generate schema summaries if they chose to. It'd simplify this patch, though the tests would need tweaking. I'd change the SQLConf text for the committer key to say "if the committer isn't a ParquetOutputCommitter then don't expect summaries".
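Here is a sketch of the "log at warn and continue" alternative. It assumes hypothetical stub classes in place of the Hadoop and Parquet committers. The check never fails the job. It only warns when summaries are requested from a committer that is not a Parquet committer or a subclass of one. Whether summaries actually appear is then up to that committer.

```java
import java.util.logging.Logger;

// Hypothetical stand-ins for the real committer classes.
class OutputCommitterStub {}
class ParquetOutputCommitterStub extends OutputCommitterStub {}
class CustomParquetCommitterStub extends ParquetOutputCommitterStub {}
class MarkingCommitterStub extends OutputCommitterStub {}

final class SummaryPolicy {
  private static final Logger LOG = Logger.getLogger(SummaryPolicy.class.getName());

  // "Log at warn and continue": never throws. Returns true if a warning was logged.
  static boolean checkSummarySupport(boolean summaryRequested, Class<?> committerClass) {
    // isAssignableFrom accepts the Parquet committer itself and any subclass of it.
    boolean parquetCommitter = ParquetOutputCommitterStub.class.isAssignableFrom(committerClass);
    if (summaryRequested && !parquetCommitter) {
      LOG.warning("Committer " + committerClass.getName()
          + " is not a ParquetOutputCommitter; do not expect job summaries");
      return true;
    }
    return false;
  }
}
```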
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144238941

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
```diff
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    if (spark != null) {
+      spark.stop()
+      spark = null
+    }
+    super.afterAll()
```
--- End diff --

Good point.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144065810

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
```
--- End diff --

Aah. In the move to `require()`, everything is going back onto a single line, so this is now moot.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144065074

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
```
--- End diff --

Aah.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144065041

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
```diff
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
```
--- End diff --

Done. I will also add a check for `spark == null`, so that if a failure happens during setup, the exception doesn't get lost in teardown.
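The null check matters because setup can fail before the session exists. In that case, a `NullPointerException` from `stop()` in teardown would replace the original failure. Below is a minimal Java sketch of the pattern. `Session` and `BaseSuite` are hypothetical stand-ins for `SparkSession` and the parent test suite. The sketch also wraps the body in `try`/`finally` so that the parent's teardown always runs, which goes slightly beyond the Scala diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the parent test suite; it records what ran.
class BaseSuite {
  final List<String> events = new ArrayList<>();

  void afterAll() {
    events.add("base teardown");
  }
}

// Hypothetical stand-in for SparkSession.
class Session {
  private final List<String> events;

  Session(List<String> events) {
    this.events = events;
  }

  void stop() {
    events.add("session stopped");
  }
}

class CommitterSuiteSketch extends BaseSuite {
  Session spark; // stays null if setup failed before the session was created

  @Override
  void afterAll() {
    try {
      // Without this guard, stop() on null would throw and mask the setup failure.
      if (spark != null) {
        spark.stop();
        spark = null;
      }
    } finally {
      super.afterAll(); // always run the parent's cleanup
    }
  }
}
```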
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 @viirya: the new data writer API will allow a broader set of stats to be propagated back from workers. When you are working with object stores, useful stats to get back are the throttle count and the retry count, as they can be the reason things are slow... and if it is due to throttling, throwing more workers at the job will actually slow things down. They'd be the ones to look at first.
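To illustrate what propagating such counters back from workers could involve, here is a hypothetical per-task `StoreStats` value in Java. It has a merge step that the driver could fold over all task results. The class and field names are illustrative only and are not a Spark or Hadoop API.

```java
import java.util.List;

// Hypothetical per-task object-store statistics.
final class StoreStats {
  final long throttleCount;
  final long retryCount;

  StoreStats(long throttleCount, long retryCount) {
    this.throttleCount = throttleCount;
    this.retryCount = retryCount;
  }

  // Associative merge, so results can be combined in any order as tasks finish.
  StoreStats merge(StoreStats other) {
    return new StoreStats(throttleCount + other.throttleCount, retryCount + other.retryCount);
  }

  // Driver-side aggregation of all task results.
  static StoreStats total(List<StoreStats> perTask) {
    StoreStats sum = new StoreStats(0, 0);
    for (StoreStats s : perTask) {
      sum = sum.merge(s);
    }
    return sum;
  }
}
```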
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143992362

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
```diff
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
+  }
+
+  test("alternative output committer, merge schema") {
+    intercept[RuntimeException] {
+      val stat = writeDataFrame(MarkingFileOutput.COMMITTER, true, true)
+      logError(s"Created marker file $stat")
+    }
+  }
+
+  test("alternative output committer, no merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, false, true)
```
--- End diff --

OK.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143992319

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
```
--- End diff --

It depends on the policy about what to do if it's not a Parquet committer *and* the option for job summaries is set. It could just mean "you don't get summaries", which works for me :). May want to log at info, though?
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143992018

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
```diff
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
```
--- End diff --

Will do.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Has anyone had a look at this recently? The problem still exists. Downstream filesystems can address it if they recognise the use case and lie about values, but then they are returning invalid values to the caller, and Spark will be reporting the wrong values. At least with this PR, Spark gets to decide for itself how to react.
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19269 +1 for the ability to return statistics: the remote stores have lots of information which committers may return
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r143530841 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.sql.sources.v2; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.sources.v2.reader.DataReader; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.writer.*; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructType; + +public class JavaSimpleWritableDataSource implements DataSourceV2, ReadSupport, WriteSupport { + private StructType schema = new StructType().add("i", "long").add("j", "long"); + + class Reader implements DataSourceV2Reader { +private String path; + +Reader(String path) { + this.path = path; +} + +@Override +public StructType readSchema() { + return schema; +} + +@Override +public List> createReadTasks() { + return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path)); +} + } + + static class JavaSimpleCSVReadTask implements ReadTask, DataReader { +private String path; +private volatile Iterator lines; +private volatile String currentLine; + +JavaSimpleCSVReadTask(Iterator lines) { + this.lines = lines; +} + +JavaSimpleCSVReadTask(String path) { + this.path = path; +} + +@Override +public DataReader createReader() { + assert path != null; + try { +if 
(Files.exists(Paths.get(path))) { + return new JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator()); +} else { + return new JavaSimpleCSVReadTask(Collections.emptyIterator()); +} + } catch (IOException e) { +throw new RuntimeException(e); --- End diff -- I see. I'd just declare `testXYZ throws Throwable` and not worry about what gets raised internally: it saves on try/catch translation logic.
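A minimal sketch of the style suggested here, in Java; `CsvTestSupport`, `readLinesOrEmpty` and `testRoundTrip` are hypothetical names, not code from the PR. The helper declares the checked `IOException` and the test method declares `throws Throwable`, so nothing is wrapped in a `RuntimeException`. This assumes the method signature is under your control: an implementation of an interface method that does not declare `IOException` (as `createReader()` appears not to here) may still need to wrap it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical test helpers; the names are illustrative, not from the PR.
class CsvTestSupport {
  // Reads all lines, or returns an empty list if the file is absent.
  // IOException is declared, not translated into a RuntimeException.
  static List<String> readLinesOrEmpty(Path path) throws IOException {
    if (!Files.exists(path)) {
      return Collections.emptyList();
    }
    return Files.readAllLines(path);
  }

  // Test-style method declared `throws Throwable`: whatever is raised inside
  // (IOException, AssertionError, ...) propagates with its original type and
  // stack trace, so no try/catch translation logic is needed.
  static void testRoundTrip() throws Throwable {
    Path file = Files.createTempFile("sketch", ".csv");
    try {
      Files.write(file, Arrays.asList("1,2", "3,4"));
      List<String> lines = readLinesOrEmpty(file);
      if (lines.size() != 2) {
        throw new AssertionError("expected 2 lines, got " + lines.size());
      }
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

A failing read surfaces as the original `IOException` in the test report rather than as a `RuntimeException` with the real cause one level down.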
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19448 + @rdblue
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
GitHub user steveloughran opened a pull request: https://github.com/apache/spark/pull/19448 [SPARK-22217] [SQL] ParquetFileFormat to support arbitrary OutputCommitters

## What changes were proposed in this pull request?

`ParquetFileFormat` relaxes its requirement on the output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` or a subclass thereof (and, implicitly, Hadoop's `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`. This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save Parquet data from a DataFrame; at present you cannot do this.

Because a committer which isn't a subclass of `ParquetOutputCommitter` won't write summary metadata, `ParquetFileFormat` checks whether the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If that is true and the committer class isn't a Parquet committer, it raises a `RuntimeException` with an error message. (It could downgrade, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter`, which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify that the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary:

| committer | summary | outcome |
|---|---|---|
| parquet | true | success |
| parquet | false | success |
| marking | false | success with marker |
| marking | true | exception |

All tests are happy.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/steveloughran/spark cloud/SPARK-22217-committer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19448.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19448 commit e6fdbdcf4118283abd22f7b14586ed742d225657 Author: Steve Loughran Date: 2017-07-12T10:42:51Z SPARK-22217 tuning ParquetOutputCommitter to support any committer class, provided saveSummaries is disabled. With Tests Change-Id: I19872dc1c095068ed5a61985d53cb7258bd9a9bb
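A self-contained sketch of the check described in this PR, in Java. The `*Stub` classes stand in for Hadoop's `OutputCommitter` and Parquet's `ParquetOutputCommitter` so the example compiles on its own; `CommitterCheck` and `validate` are hypothetical names and this is not the patch's code. Only the configuration key `parquet.enable.summary-metadata` comes from the description, and treating a missing key as `false` is an assumption of the sketch.

```java
import java.util.Map;

// Stand-ins for the real Hadoop/Parquet classes, so the sketch compiles alone.
// These are NOT the real org.apache.hadoop / org.apache.parquet types.
class OutputCommitterStub {}
class ParquetOutputCommitterStub extends OutputCommitterStub {}
class MarkingCommitterStub extends OutputCommitterStub {}

class CommitterCheck {
  // Configuration key named in the PR description.
  static final String SUMMARY_KEY = "parquet.enable.summary-metadata";

  // Rejects a committer that cannot write Parquet summary files when summaries
  // were requested. A missing key is treated as "false" here; that default is
  // an assumption of this sketch, not a statement about Parquet's own default.
  static void validate(Class<? extends OutputCommitterStub> committerClass,
                       Map<String, String> conf) {
    boolean summaryRequested =
        Boolean.parseBoolean(conf.getOrDefault(SUMMARY_KEY, "false"));
    boolean isParquetCommitter =
        ParquetOutputCommitterStub.class.isAssignableFrom(committerClass);
    if (summaryRequested && !isParquetCommitter) {
      throw new RuntimeException("Committer " + committerClass.getName()
          + " is not a ParquetOutputCommitter and cannot create job summaries. "
          + "Set " + SUMMARY_KEY + " to false.");
    }
  }
}
```

Checking the four combinations against this sketch reproduces the outcome column of the test table: only a non-Parquet committer with summaries requested is rejected.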
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19294 @szhem that null path support in `FileOutputCommitter` came with the App Master recovery work of [MAPREDUCE-3711](https://issues.apache.org/jira/browse/MAPREDUCE-3711); it's trying to minimise the amount of HDFS IO done during the recovery process. I don't think that's a codepath Spark goes near; in the normal execution paths, `FileOutputFormat` & `FileOutputCommitter` will need output paths. (Disclaimer: the more I read of that code, the less I understand it. Do not treat my opinions as normative in any way.)
[GitHub] spark issue #19368: [SPARK-22146] FileNotFoundException while reading ORC fi...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19368 Looking @ this, things would be a lot less brittle if there wasn't a round trip Path -> String -> Path. I'm thinking of Windows paths here in particular. Other than the tests, which pass in a string (possibly from `File.getAbsolutePath`), everything appears to be down-converting the path and then converting it back up again. Is there any fundamental reason why `Path` isn't being passed around?
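An analogy only, using `java.nio.file.Path` rather than Hadoop's `org.apache.hadoop.fs.Path`: down-converting to a string and re-parsing is lossless only if both sides agree on the string form. In this sketch the string is a URI but is re-parsed as a plain filesystem path, so a path containing a space does not survive, while converting back via the `URI` object does. `PathRoundTrip` is a hypothetical name, and the behaviour shown assumes a Unix-like JVM.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Analogy using java.nio.file, not Hadoop's Path: a String round trip only
// works if the producer and the consumer agree on the string format.
class PathRoundTrip {
  // Down-convert to a URI string, then re-parse it as if it were a plain path.
  static Path viaString(Path p) {
    String s = p.toUri().toString(); // e.g. "file:///tmp/a%20b"
    return Paths.get(s);             // parsed as a relative path, not as a URI
  }

  // Keep a typed value (here the URI object) and convert back explicitly.
  static Path viaUri(Path p) {
    URI u = p.toUri();
    return Paths.get(u);
  }
}
```

Passing the typed object through the call chain avoids choosing a string format at every boundary.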
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19269 One other thing that would be good now, and invaluable in future, is for the `DataWriter.commit()` call to return a `Map[String,Long]` of statistics alongside the message sent to the committer. The spec should say these statistics "MUST be specific to the writer/its thread", so that aggregating the stats across all tasks produces valid output. What does this do? It lets the writers provide real statistics about the cost of operations. If you look at the changes to `FileFormatWriter` to collect stats, it's just listing the written file after `close()` and returning the file size as its sole metric. We are moving to instrumenting more of the Hadoop output streams with statistics collection; once there's an API for getting at the values, that would allow the driver to aggregate stats from the writers for both the writes and the commits. Examples: bytes written, files written, records written, # of 503/throttle events sent back from S3, # of network failures and retried operations, etc. Once the writers start collecting this data, there's motivation for the layers underneath to collect more and publish what they get. As an example, here's [the data collected by `S3AOutputStream`](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java#L765), exposed via `OutputStream.toString()` as well as fed to Hadoop metrics. As well as bytes written, it tracks blocks PUT, retries on completion operations, and how many times a block PUT failed and had to be retried. That means that a job can have results like "it took X seconds and wrote Y bytes, but it had to repost Z bytes of data, which made things slow". There's a side issue: what is the proposed mandated re-entrancy policy in the writers?
Is the expectation that `DataWriter.write()` will always be called by a single thread, and therefore there's no need to implement thread-safe writes to the output stream, or is there a risk that >1 thread may write sequentially (preventing thread-local storage for collecting stats) or even simultaneously? (If so, the example is in trouble, as the java.io APIs say there's no need to support re-entrancy, even if HDFS does.) Again, this is the kind of thing where some specification can highlight the policy; otherwise people will code against the implementation, which is precisely why HDFS `DFSOutputStream`s are stuck doing thread-safe writes (see: HBase).
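A minimal Java sketch of the proposal, under stated assumptions: `CommitResult`, `CountingWriter`, `StatsAggregator` and the statistic names are all hypothetical, not Spark API. The writer keeps its counters in plain fields, which is only safe under a single-threaded-writer policy, and because each writer reports only its own work, the driver can sum the maps key by key to get valid job-level totals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape for the proposal: commit() returns a commit message plus
// per-writer statistics. Neither type exists in Spark; both are illustrative.
final class CommitResult {
  final String message;
  final Map<String, Long> stats;

  CommitResult(String message, Map<String, Long> stats) {
    this.message = message;
    this.stats = stats;
  }
}

// Counters are plain fields: this is only safe under the policy that a
// single thread calls write()/commit() on a given writer instance.
final class CountingWriter {
  private long recordsWritten;
  private long charsWritten;

  void write(String record) {
    recordsWritten++;
    charsWritten += record.length(); // stand-in: chars, not real bytes on disk
  }

  CommitResult commit() {
    Map<String, Long> stats = new HashMap<>();
    stats.put("records.written", recordsWritten);
    stats.put("chars.written", charsWritten);
    return new CommitResult("ok", stats);
  }
}

final class StatsAggregator {
  // Each writer's stats cover only its own work, so summing per key across
  // all task commits yields valid job-level totals.
  static Map<String, Long> aggregate(List<CommitResult> results) {
    Map<String, Long> total = new HashMap<>();
    for (CommitResult r : results) {
      r.stats.forEach((k, v) -> total.merge(k, v, Long::sum));
    }
    return total;
  }
}
```

If writers could be called from more than one thread, the plain `long` fields here would need to become atomic or be guarded, which is why the re-entrancy policy matters for this design.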
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142005126 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan) + extends RunnableCommand { + override def children: Seq[LogicalPlan] = query :: Nil + + override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { +assert(children.length == 1) + +val writeTask = writer match { + case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() + case _ => new RowToInternalRowDataWriteFactory(writer.createWriterFactory(), query.schema) +} + +val rdd = children.head.execute() +val messages = new Array[WriterCommitMessage](rdd.partitions.length) + +logInfo(s"Start processing data source writer: $writer") + +try { + sparkSession.sparkContext.runJob( +rdd, +(context: TaskContext, iter: Iterator[InternalRow]) => + DataWritingSparkTask.run(writeTask, context, iter), +rdd.partitions.indices, +(index, message: WriterCommitMessage) => messages(index) = message + ) + + writer.commit(messages) + logInfo(s"Data source writer $writer committed.") +} catch { + case cause: Throwable => +writer.abort() +logError(s"Data source writer $writer aborted.") +throw new SparkException("Writing job aborted.", cause) + +} + +Nil + } +} + +object DataWritingSparkTask extends Logging { + def run( + writeTask: DataWriteFactory[InternalRow], + context: TaskContext, + iter: Iterator[InternalRow]): 
WriterCommitMessage = { +val dataWriter = + writeTask.createWriter(context.stageId(), context.partitionId(), context.attemptNumber()) + +// write the data and commit this writer. +Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + iter.foreach(dataWriter.write) + dataWriter.commit() --- End diff -- Good to log something here, at least at debug level.
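A minimal sketch of task-side debug logging around the commit, in Java with `java.util.logging`, where `Level.FINE` plays the role of debug. `SketchWriter` and `TaskWriteLoop` are hypothetical stand-ins, not Spark's `DataWriter` API; the point is only where the log lines go.

```java
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for a task-side writer; not Spark's DataWriter.
interface SketchWriter<T> {
  void write(T record) throws Exception;
  String commit() throws Exception;
}

final class TaskWriteLoop {
  private static final Logger LOG = Logger.getLogger(TaskWriteLoop.class.getName());

  // Writes every row, then commits, logging at FINE (debug) around the commit
  // so a slow or hung task commit is visible when debug logging is enabled.
  static <T> String run(SketchWriter<T> writer, Iterator<T> rows,
                        int partitionId, int attemptNumber) throws Exception {
    long count = 0;
    while (rows.hasNext()) {
      writer.write(rows.next());
      count++;
    }
    LOG.log(Level.FINE, "Committing partition {0} (attempt {1}) after {2} records",
        new Object[] {partitionId, attemptNumber, count});
    String message = writer.commit();
    LOG.log(Level.FINE, "Committed partition {0} (attempt {1})",
        new Object[] {partitionId, attemptNumber});
    return message;
  }
}
```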
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142005072 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala --- @@ -0,0 +1,113 @@ ... +val rdd = children.head.execute() +val messages = new Array[WriterCommitMessage](rdd.partitions.length) + +logInfo(s"Start processing data source writer: $writer") --- End diff -- Maybe add the # of partitions to the log; it gives a hint of how long the job is going to take. If a job hangs, this'll be the last entry in the log, so it's good to be informative.
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142004971 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java --- @@ -0,0 +1,297 @@ ... +@Override +public DataReader<Row> createReader() { + assert path != null; + try { +if
(Files.exists(Paths.get(path))) { + return new JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator()); +} else { + return new JavaSimpleCSVReadTask(Collections.emptyIterator()); +} + } catch (IOException e) { +throw new RuntimeException(e); + } +} + +@Override +public boolean next() { + if (lines.hasNext()) { +currentLine = lines.next(); +return true; + } else { +return false; + } +} + +@Override +public Row get() { + String[] values = currentLine.split(","); + assert values.length == 2; + long l1 = Long.valueOf(values[0]); + long l2 = Long.valueOf(values[1]); + return new GenericRow(new Object[] {l1, l2}); +} + +@Override +public void close() throws IOException { + +} + } + + @Override + public DataSourceV2Reader createReader(DataSourceV2Options options) { +return new Reader(options.get("path").get()); + } + + + class Writer implements DataSourceV2Writer { +private String path; + +Writer(String path) { + this.path = path; +} + +@Override +public DataWriteFacto
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142004889 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java --- @@ -0,0 +1,297 @@ ...
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142004831 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java --- @@ -0,0 +1,297 @@ ...
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142004814 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java --- @@ -0,0 +1,297 @@ ... +@Override +public DataReader<Row> createReader() { + assert path != null; + try { +if (Files.exists(Paths.get(path))) { + return new JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator()); +} else { + return new JavaSimpleCSVReadTask(Collections.emptyIterator()); +} + } catch (IOException e) { +throw new RuntimeException(e); --- End diff -- Why the translation?
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142004778 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala --- @@ -0,0 +1,113 @@ ... +try { + sparkSession.sparkContext.runJob( +rdd, +(context: TaskContext, iter: Iterator[InternalRow]) => + DataWritingSparkTask.run(writeTask, context, iter), +rdd.partitions.indices, +(index, message: WriterCommitMessage) => messages(index) = message + ) + + writer.commit(messages) + logInfo(s"Data source writer $writer committed.") +} catch { + case cause: Throwable => +writer.abort() --- End diff -- This may raise an exception too; better to use `Utils.tryWithSafeFinallyAndFailureCallbacks()`.
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19269 People may know that I'm busy with some S3 committers which work with Hadoop MapReduce & Spark, with an import of Ryan's committer into the Hadoop codebase. This includes changes to S3A to support that, an alternative design which relies on a consistent S3, a Spark committer (not in the Spark codebase itself) to handle it there, plus the tests & documentation of how things actually commit work. For that, the conclusion I've reached is: nobody really knows what's going on, and it's a miracle things work at all. FWIW, I think I now have the [closest thing to documentation](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md) of what goes on at the Hadoop FS committer layer, based on code, tests, stepping through operations in an IDE, and other archaeology. Based on that experience, I think a key deliverable ought to be some specification of what a committer does. I know there are bits in the javadocs like "Implementations should handle this case correctly", but without a definition of "correct" it's hard to point at an implementation and say "you've got it wrong". I would propose supplementing the code with 1. A rigorous specification, including possible workflows; Scala & RFC 2119 keywords, maybe. 2. Tests against that, including attempts to simulate the failure modes, all the orders of execution which aren't expected to be valid, etc. Some general questions related to this: * If a writer doesn't support speculation, can it say so? I know speculation and failure recovery are related, but task retry after failure is linearized, whereas speculation can happen in parallel. * Is it possible to instantiate a second writer and say "abort the output of the other writer" (on a different host?). 
This allows for cleanup of a task's work after the failure of the entire executor. If it's not possible, then job commit must be required to clean up. Maybe: pass information about failing tasks to the job commit, so it has more of an idea what to do. (Hadoop MapReduce example: the AM calls abortTask() for all failing containers before instantiating a new one and retrying.) * MAY the intermediate output of a task be observable to others? * MAY the committed output of a task be observable to others? If so, what does this mean for readers? Is it something which a writer may wish to declare or warn callers about? * What if `DataWriter.commit()` just doesn't return, or the executor fails during that commit process? Is that a failure of the entire job or just of the task? (FWIW MR algorithm 1 handles this, algorithm 2 doesn't.) * What if `writer.abort()` raises an exception? (Not unusual if the cause of the commit failure is a network/auth problem.) * What if `writer.abort()` is called before any other operation on that writer? It had better be a no-op. * What if `DataSourceV2Writer.commit()` fails? Can it be retried? (Easiest to say no, obviously.) * After a `DataSourceV2Writer.commit()` fails, can `DataSourceV2Writer.abort()` be called?
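Two of the rules proposed in those questions (`abort()` before any other call is a no-op; a repeated `abort()` is harmless) can be pinned down as a small state machine. This is a sketch with a hypothetical `SketchTaskWriter`, not Spark's `DataWriter`; rejecting `abort()` after `commit()` is a choice made here for illustration, not something the discussion specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical task-level writer used only to make the proposed rules testable.
final class SketchTaskWriter {
  enum State { NEW, WRITING, COMMITTED, ABORTED }

  private State state = State.NEW;
  private final List<String> staged = new ArrayList<>();
  private int cleanups = 0;  // how often staged output was actually discarded

  void write(String record) {
    if (state != State.NEW && state != State.WRITING) {
      throw new IllegalStateException("write() after " + state);
    }
    state = State.WRITING;
    staged.add(record);
  }

  List<String> commit() {
    if (state == State.ABORTED || state == State.COMMITTED) {
      throw new IllegalStateException("commit() after " + state);
    }
    state = State.COMMITTED;
    return new ArrayList<>(staged);
  }

  void abort() {
    switch (state) {
      case NEW:
      case ABORTED:
        // Nothing written yet, or already cleaned up: a no-op.
        state = State.ABORTED;
        return;
      case COMMITTED:
        // Assumption of this sketch: committed output is not rolled back here.
        throw new IllegalStateException("abort() after commit()");
      default:  // WRITING: discard staged output exactly once
        staged.clear();
        cleanups++;
        state = State.ABORTED;
    }
  }

  int cleanups() { return cleanups; }

  State state() { return state; }
}
```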
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r142004644 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers for this writing job and are produced by {@link DataWriter#commit()}. This + * also means all the data are written successfully and all data writers are committed. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted. + */ + void abort(); --- End diff -- Knowing what's being committed can provide a bit more information as to what is going on, and could be appreciated for that. 
The biggest issue I fear is loss of the driver itself, so *no* abort() call is made. A strategy for cleaning up from that would be good, even though it's primarily one of bringing up a new writer and saying "clean up this mess a previous instance left". Looking at the S3 example, my strategy there is/would be: identify all pending commits, abort them, then clean up the dest dir.
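A toy Java model of that recovery strategy, as a sketch only: `ToyStore`, `OrphanCleanup`, and the `<dest>/_temporary/<jobId>/` layout are hypothetical. A *new* writer instance, given the job ID of the lost driver, first aborts that job's pending uploads, then removes what the job left under the destination. This sketch interprets "clean up the dest dir" as removing only the dead job's temporary data; already committed output is kept.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy object store: uploads that were started but never committed,
// plus the paths that are visible in the store.
final class ToyStore {
  final Set<String> pendingUploads = new HashSet<>();
  final Set<String> paths = new HashSet<>();
}

// Hypothetical recovery routine run after the driver owning jobId was lost.
final class OrphanCleanup {
  private OrphanCleanup() {}

  /** Returns the number of pending uploads that were aborted. */
  static int cleanUp(ToyStore store, String destDir, String jobId) {
    String jobPrefix = destDir + "/_temporary/" + jobId + "/";
    // 1. Identify every pending commit belonging to the dead job, and abort it.
    List<String> orphans = new ArrayList<>();
    for (String upload : store.pendingUploads) {
      if (upload.startsWith(jobPrefix)) {
        orphans.add(upload);
      }
    }
    store.pendingUploads.removeAll(orphans);
    // 2. Then remove whatever that job left under the destination directory.
    store.paths.removeIf(p -> p.startsWith(jobPrefix));
    return orphans.size();
  }
}
```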
[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...
Github user steveloughran closed the pull request at: https://github.com/apache/spark/pull/17745
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140658582 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -130,17 +135,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) .foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") -val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) -for ((src, dst) <- filesToMove) { - fs.rename(new Path(src), new Path(dst)) +if (hasAbsPathFiles) { + val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) + for ((src, dst) <- filesToMove) { +fs.rename(new Path(src), new Path(dst)) + } + fs.delete(absPathStagingDir, true) } -fs.delete(absPathStagingDir, true) --- End diff -- Can do: now that you've got a little mock committer in, someone can just extend it to optionally throw an IOE in abort().
[GitHub] spark issue #17743: [SPARK-20448][DOCS] Document how FileInputDStream works ...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17743 People don't realise how much object stores aren't file systems until they discover all their assumptions are broken. Once you know how they work, you can set up a workflow which is more efficient and reliable.
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19294 As I play with commit logic all the way through the stack, I can't help thinking everyone's lives would be better if we tagged the MRv1 commit APIs as deprecated in Hadoop 3 and moved all uses of the commit protocols fully onto the v2 committers: one codepath to get confused by, half as much complexity. The issue with the custom stuff is inevitably Hive-related, isn't it? It's always liked to scatter data around a filesystem and pretend it's a single dataset.
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140188088 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -130,17 +135,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) .foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") -val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) -for ((src, dst) <- filesToMove) { - fs.rename(new Path(src), new Path(dst)) +if (hasAbsPathFiles) { + val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) + for ((src, dst) <- filesToMove) { +fs.rename(new Path(src), new Path(dst)) + } + fs.delete(absPathStagingDir, true) } -fs.delete(absPathStagingDir, true) --- End diff -- Given the changes being made here, this seems a good place to add the suggestion of [SPARK-20045](https://issues.apache.org/jira/browse/SPARK-20045) and make that abort() call resilient to failures, by doing that delete even if the Hadoop committer raised an IOE.
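A minimal sketch of that resilient abort, in Java, with hypothetical `InnerCommitter` and `StagingCleaner` interfaces standing in for the Hadoop committer and the `FileSystem.delete()` call. The staging directory is deleted even when the inner abort raises an `IOException`, and the first failure is rethrown afterwards. Logging and swallowing the failure instead would be an equally reasonable choice; this is not the code in the PR.

```java
import java.io.IOException;

// Hypothetical stand-ins for the wrapped Hadoop committer and the FS delete call.
interface InnerCommitter {
  void abortJob() throws IOException;
}

interface StagingCleaner {
  void deleteRecursively(String dir) throws IOException;
}

final class ResilientAbort {
  private ResilientAbort() {}

  /**
   * Aborts the inner committer, then deletes the staging directory even if
   * that abort failed. The first failure is rethrown at the end; a second
   * one is attached to it as suppressed.
   */
  static void abortJob(InnerCommitter committer, StagingCleaner fs, String stagingDir)
      throws IOException {
    IOException first = null;
    try {
      committer.abortJob();
    } catch (IOException e) {
      first = e;
    }
    try {
      fs.deleteRecursively(stagingDir);  // always attempted
    } catch (IOException e) {
      if (first == null) {
        first = e;
      } else {
        first.addSuppressed(e);
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```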
[GitHub] spark issue #17745: [SPARK-17159][Streaming] optimise check for new files in...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17745 Due to lack of support/interest, moved to https://github.com/hortonworks-spark/cloud-integration
[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...
Github user steveloughran closed the pull request at: https://github.com/apache/spark/pull/17747
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140008216 --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala --- @@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assert(FakeWriterWithCallback.exception.getMessage contains "failed to write") } + test("saveAsNewAPIHadoopDataset should use current working directory " + +"for files to be committed to an absolute output location when empty output path specified") { +val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) + +val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration)) +job.setOutputKeyClass(classOf[Integer]) +job.setOutputValueClass(classOf[Integer]) +job.setOutputFormatClass(classOf[NewFakeFormat]) +val jobConfiguration = job.getConfiguration + +val fs = FileSystem.get(jobConfiguration) +fs.setWorkingDirectory(new Path(getClass.getResource(".").toExternalForm)) +try { + // just test that the job does not fail with + // java.lang.IllegalArgumentException: Can not create a Path from a null string + pairs.saveAsNewAPIHadoopDataset(jobConfiguration) +} finally { + // close to prevent filesystem caching across different tests + fs.close() +} + } + + test("saveAsHadoopDataset should use current working directory " + +"for files to be committed to an absolute output location when empty output path specified") { +val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) + +val conf = new JobConf() +conf.setOutputKeyClass(classOf[Integer]) +conf.setOutputValueClass(classOf[Integer]) +conf.setOutputFormat(classOf[FakeOutputFormat]) +conf.setOutputCommitter(classOf[FakeOutputCommitter]) + +val fs = FileSystem.get(conf) +fs.setWorkingDirectory(new Path(getClass.getResource(".").toExternalForm)) +try { + FakeOutputCommitter.ran = false + pairs.saveAsHadoopDataset(conf) +} finally { + // close to prevent filesystem caching across 
different tests + fs.close() --- End diff -- Again, you don't need this.
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140008084 --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala --- @@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assert(FakeWriterWithCallback.exception.getMessage contains "failed to write") } + test("saveAsNewAPIHadoopDataset should use current working directory " + +"for files to be committed to an absolute output location when empty output path specified") { +val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) + +val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration)) +job.setOutputKeyClass(classOf[Integer]) +job.setOutputValueClass(classOf[Integer]) +job.setOutputFormatClass(classOf[NewFakeFormat]) +val jobConfiguration = job.getConfiguration + +val fs = FileSystem.get(jobConfiguration) +fs.setWorkingDirectory(new Path(getClass.getResource(".").toExternalForm)) +try { + // just test that the job does not fail with + // java.lang.IllegalArgumentException: Can not create a Path from a null string + pairs.saveAsNewAPIHadoopDataset(jobConfiguration) +} finally { + // close to prevent filesystem caching across different tests + fs.close() --- End diff -- Avoid this. Either use `FileSystem.newInstance()` or skip the close. Given you aren't playing with low-level FS options, it's faster and more efficient to reuse the cached instance.
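A toy Java model of why closing the shared instance is a problem, assuming Hadoop-like semantics where `get()` hands out one cached instance per URI and `newInstance()` hands out a private one. `ToyFs` is hypothetical and not Hadoop's `FileSystem`; it only shows that closing a cached instance breaks every other holder of that same reference, while closing a private instance affects nobody else.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical filesystem client with an instance cache keyed by URI.
final class ToyFs implements AutoCloseable {
  private static final Map<String, ToyFs> CACHE = new HashMap<>();

  private final String uri;
  private boolean closed = false;

  private ToyFs(String uri) {
    this.uri = uri;
  }

  /** Shared instance: every caller asking for the same URI gets the same object. */
  static synchronized ToyFs get(String uri) {
    return CACHE.computeIfAbsent(uri, ToyFs::new);
  }

  /** Private instance: never cached, so closing it affects no one else. */
  static ToyFs newInstance(String uri) {
    return new ToyFs(uri);
  }

  private static synchronized void evict(ToyFs fs) {
    CACHE.remove(fs.uri, fs);  // only removes it if this exact instance is cached
  }

  @Override
  public synchronized void close() {
    closed = true;
    evict(this);
  }

  synchronized String resolve(String path) {
    if (closed) {
      throw new IllegalStateException("filesystem closed: " + uri);
    }
    return uri + path;
  }
}
```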
[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to han...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18111 thx --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to han...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18111 I believe this patch implements the original design goal: if a committer doesn't have a working path supplied by `getWorkingPath()`, then it downgrades. It might be worthwhile doing a larger fault-injecting committer at some point. I indirectly have the ability to inject faults, but it's a long way from the Spark codebase, when really a thin shim is all that's needed. In particular, a fault-injecting ParquetOutputCommitter would test a big chunk of the codebase whose error-handling codepaths are more likely to be tested in production than by Jenkins. Are people interested? (Separate JIRA, obviously.)
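A sketch of the kind of thin shim meant here, in Java, assuming a deliberately minimal hypothetical `TaskJobCommitter` interface (real Hadoop committers have more methods, and wrapping something like `ParquetOutputCommitter` would need the full API). The shim forwards every call to the wrapped committer unless that operation has been configured to fail, in which case it throws an `IOException` before delegating. Failures are deterministic here to keep tests repeatable; a probabilistic variant could draw from a seeded `java.util.Random` instead.

```java
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, deliberately minimal committer interface.
interface TaskJobCommitter {
  void commitTask(String taskId) throws IOException;
  void abortTask(String taskId) throws IOException;
  void commitJob() throws IOException;
}

enum CommitOp { COMMIT_TASK, ABORT_TASK, COMMIT_JOB }

// Thin shim: delegates to the wrapped committer, but throws an IOException
// first for any operation configured to fail.
final class FaultInjectingCommitter implements TaskJobCommitter {
  private final TaskJobCommitter inner;
  private final Set<CommitOp> failing = EnumSet.noneOf(CommitOp.class);

  FaultInjectingCommitter(TaskJobCommitter inner, Set<CommitOp> failOn) {
    this.inner = inner;
    this.failing.addAll(failOn);
  }

  private void maybeFail(CommitOp op) throws IOException {
    if (failing.contains(op)) {
      throw new IOException("Injected failure in " + op);
    }
  }

  @Override
  public void commitTask(String taskId) throws IOException {
    maybeFail(CommitOp.COMMIT_TASK);
    inner.commitTask(taskId);
  }

  @Override
  public void abortTask(String taskId) throws IOException {
    maybeFail(CommitOp.ABORT_TASK);
    inner.abortTask(taskId);
  }

  @Override
  public void commitJob() throws IOException {
    maybeFail(CommitOp.COMMIT_JOB);
    inner.commitJob();
  }
}
```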
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Related to this, here is the updated spec on [Hadoop output stream, Syncable and StreamCapabilities](https://github.com/steveloughran/hadoop/blob/s3/HADOOP-13327-outputstream-trunk/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md). As the doc notes, object stores != filesystems, and while a lot can be done to preserve the metaphor on input, it's on output where CRUD inconsistencies surface, along with questions like "does a 0-byte file get created in create()?", "when is data written?", etc.
[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18979#discussion_r134460176 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala --- @@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) private def getFileSize(filePath: String): Long = { val path = new Path(filePath) val fs = path.getFileSystem(hadoopConf) -fs.getFileStatus(path).getLen() +try { + fs.getFileStatus(path).getLen() +} catch { + case e: FileNotFoundException => +// may arise against eventually consistent object stores +logInfo(s"File $path is not yet visible", e) +0 --- End diff -- +1. I could add a comment in the docs somewhere to state that metrics in the cloud may not be consistent.
[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17342 @Chopinxb no worries; the hard part is thinking how to fix this. I don't see it being possible to do reliably except through an explicit download. Hadoop 2.8+ has moved off commons-logging, so this problem *may* have gone away. However, there are too many dependencies to be confident that will hold.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 @adrian-ionescu wrote > is there a need for calling getFinalStats() more than once? No. As long as everyone is aware of it, it won't be an issue.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 > To mimic S3-like behavior, you can overwrite the file system `spark.hadoop.fs.$scheme.impl` @gatorsmile: you will be able to do something better soon, as S3A is adding an inconsistent AWS client into the `hadoop-aws` JAR, which you can then enable to guarantee that consistency delays occur and to inject intermittent faults into the system (throttling, transient network events). All it will take is a config option to switch to this client, plus the chaos-monkey-esque probabilities and delays. This is what I'm already using; you will be able to as well. That is, no need to switch clients yourself: just set `spark.hadoop.fs.s3a.s3.client.factory.impl=org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory` and wait for the stack traces. The S3A FS itself [needs to do more](https://issues.apache.org/jira/browse/HADOOP-14531) to handle throttling & failures (retry, add failure metrics so throttling & error rates can be measured). Knowing throttling rates is important, as it will help identify perf problems due to bad distribution of work across a bucket, excess use of KMS key lookup..., things that surface in support calls. This patch restores Spark 2.3 to the behaviour it had in Spark 2.2: a brief delay between object creation and visibility does not cause the task to fail.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Currently *nobody should be using s3a:// as the temp file destination*, which is the same as saying "nobody should be using s3a:// as the direct destination of work", not without a special committer (Netflix, IBM's Stocator, ...) or something to give S3 list consistency. That's because today, task commit relies on a list & rename of all files in the task attempt dir, and if you don't get list consistency, you can miss out on files. If you ever hear anyone complaining "it takes too long to commit to S3", then they are using it this way. Tell them to use a consistency layer or to stop it :)
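A toy Java model of why list-and-rename task commit loses data without list consistency. `LaggyStore` and `ListAndRenameCommit` are hypothetical and not the S3 API: an object can exist (a direct lookup finds it) while still being absent from listings. Committing by renaming only what the listing returns silently leaves the unlisted file behind, and no error is raised.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy eventually-consistent store: path -> whether LIST has caught up with it yet.
final class LaggyStore {
  private final Map<String, Boolean> objects = new LinkedHashMap<>();

  void put(String path, boolean visibleInListings) {
    objects.put(path, visibleInListings);
  }

  boolean exists(String path) {
    return objects.containsKey(path);
  }

  List<String> list(String prefix) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Boolean> e : objects.entrySet()) {
      if (e.getKey().startsWith(prefix) && e.getValue()) {
        out.add(e.getKey());
      }
    }
    return out;
  }

  void rename(String src, String dst) {
    objects.remove(src);
    objects.put(dst, true);
  }
}

final class ListAndRenameCommit {
  private ListAndRenameCommit() {}

  /** Commits a task by renaming whatever a listing of its attempt dir returns. */
  static int commitTask(LaggyStore store, String attemptDir, String destDir) {
    List<String> listed = store.list(attemptDir);  // may miss recently written files
    for (String src : listed) {
      store.rename(src, destDir + src.substring(attemptDir.length()));
    }
    return listed.size();
  }
}
```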
[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18979#discussion_r133919035 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.nio.charset.Charset + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * Test how BasicWriteTaskStatsTracker handles files. + */ +class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { + + private val tempDir = Utils.createTempDir() + private val tempDirPath = new Path(tempDir.toURI) + private val conf = new Configuration() + private val localfs = tempDirPath.getFileSystem(conf) + private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII")) + private val data2 = "012".getBytes(Charset.forName("US-ASCII")) + private val len1 = data1.length + private val len2 = data2.length + + /** + * In teardown delete the temp dir. 
+ */ + protected override def afterAll(): Unit = { +Utils.deleteRecursively(tempDir) + } + + /** + * Assert that the stats match that expected. + * @param tracker tracker to check + * @param files number of files expected + * @param bytes total number of bytes expected + */ + private def assertStats( + tracker: BasicWriteTaskStatsTracker, + files: Int, + bytes: Int): Unit = { +val stats = finalStatus(tracker) +assert(files === stats.numFiles, "Wrong number of files") +assert(bytes === stats.numBytes, "Wrong byte count of file size") + } + + private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = { +tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + } + + test("No files in run") { +val tracker = new BasicWriteTaskStatsTracker(conf) +assertStats(tracker, 0, 0) + } + + test("Missing File") { +val missing = new Path(tempDirPath, "missing") +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(missing.toString) +assertStats(tracker, 1, 0) + } + + test("Empty filename is forwarded") { +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile("") +intercept[IllegalArgumentException] { + finalStatus(tracker) +} + } + + test("Null filename is only picked up in final status") { +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(null) +intercept[IllegalArgumentException] { + finalStatus(tracker) +} + } + + test("0 byte file") { +val file = new Path(tempDirPath, "file0") +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(file.toString) +touch(file) +assertStats(tracker, 1, 0) --- End diff -- I'm assuming that the file will *eventually* come into existence; that its absence straight after collection is simply a transient create inconsistency of the endpoint, like a brief caching of negative HEAD/GET requests (which AWS S3 does do as part of its DoS defences). The files will be there later. One option: count the number of missing files and include that in the report. 
It shouldn't be a metric most of the time, though: never on a "real" FS or consistent object store, rarely on an inconsistent one.
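A minimal Java sketch of the option suggested above: a tracker that treats a not-yet-visible file as zero bytes but counts it as missing, so the final report can flag that the byte count may be low. `SizeLookup`, `WriteStats`, and `SketchStatsTracker` are hypothetical names, not Spark's `BasicWriteTaskStatsTracker`, and as a simplification sizes are looked up only when the final stats are requested.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical size lookup; in Spark's tracker this role is played by
// fs.getFileStatus(path).getLen().
interface SizeLookup {
  long sizeOf(String path) throws FileNotFoundException;
}

final class WriteStats {
  final int numFiles;
  final long numBytes;
  final int numMissing;  // files written but not yet visible when stats were taken

  WriteStats(int numFiles, long numBytes, int numMissing) {
    this.numFiles = numFiles;
    this.numBytes = numBytes;
    this.numMissing = numMissing;
  }
}

final class SketchStatsTracker {
  private final SizeLookup lookup;
  private final List<String> files = new ArrayList<>();

  SketchStatsTracker(SizeLookup lookup) {
    this.lookup = lookup;
  }

  void newFile(String path) {
    files.add(path);
  }

  WriteStats finalStats() {
    long bytes = 0;
    int missing = 0;
    for (String path : files) {
      try {
        bytes += lookup.sizeOf(path);
      } catch (FileNotFoundException e) {
        // Eventually consistent store: the file may simply not be visible yet.
        // Count it as 0 bytes and record it, rather than failing the task.
        missing++;
      }
    }
    return new WriteStats(files.size(), bytes, missing);
  }
}
```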
[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18979#discussion_r133918269 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala --- @@ -57,7 +60,14 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) private def getFileSize(filePath: String): Long = { val path = new Path(filePath) val fs = path.getFileSystem(hadoopConf) -fs.getFileStatus(path).getLen() +try { + fs.getFileStatus(path).getLen() +} catch { + case e: FileNotFoundException => +// may arise against eventually consistent object stores +logInfo(s"File $path is not yet visible", e) +0 --- End diff -- The problem is: what can be picked up if the file isn't yet reported as present by the endpoint? Adding a bool to say "results are unreliable" could be used as a warning. One thing to consider long term is: if hadoop FS output streams added a simple map of statistics, could they be picked up by committers & then aggregated in job reports. Hadoop filesystems have statistics (simple ones in Hadoop <= 2.7, an arbitrary map of String -> Long in 2.8 (with [standard key names across filesystems](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java)), and certainly today S3ABlockOutputStream [collects stats on individual streams](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java#L756). If that was made visible and collected, you could get a lot more detail on what is going on. Thoughts? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
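A minimal Java sketch of the long-term idea above: if each output stream exposed a `String -> Long` statistics map, a committer could sum them into one job-level report. `StreamStatsAggregator` is a hypothetical name, plain `java.util` maps stand in for Hadoop's `StorageStatistics`, and the key names in the usage example (`bytes_written`, `files_created`) are made up for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: merge per-stream statistics maps (String -> Long)
 * into a single job-level report by summing values that share a key.
 */
final class StreamStatsAggregator {
  private StreamStatsAggregator() {}

  static Map<String, Long> aggregate(List<Map<String, Long>> perStream) {
    // TreeMap keeps keys sorted, so the report prints in a stable order.
    Map<String, Long> totals = new TreeMap<>();
    for (Map<String, Long> streamStats : perStream) {
      for (Map.Entry<String, Long> e : streamStats.entrySet()) {
        // Sum values for keys seen in several streams; new keys are inserted as-is.
        totals.merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return Collections.unmodifiableMap(totals);
  }
}
```

Summing is only the simplest merge policy; per-key maxima or gauges would need a different combiner for those keys.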
[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18979#discussion_r133913173 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.nio.charset.Charset + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * Test how BasicWriteTaskStatsTracker handles files. + */ +class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { + + private val tempDir = Utils.createTempDir() + private val tempDirPath = new Path(tempDir.toURI) + private val conf = new Configuration() + private val localfs = tempDirPath.getFileSystem(conf) + private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII")) + private val data2 = "012".getBytes(Charset.forName("US-ASCII")) + private val len1 = data1.length + private val len2 = data2.length + + /** + * In teardown delete the temp dir. 
+ */ + protected override def afterAll(): Unit = { +Utils.deleteRecursively(tempDir) + } + + /** + * Assert that the stats match that expected. + * @param tracker tracker to check + * @param files number of files expected + * @param bytes total number of bytes expected + */ + private def assertStats( + tracker: BasicWriteTaskStatsTracker, + files: Int, + bytes: Int): Unit = { +val stats = finalStatus(tracker) +assert(files === stats.numFiles, "Wrong number of files") +assert(bytes === stats.numBytes, "Wrong byte count of file size") + } + + private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = { +tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats] + } + + test("No files in run") { +val tracker = new BasicWriteTaskStatsTracker(conf) +assertStats(tracker, 0, 0) + } + + test("Missing File") { +val missing = new Path(tempDirPath, "missing") +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(missing.toString) +assertStats(tracker, 1, 0) + } + + test("Empty filename is forwarded") { +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile("") +intercept[IllegalArgumentException] { + finalStatus(tracker) +} + } + + test("Null filename is only picked up in final status") { +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(null) +intercept[IllegalArgumentException] { + finalStatus(tracker) +} + } + + test("0 byte file") { +val file = new Path(tempDirPath, "file0") +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(file.toString) +touch(file) +assertStats(tracker, 1, 0) + } + + test("File with data") { +val file = new Path(tempDirPath, "file-with-data") +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(file.toString) +write1(file) +assertStats(tracker, 1, len1) + } + + test("Open file") { +val file = new Path(tempDirPath, "file-open") +val tracker = new BasicWriteTaskStatsTracker(conf) +tracker.newFile(file.toString) +val stream = localfs.create(file, 
true) +try { + assertStats(tracker, 1, 0) + stream.write(data1) + stream.flush() + assert(1 === finalStatus(tracker).numFiles, "Wrong number of file
[GitHub] spark pull request #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTas...
GitHub user steveloughran opened a pull request: https://github.com/apache/spark/pull/18979 [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

## What changes were proposed in this pull request?

`BasicWriteTaskStatsTracker.getFileSize()` now catches `FileNotFoundException`, logs it at info level, and returns 0 as the file size. This ensures that if a newly created file isn't yet visible, because the store does not always offer create consistency, metric collection doesn't cause the write to fail.

## How was this patch tested?

New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only checks the resilience to missing files, but also verifies the existing logic for gathering file statistics. Note that in the current implementation:

1. If you call `Tracker.getFinalStats()` more than once, the byte count increases by the size of the last file on each extra call. This could be fixed by clearing the filename field inside `getFinalStats()` itself.
2. If you pass an empty or null string to `Tracker.newFile(path)`, an `IllegalArgumentException` is raised, but only in `getFinalStats()`, rather than in `newFile()`. There's a test for this behaviour in the new suite, as it verifies that only FNFEs get swallowed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/steveloughran/spark cloud/SPARK-21762-missing-files-in-metrics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18979.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18979

commit 8ad28b9bcd6a56b963ab57a5b4937d10f492de33 Author: Steve Loughran Date: 2017-08-17T19:35:35Z SPARK-21762 handle FNFE events in BasicWriteStatsTracker; add a suite of tests for various file states.
Change-Id: I3269cb901a38b33e399ebef10b2dbcd51ccf9b75 commit 2a113fde1653743a3543df8ada395f320b826a3e Author: Steve Loughran Date: 2017-08-17T20:01:50Z SPARK-21762 add tests for "" and null filenames Change-Id: I38ac11c808849e2fd91f4931f4cb5cdfad43e2af
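The change and the two quirks listed above can be sketched together in plain Java. This is a hypothetical, simplified tracker, not Spark's `BasicWriteTaskStatsTracker`: the local filesystem (`java.nio.file`) stands in for a Hadoop `FileSystem`, and `NoSuchFileException` plays the role of `FileNotFoundException`. It sizes the previous file when a new one starts, reports a missing file as 0 bytes while setting a "possibly incomplete" flag (the "results are unreliable" bool suggested in review), and clears the current file name so repeated `finalStats()` calls don't double-count. Rejecting null or empty paths eagerly in `newFile()` is a design choice of this sketch only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;

/**
 * Hypothetical sketch of a per-task write-stats tracker. java.nio.file stands in
 * for a Hadoop FileSystem; NoSuchFileException plays the role of Hadoop's
 * FileNotFoundException for a file the store does not (yet) report.
 */
class WriteStatsTracker {
  private String currentFile;   // file being written; sized when tracking moves on
  private long numFiles;
  private long numBytes;
  private long numNotVisible;   // files that could not be sized

  /** Starts tracking a new file, sizing the previous one first. */
  void newFile(String path) {
    // Design choice of this sketch: reject bad paths here, not at final-stats time.
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Empty or null path");
    }
    recordCurrentFile();
    currentFile = path;
    numFiles++;
  }

  /** Returns the stats so far; calling it repeatedly gives the same answer. */
  Stats finalStats() {
    recordCurrentFile();
    return new Stats(numFiles, numBytes, numNotVisible > 0);
  }

  private void recordCurrentFile() {
    if (currentFile != null) {
      numBytes += fileSize(currentFile);
      currentFile = null;  // cleared so the same file is never counted twice
    }
  }

  private long fileSize(String path) {
    try {
      return Files.size(Paths.get(path));
    } catch (NoSuchFileException e) {
      // Not yet visible (e.g. an eventually consistent store): note it, report 0.
      System.err.println("File " + path + " is not yet visible");
      numNotVisible++;
      return 0L;
    } catch (IOException e) {
      // Any other failure is still an error.
      throw new UncheckedIOException(e);
    }
  }

  static final class Stats {
    final long numFiles;
    final long numBytes;
    final boolean possiblyIncomplete;  // the "results are unreliable" flag

    Stats(long numFiles, long numBytes, boolean possiblyIncomplete) {
      this.numFiles = numFiles;
      this.numBytes = numBytes;
      this.possiblyIncomplete = possiblyIncomplete;
    }
  }
}
```

Only the "not found" case is swallowed; every other I/O failure still surfaces, which matches the intent that only FNFEs get ignored.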
[GitHub] spark pull request #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18111#discussion_r133751724 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -73,7 +73,10 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val stagingDir: String = committer match { // For FileOutputCommitter it has its own staging path called "work path". - case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path) + case f: FileOutputCommitter => +val workPath = f.getWorkPath +require(workPath != null, s"Committer has no workpath $f") +Option(workPath.toString).getOrElse(path) --- End diff -- `workPath.toString()` triggers an NPE when the work path is null, which was the reason for the stack trace. Now, what's this code trying to do? Find a directory to put stuff in. If the committer is a subclass of `FileOutputCommitter`, then its `getWorkPath` method can be called, which *should* return a non-null path. If the requirement of the code is "get the work path, or return `path` if it's null", maybe the code should really be
```scala
Option(f.getWorkPath).getOrElse(path).toString
```
That'd return the workPath if non-null, falling back to the `path` variable, and then call `toString` on the returned object. I'm not sure off the top of my head if Scala type inference likes that, though. It may be less elegant but more reliable to have
```scala
val workPath = f.getWorkPath
if (workPath != null) workPath.toString else path
```
plus maybe a bonus paranoid check for workPath being "". I think that would actually get closer to the original goal of the code: always return a path, even if the committer doesn't have one.
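The fallback logic suggested above, including the extra empty-string check, can be written in Java with `Optional`. This is a sketch under assumptions: `StagingDirs` is a hypothetical helper, and the work path is typed as `Object` to stand in for the Hadoop `Path` returned by `FileOutputCommitter.getWorkPath()`, which may be null.

```java
import java.util.Optional;

/** Hypothetical helper: pick the committer's work path, else a fallback. */
final class StagingDirs {
  private StagingDirs() {}

  /** Returns workPath.toString() if it is non-null and non-empty, else fallback. */
  static String stagingDir(Object workPath, String fallback) {
    return Optional.ofNullable(workPath)   // a null work path becomes Optional.empty()
        .map(Object::toString)             // only called on a non-null value
        .filter(s -> !s.isEmpty())         // the extra "paranoid" check for ""
        .orElse(fallback);
  }
}
```

The key point is that `toString()` runs only after the null check; the original code called it first, which is where the NPE came from.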
[GitHub] spark issue #17743: [SPARK-20448][DOCS] Document how FileInputDStream works ...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17743 Just reread this; still looks correct. Review comments welcome
[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17342 Created: [SPARK-21697](https://issues.apache.org/jira/browse/SPARK-21697) with the stack trace attached
[GitHub] spark issue #17342: [SPARK-12868][SQL] Allow adding jars from hdfs
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17342 I'd recommend filing a SPARK bug on issues.apache.org, with a linked HDFS bug, "NPE in BlockReaderFactory log init". It looks like the creation of the LOG for BlockReader is triggering introspection, which in turn triggers BlockReaderFactory to do something before it's fully initialized, and then possibly fails with an NPE because the LOG field is still null.
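The suspected failure mode, a static field observed as null because class initialization re-entered itself, can be reproduced in a few lines of Java. `CycleFactory` and `CycleReader` are hypothetical classes illustrating the general JVM mechanism, not the actual HDFS `BlockReaderFactory` code: when a class's static initializer triggers a second class whose own initializer reads a field of the first, the JVM treats this as a recursive request on the same thread and hands back the not-yet-assigned default value (`null`) instead of blocking.

```java
/** Hypothetical: its static initializer indirectly re-enters itself via CycleReader. */
class CycleFactory {
  static final StringBuilder LOG = createLog();

  private static StringBuilder createLog() {
    CycleReader.touch();  // starts CycleReader's initialization before LOG is assigned
    return new StringBuilder("factory-log");
  }
}

/** Hypothetical: reads CycleFactory.LOG during its own static initialization. */
class CycleReader {
  // If CycleFactory is mid-initialization on this thread, this sees null.
  static final boolean SAW_NULL_LOG = (CycleFactory.LOG == null);

  static void touch() {
    // No-op: calling it is enough to trigger class initialization.
  }
}
```

Had `CycleReader` called a method on `CycleFactory.LOG` instead of only comparing it, that call would have thrown the `NullPointerException`. Which class is initialized first decides the outcome, so bugs like this can show up on only some code paths.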
[GitHub] spark issue #14601: [SPARK-13979][Core] Killed executor is re spawned withou...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/14601 I know this hasn't been updated, but it is still important. I can take it on if all it needs is a test case
[GitHub] spark issue #18628: [SPARK-18061][ThriftServer] Add spnego auth support for ...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18628 Thanks for making sure this is consistent with other uses of Configuration.get(); consistency is critical here
[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18668#discussion_r131095350 --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala --- @@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { private val prompt = "spark-sql" private val continuedPrompt = "".padTo(prompt.length, ' ') private var transport: TSocket = _ + private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop." --- End diff -- good point. I see `spark.hive` in some of my configs
[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18668#discussion_r131094720 --- Diff: docs/configuration.md --- @@ -2335,5 +2335,61 @@ The location of these configuration files varies across Hadoop versions, but a common location is inside of `/etc/hadoop/conf`. Some tools create configurations on-the-fly, but offer a mechanisms to download copies of them. -To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh` +To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh` to a location containing the configuration files. + +# Custom Hadoop/Hive Configuration + +If your Spark applications interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive +configuration files in Spark's class path. + +Multiple running applications might require different Hadoop/Hive client side configurations. +You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in +Spark's class path for each application, but it is not very convenient and these +files are best to be shared with common properties to avoid hard-coding certain configurations. + +The better choice is to use spark hadoop properties in the form of `spark.hadoop.*`. +They can be considered as same as normal spark properties which can be set in `$SPARK_HOME/conf/spark-defalut.conf` + +In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For +instance, Spark allows you to simply create an empty conf and set spark/spark hadoop properties. 
+ + {% highlight scala %} +val conf = new SparkConf().set("spark.hadoop.abc.def","xyz") +val sc = new SparkContext(conf) +{% endhighlight %} + +Also, you can modify or add configurations at runtime: +{% highlight bash %} +./bin/spark-submit \ + --name "My app" \ + --master local[4] \ + --conf spark.eventLog.enabled=false \ + --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ + --conf spark.hadoop.abc.def=xyz \ + myApp.jar +{% endhighlight %} + +## Typical Hadoop/Hive Configurations + + + + spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version + 1 + +The file output committer algorithm version, valid algorithm version number: 1 or 2. +Version 2 may have better performance, but version 1 may handle failures better in certain situations, +as per https://issues.apache.org/jira/browse/MAPREDUCE-4815";>MAPREDUCE-4815. + + + + + spark.hadoop.fs.hdfs.impl.disable.cache --- End diff -- This is a pretty dangerous one to point people at, especially since it's fixed in future Hadoop versions and backported to some distros. The cost of creating a new HDFS client on every worker can also get very expensive if you have a Spark process with many threads, all fielding work from the same user (thread pools, IPC connections, …).
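The mapping described in this docs change, where `spark.hadoop.abc.def=xyz` becomes the Hadoop property `abc.def=xyz`, amounts to stripping the prefix. A minimal sketch, with plain maps standing in for `SparkConf` and Hadoop's `Configuration`; `SparkHadoopProps` is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: copy spark.hadoop.* entries into Hadoop-style keys. */
final class SparkHadoopProps {
  static final String SPARK_HADOOP_PREFIX = "spark.hadoop.";

  private SparkHadoopProps() {}

  static Map<String, String> toHadoopKeys(Map<String, String> sparkConf) {
    Map<String, String> hadoopConf = new TreeMap<>();
    for (Map.Entry<String, String> e : sparkConf.entrySet()) {
      String key = e.getKey();
      // Only prefixed keys are copied; everything else stays a Spark setting.
      if (key.startsWith(SPARK_HADOOP_PREFIX)) {
        hadoopConf.put(key.substring(SPARK_HADOOP_PREFIX.length()), e.getValue());
      }
    }
    return hadoopConf;
  }
}
```

For example, `spark.hadoop.abc.def=xyz` and `spark.eventLog.enabled=false` yield a single Hadoop entry, `abc.def=xyz`.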
[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18668#discussion_r131093892 --- Diff: docs/configuration.md --- @@ -2335,5 +2335,61 @@ The location of these configuration files varies across Hadoop versions, but a common location is inside of `/etc/hadoop/conf`. Some tools create configurations on-the-fly, but offer a mechanisms to download copies of them. -To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh` +To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh` to a location containing the configuration files. + +# Custom Hadoop/Hive Configuration + +If your Spark applications interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive +configuration files in Spark's class path. + +Multiple running applications might require different Hadoop/Hive client side configurations. +You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in +Spark's class path for each application, but it is not very convenient and these +files are best to be shared with common properties to avoid hard-coding certain configurations. --- End diff -- "best shared", not "best to be shared". You can't do that anyway on a production Spark-on-YARN cluster: if you did, lots of other things would break. How about:
```
In a Spark cluster running on YARN, these configuration files are set cluster-wide, and cannot safely be changed by the application.
```
[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18668#discussion_r131093320 --- Diff: docs/configuration.md --- @@ -2335,5 +2335,61 @@ The location of these configuration files varies across Hadoop versions, but a common location is inside of `/etc/hadoop/conf`. Some tools create configurations on-the-fly, but offer a mechanisms to download copies of them. -To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh` +To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh` to a location containing the configuration files. + +# Custom Hadoop/Hive Configuration + +If your Spark applications interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive --- End diff -- s/applications/application is/
[GitHub] spark pull request #18628: [SPARK-18061][ThriftServer] Add spnego auth suppo...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18628#discussion_r130598803 --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala --- @@ -57,6 +59,19 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC case e @ (_: IOException | _: LoginException) => throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) } + + // Try creating spnego UGI if it is configured. + val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL) + val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB) --- End diff -- `HiveConf.getVar()` doesn't trim leading/trailing whitespace from its result the way `Configuration.getTrimmed()` does. Check other uses of the confvar to see if their values are trimmed of whitespace before use: if so, you need to copy that.
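A sketch of the trimming point, assuming a plain map in place of `HiveConf`; `TrimmedConf` and the keys in the usage example are hypothetical. Unlike Hadoop's `Configuration.getTrimmed(name)`, which returns `null` for an unset key, this helper returns an empty string, so a check like the diff's `principal.nonEmpty && keyTabFile.nonEmpty` also treats whitespace-only values as unset.

```java
import java.util.Map;

/** Hypothetical helper: trimmed config lookups for the SPNEGO settings. */
final class TrimmedConf {
  private TrimmedConf() {}

  /** Returns the value with leading/trailing whitespace removed, or "" if unset. */
  static String getTrimmed(Map<String, String> conf, String key) {
    String value = conf.get(key);
    return value == null ? "" : value.trim();
  }

  /** True only if both settings are non-empty once whitespace is trimmed. */
  static boolean spnegoConfigured(Map<String, String> conf, String principalKey, String keytabKey) {
    return !getTrimmed(conf, principalKey).isEmpty()
        && !getTrimmed(conf, keytabKey).isEmpty();
  }
}
```

Without trimming, a keytab value of `"   "` would pass a plain `nonEmpty` check and the login would fail later with a far less obvious error.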
[GitHub] spark pull request #18628: [SPARK-18061][ThriftServer] Add spnego auth suppo...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18628#discussion_r130598230 --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala --- @@ -57,6 +59,19 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC case e @ (_: IOException | _: LoginException) => throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) } + + // Try creating spnego UGI if it is configured. + val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL) + val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB) + if (principal.nonEmpty && keyTabFile.nonEmpty) { +try { + httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf) + setSuperField(this, "httpUGI", httpUGI) +} catch { + case e: IOException => +throw new ServiceException("Unable to login to spnego with given principal/keytab", e) +} --- End diff -- I'd recommend including as much diagnostic information as possible in the exception string: the principal and keytab, and the value of `e.toString`:
```
throw new ServiceException(s"Unable to login to spnego with principal `$principal` and keytab `$keyTabFile`: $e", e)
```
The scenario to plan for is "nothing works and all you have to go on is that string from the raised exception".
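The same advice as a small Java helper: the message carries the principal, the keytab path and the cause's `toString()`, so it is useful even when nothing else survives. `SpnegoLoginErrors` is hypothetical, and `IllegalStateException` stands in for Hive's `ServiceException`.

```java
/** Hypothetical helper: build a login failure that is diagnosable from its message alone. */
final class SpnegoLoginErrors {
  private SpnegoLoginErrors() {}

  static RuntimeException loginFailure(String principal, String keytab, Exception cause) {
    // Include every input plus the cause's toString(): class name and message.
    String msg = "Unable to login to spnego with principal `" + principal
        + "` and keytab `" + keytab + "`: " + cause;
    // Keep the cause attached too, so the full stack trace is not lost.
    return new IllegalStateException(msg, cause);
  }
}
```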
[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17747 I know, I just have too many open JIRAs to try and manage
[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17747 Pushing up a new patch, rebased to work with master. It's getting boring all round for this patch: me having to merge, retest and repush. How about finalising the review so we can be done with it?
[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to fai...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18111 Is there anything else which needs to be done here, or is it a matter of finding the right reviewer?
[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17747 The MiMa test failure was about a new method in the history server:
```
[info] spark-mllib: found 0 potential binary incompatibilities while checking against org.apache.spark:spark-mllib_2.11:2.0.0 (filtered 167)
[info] spark-core: found 1 potential binary incompatibilities while checking against org.apache.spark:spark-core_2.11:2.0.0 (filtered 1041)
[error] * method cacheMetrics()org.apache.spark.deploy.history.CacheMetrics in class org.apache.spark.deploy.history.HistoryServer does not have a correspondent in current version
[error]filter with: ProblemFil
```
I'll make sure the scope for that is as restricted as possible; it should only be accessed for testing.
[GitHub] spark issue #9518: [SPARK-11574][Core] Add metrics StatsD sink
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/9518 BTW, here are some ongoing Hadoop JIRAs related to Hadoop's own StatsD sink: [HADOOP-12360](https://issues.apache.org/jira/browse/HADOOP-12360?focusedCommentId=16034826&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16034826), with other improvements in [HADOOP-13048](https://issues.apache.org/jira/browse/HADOOP-13048). I think there have been some complaints about how failures to publish are handled (how quickly it gives up, etc.). It's worth looking at the JIRAs to see if there are any lessons there.
[GitHub] spark pull request #9518: [SPARK-11574][Core] Add metrics StatsD sink
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/9518#discussion_r123483576 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.metrics.sink + +import java.io.IOException +import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.SortedMap +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +import com.codahale.metrics._ +import org.apache.hadoop.net.NetUtils + +import org.apache.spark.Logging + +/** + * @see https://github.com/etsy/statsd/blob/master/docs/metric_types.md";> + *StatsD metric types + */ +private[spark] sealed trait StatsdMetricType { + val COUNTER = "c" + val GAUGE = "g" + val TIMER = "ms" + val Set = "s" +} + +private[spark] class StatsdReporter( +registry: MetricRegistry, +host: String = "127.0.0.1", +port: Int = 8125, +prefix: String = "", +filter: MetricFilter = MetricFilter.ALL, +rateUnit: TimeUnit = TimeUnit.SECONDS, +durationUnit: TimeUnit = TimeUnit.MILLISECONDS) + extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit) + with StatsdMetricType with Logging { + + private val address = new InetSocketAddress(host, port) + private val whitespace = "[\\s]+".r + + override def report( + gauges: SortedMap[String, Gauge[_]], + counters: SortedMap[String, Counter], + histograms: SortedMap[String, Histogram], + meters: SortedMap[String, Meter], + timers: SortedMap[String, Timer]): Unit = +Try(new DatagramSocket) match { + case Failure(ioe: IOException) => logWarning("StatsD datagram socket construction failed", +NetUtils.wrapException(host, port, "0.0.0.0", 0, ioe)) --- End diff -- It's preferable to use `NetUtils.getLocalHostname()` for the source address; you'll appreciate it when diagnosing problems on large clusters.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
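The point of the StatsD review comment is what ends up in the wrapped exception: a source address of `"0.0.0.0"` says nothing about which node failed. A minimal JDK-only sketch of the idea follows; `localHostname()` is a hypothetical stand-in for Hadoop's `NetUtils.getLocalHostname()` (its `"(unknown)"` fallback is an assumption of this sketch, not documented Hadoop behaviour), and `wrapSocketFailure` is a hypothetical helper, not part of the reporter under review.

```scala
import java.io.IOException
import java.net.{InetAddress, UnknownHostException}

object StatsdDiagnostics {

  /** Best-effort local hostname; a hypothetical stand-in for NetUtils.getLocalHostname(). */
  def localHostname(): String =
    try InetAddress.getLocalHost.getHostName
    catch { case _: UnknownHostException => "(unknown)" }

  /**
   * Hypothetical helper: wrap a socket failure so the message names both the
   * source host (this node) and the StatsD destination, instead of "0.0.0.0".
   */
  def wrapSocketFailure(destHost: String, destPort: Int, cause: IOException): IOException =
    new IOException(
      s"StatsD socket from ${localHostname()} to $destHost:$destPort failed: $cause",
      cause)
}
```

With thousands of executors logging the same warning, the hostname in the message is what lets an operator find the misbehaving node without correlating log files by hand.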
[GitHub] spark issue #14601: [SPARK-13979][Core] Killed executor is re spawned withou...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/14601

Testing should not be too hard. Here's my *untested* attempt:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

val sconf = new SparkConf(false)
sconf.set("fs.example.value", "true")
val conf = new Configuration(false)
new SparkHadoopUtil().appendS3AndSparkHadoopConfigurations(sconf, conf)
assert(conf.getBoolean("fs.example.value", false))
```
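Spark's configuration documentation describes passing Hadoop options as `spark.hadoop.`-prefixed properties, which are copied into the Hadoop `Configuration` with the prefix stripped. Below is a minimal, dependency-free model of that copy step, assuming `appendS3AndSparkHadoopConfigurations` follows the convention (the real method also handles S3 environment variables, which this sketch ignores). `HadoopConfPropagation` and `appendSparkHadoopOptions` are hypothetical names, and plain `Map`s stand in for `SparkConf` and `Configuration`.

```scala
object HadoopConfPropagation {

  /** Prefix marking Spark properties that should be forwarded to Hadoop. */
  val SparkHadoopPrefix = "spark.hadoop."

  /**
   * Hypothetical model of the copy step: every "spark.hadoop.foo=bar" entry in the
   * Spark settings becomes "foo=bar" in the Hadoop settings; other keys are ignored.
   */
  def appendSparkHadoopOptions(
      sparkConf: Map[String, String],
      hadoopConf: Map[String, String]): Map[String, String] =
    hadoopConf ++ sparkConf.collect {
      case (key, value) if key.startsWith(SparkHadoopPrefix) =>
        key.stripPrefix(SparkHadoopPrefix) -> value
    }
}
```

Under this model, setting `fs.example.value` directly on the Spark side copies nothing; the key would need to be `spark.hadoop.fs.example.value` for an assertion on `fs.example.value` to hold.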
[GitHub] spark issue #17747: [SPARK-11373] [CORE] Add metrics to the FsHistoryProvide...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/17747

I'm going to go with your suggestion and go via the metricServer to get at the state of counters and gauges; this is actually better, in that it will verify that all metrics are making their way through. This is just background work, so expect a short delay before the PR gets updated.
[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/17747#discussion_r122295075

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -164,6 +169,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
--- End diff --

Cut it; I was just noting that something is actually returned.
[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/17747#discussion_r122294995

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -129,6 +131,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
+  /** filesystem metrics: visible for test access */
--- End diff --

There are some which do; `appui.load.count`, for example.
[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/17747#discussion_r122279400

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -110,6 +117,14 @@ class HistoryServer(
     appCache.getSparkUI(appKey)
   }
 
+  val historyMetrics = new HistoryMetrics(this, "history.server")
+
+  /**
+   * Provider metrics are None until the provider is started, and only after that
+   * point if the provider returns any.
+   */
+  var providerMetrics: Option[Source] = None
--- End diff --

Not easily, would be the summary. Making it `private[history]` would lock down access, though.
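A small illustration of the access-modifier suggestion. To keep the snippet self-contained, an enclosing `object history` stands in for the real `org.apache.spark.deploy.history` package; `ServerSketch`, `SourceStub`, and `providerMetricsOf` are hypothetical names, not Spark classes. Code inside `history` (such as a test suite in that package) can read and set the `var`; code outside cannot.

```scala
object history {

  /** Hypothetical stand-in for a metrics Source published by a history provider. */
  final case class SourceStub(sourceName: String)

  /** Hypothetical stand-in for HistoryServer, reduced to the field under discussion. */
  class ServerSketch {
    // None until the provider is started, and only Some(...) if the provider returns metrics.
    // private[history]: visible throughout `history`, hidden from everything else.
    private[history] var providerMetrics: Option[SourceStub] = None

    def startProvider(metrics: Option[SourceStub]): Unit = {
      providerMetrics = metrics
    }
  }

  /** Same-scope access, as a test suite in the history package would have. */
  def providerMetricsOf(server: ServerSketch): Option[SourceStub] = server.providerMetrics
}
```

From outside `history`, writing `server.providerMetrics` fails to compile, while `history.providerMetricsOf(server)` still works.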
[GitHub] spark pull request #17747: [SPARK-11373] [CORE] Add metrics to the FsHistory...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/17747#discussion_r122256050

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryMetricSource.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, Gauge, Metric, MetricFilter, MetricRegistry, Timer}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.util.Clock
+
+/**
+ * An abstract implementation of the metrics [[Source]] trait with some common operations for
+ * retrieving entries; the `toString()` operation dumps all counters and gauges.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+    metrics.foreach { case (name, metric) =>
+      metricRegistry.register(fullname(name), metric)
+    }
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+    MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by machines.
+   */
+  override def toString: String = {
+    val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+    def robustAppend(s : => Long) = {
+      try {
+        sb.append(s)
+      } catch {
+        case e: Exception =>
+          sb.append(s"(exception: $e)")
+      }
+    }
+
+    sb.append(" Counters\n")
+
+    metricRegistry.getCounters.asScala.foreach { case (name, counter) =>
+      sb.append("").append(name).append(" = ")
+        .append(counter.getCount).append('\n')
+    }
+    sb.append(" Gauges\n")
+    metricRegistry.getGauges.asScala.foreach { case (name, gauge) =>
+      sb.append("").append(name).append(" = ")
+      try {
+        sb.append(gauge.getValue)
--- End diff --

Yes, I'm trying to work out why I *didn't* use that method, given it was there. I've widened it to a generic type and used it. It's probably overkill, but as anything can be a Gauge, it's possible that one will throw an NPE or similar, and that just ruins logging & debugging.
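A dependency-free sketch of the widened helper described in that reply: with a generic by-name parameter, the value is only evaluated inside the `try`, so a gauge that throws produces an `(exception: ...)` marker instead of aborting the whole dump. Plain `() => Any` functions stand in for codahale `Gauge`s, and `RobustDump`/`dumpGauges` are hypothetical names, not the code in the PR.

```scala
object RobustDump {

  /**
   * Append the value of `value` to `sb`. Because `value` is by-name, it is evaluated
   * inside the try block; any exception is recorded in the output rather than rethrown.
   */
  def robustAppend[T](sb: StringBuilder, value: => T): StringBuilder =
    try {
      sb.append(value)
    } catch {
      case e: Exception => sb.append(s"(exception: $e)")
    }

  /** Dump (name, reader) pairs, one per line; a failing reader does not stop the dump. */
  def dumpGauges(gauges: Seq[(String, () => Any)]): String = {
    val sb = new StringBuilder(" Gauges\n")
    gauges.foreach { case (name, read) =>
      sb.append("    ").append(name).append(" = ")
      robustAppend(sb, read())
      sb.append('\n')
    }
    sb.toString
  }
}
```

The by-name parameter is what makes this work: with a plain `value: T`, the gauge would be read at the call site, outside the `try`.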
[GitHub] spark issue #18247: [SPARK-13933][BUILD] Update hadoop-2.7 profile's curator...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18247

Just caught this. No, no issues with it. A retrospective non-binding +1.
[GitHub] spark issue #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to fai...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18111

Not really. I thought about how I could do it, but essentially you do need to do things underneath the commit protocol, either in the Hadoop codebase (me) or in a test which somehow misconfigured things. It's most likely to surface in custom subclasses of `FileOutputCommitter`, so a test would probably consist of a run with a new committer set to always return null from `getWorkPath`, verifying that the failure changed from an NPE to an `IllegalArgumentException`. That's a lot of complexity in a test for what is essentially making explicit a requirement of the code, namely that `getWorkPath.toString()` requires `getWorkPath` to be non-null.

I've included the before/after stack traces in the JIRA.
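The test idea above can be sketched without Hadoop by modelling only the piece that matters: something that may return `null` from `getWorkPath`. In this minimal sketch, `WorkPathSource`, `NullWorkPathCommitter`, and the two `stagingDir*` functions are hypothetical stand-ins, and a `String` replaces Hadoop's `Path`. It checks that the pre-patch expression fails with an NPE, while a `require` guard turns the failure into an `IllegalArgumentException` carrying a useful message.

```scala
object WorkPathCheck {

  /** Hypothetical stand-in for the one FileOutputCommitter method this check cares about. */
  trait WorkPathSource {
    def getWorkPath: String
  }

  /** A misconfigured committer, e.g. a job committer handed to a task: no work path. */
  class NullWorkPathCommitter extends WorkPathSource {
    def getWorkPath: String = null
    override def toString: String = "NullWorkPathCommitter"
  }

  /** The pre-patch expression: the NPE comes from .toString on null, not from Option. */
  def stagingDirBefore(c: WorkPathSource, path: String): String =
    Option(c.getWorkPath.toString).getOrElse(path)

  /** The patched version: fail fast with a message naming the committer. */
  def stagingDirAfter(c: WorkPathSource, path: String): String = {
    val workPath = c.getWorkPath
    require(workPath != null, s"Committer has no workpath $c")
    Option(workPath.toString).getOrElse(path)
  }

  /** Run `body` and return the exception it throws, if any. */
  def thrownBy(body: => Any): Option[Exception] =
    try { body; None } catch { case e: Exception => Some(e) }
}
```

Scala's `require` throws `IllegalArgumentException` with the message prefixed by `"requirement failed: "`, so the committer's `toString` lands in the log.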
[GitHub] spark pull request #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/18111#discussion_r118530138

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -73,7 +73,10 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val stagingDir: String = committer match {
       // For FileOutputCommitter it has its own staging path called "work path".
-      case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path)
+      case f: FileOutputCommitter =>
+        val workPath = f.getWorkPath
+        require(workPath != null, s"Committer has no workpath $f")
+        Option(workPath.toString).getOrElse(path)
--- End diff --

If this was intended to be resilient, it'd have to become `Option(workPath).getOrElse(path).toString()`, though that would actually hide the problem I'd managed to create: handing a job committer to a task.
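For comparison, the "resilient" variant mentioned in that reply can be sketched the same way, again with hypothetical names; `WorkPath` is a stand-in for Hadoop's `Path`, and the sketch uses the equivalent `Option(workPath).map(_.toString).getOrElse(path)` form so the result stays a `String`. The key detail is ordering: `Option(...)` has to wrap the possibly-null value *before* `toString` is called, otherwise the NPE happens first.

```scala
object ResilientStagingDir {

  /** Hypothetical stand-in for org.apache.hadoop.fs.Path; toString yields the path text. */
  final class WorkPath(uri: String) {
    override def toString: String = uri
  }

  /**
   * Hypothetical resilient variant: Option wraps the possibly-null work path *before*
   * toString is called, so a null simply falls back to the job's output path.
   */
  def stagingDir(workPath: WorkPath, path: String): String =
    Option(workPath).map(_.toString).getOrElse(path)
}
```

`stagingDir(null, "/out")` returns `"/out"` without complaint, which is exactly the masking effect the reviewer wanted to avoid: a misconfigured committer would silently write to the fallback location instead of failing.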
[GitHub] spark pull request #18111: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol...
GitHub user steveloughran opened a pull request: https://github.com/apache/spark/pull/18111

[SPARK-20886][CORE] HadoopMapReduceCommitProtocol to fail meaningfully if FileOutputCommitter.getWorkPath==null

## What changes were proposed in this pull request?

Handles the situation where `FileOutputCommitter.getWorkPath()` returns `null` with a `require()` call and a message which explains the problem and includes the `toString` value of the committer, for better diagnostics.

The situation occurs if the committer being passed in is a job committer, not a task committer; that is, it was initialised with a `JobAttemptContext`, not a `TaskAttemptContext`.

The existing code does an `Option(workPath.toString).getOrElse(path)`, which *may* be an attempt to handle the null path case. If so, it doesn't work, because it's the `.toString()` call which is failing.

If people do think that code should be resilient to null work paths, that line could be changed. However, that may hide the underlying problem: the committer is misconfigured.

It may be a rare occurrence today, but it is more likely with modified subclasses of `FileOutputCommitter`, and also possible with some ongoing work of mine in Hadoop to better support committing work to cloud storage infrastructures.

## How was this patch tested?

Manually. The before & after stack traces are on the JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/steveloughran/spark cloud/SPARK-20886-committer-NPE

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18111.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18111

commit 02eb7bf0ee6b81841f22e3c46d822eaebb28e85c
Author: Steve Loughran
Date: 2017-05-25T15:46:50Z

SPARK-20886 HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null

Add a requirement.
The existing code does an Option.getWorkpath.toString() which *may* be an attempt to handle the null path case. If so, it isn't, because it's the .toString() which is failing.

Change-Id: Idddf9813761e7008425542f96903bce12bedd978
[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...
Github user steveloughran closed the pull request at: https://github.com/apache/spark/pull/9571