[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user saeedehA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-167496362 Hello, I'm sorry, I'm a beginner in PySpark, but I need to read a .bin file as a byte array and I don't understand what I have to do. Could someone explain it step by step (from what I must import to which function I must call)? Thanks a lot. (Sorry, I can't find anything that helps me except here.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r22288031
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -510,6 +510,52 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @param recordLength The length at which to split the records
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String, recordLength: Int,
+      conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = {
+    conf.setInt("recordLength",recordLength)
+    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
+      classOf[FixedLengthBinaryInputFormat],
+      classOf[LongWritable],
+      classOf[BytesWritable],
+      conf=conf)
+    val data = br.map{ case (k, v) => v.getBytes}
--- End diff --
It turns out that `getBytes` returns a padded byte array, so I think you may need to copy / slice out the subarray with the data using `v.getLength`; see [HADOOP-6298: BytesWritable#getBytes is a bad name that leads to programming mistakes](https://issues.apache.org/jira/browse/HADOOP-6298) for more details. Using `getBytes` without `getLength` has caused bugs in Spark in the past: #2712. Is the use of `getBytes` in this patch a bug? Or is it somehow safe due to our use of FixedLengthBinaryInputFormat? If it is somehow safe, we should have a comment which explains this so that readers who know about the `getBytes` issue aren't confused (or better yet, an `assert` that `getBytes` returns an array of the expected length).
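The padding problem JoshRosen describes can be illustrated without Hadoop. The sketch below models a `BytesWritable`-style value with a hypothetical `PaddedBuffer(bytes, length)` case class (an assumption for illustration, not Hadoop's class), copies out only the first `length` bytes as he suggests, and adds the length `assert` he proposes for fixed-length records. It is not the code that was merged.

```scala
import java.util.Arrays

object PaddedBuffers {
  // Hypothetical stand-in for a BytesWritable: `bytes` is the backing buffer,
  // which may be longer than the data; only the first `length` bytes are valid.
  final case class PaddedBuffer(bytes: Array[Byte], length: Int)

  // Copy out exactly the valid prefix (the getBytes + getLength pattern).
  def validBytes(buf: PaddedBuffer): Array[Byte] =
    Arrays.copyOfRange(buf.bytes, 0, buf.length)

  // Same copy, plus the suggested sanity check that a fixed-length record
  // really contains `recordLength` valid bytes.
  def recordBytes(buf: PaddedBuffer, recordLength: Int): Array[Byte] = {
    val out = validBytes(buf)
    assert(out.length == recordLength,
      s"expected $recordLength bytes but got ${out.length}")
    out
  }
}
```

Copying, rather than handing out the padded backing array, keeps downstream code from reading the trailing padding; the cost is one extra allocation per record.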
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1658
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61379142 Thanks @kmader, I merged this now. I manually amended the patch a bit to fix style issues (there were still a bunch of commas without spaces, etc.), and I also changed the name of the recordLength property in Hadoop JobConfs to start with org.apache.spark so that it's less likely to clash with other Hadoop properties. Finally, I marked this API as `@Experimental` for now since it's new in this release, though we can probably make it non-experimental in 1.3.
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-60979360 @kmader btw if you don't have time to deal with these comments, let me know; I might be able to take the patch from where it is and implement them.
Github user kmader commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r19582168
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+  *
+  */
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext}
+import org.apache.spark.input.StreamFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+    keyClass: Class[String],
+    valueClass: Class[T],
+    @transient conf: Configuration,
+    minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
--- End diff --
Sorry, this function was named incorrectly: it ultimately calls `setMaxSplitSize` after calculating the maximum split size from the number of partitions. I have now renamed it accordingly.
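A minimal sketch of the calculation kmader describes, assuming the maximum split size is simply the total input length divided by the requested number of partitions, rounded up. `SplitSizing.maxSplitSize` is a hypothetical name, not Spark's API. Because whole files are read as single records, a file larger than this size still ends up in one split, so `minPartitions` remains only a hint.

```scala
object SplitSizing {
  // Hypothetical helper: pick a maximum split size so that packing the input
  // into splits no larger than this yields roughly `minPartitions` splits.
  def maxSplitSize(fileLengths: Seq[Long], minPartitions: Int): Long = {
    require(fileLengths.forall(_ >= 0), "file lengths must be non-negative")
    val totalLen = fileLengths.sum
    // Treat a non-positive request as one partition to avoid dividing by zero.
    val parts = math.max(minPartitions, 1)
    // Round up so that `parts` splits of this size always cover `totalLen` bytes.
    math.ceil(totalLen.toDouble / parts).toLong
  }
}
```

Rounding up rather than truncating matters: with truncation, 101 bytes over 2 partitions would give a 50-byte limit and could produce a third split.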
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61030284 [Test build #22503 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22503/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd). * This patch merges cleanly.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61031018 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22502/ Test FAILed.
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61031930 Thanks for the update, Kevin. Looks like Jenkins had some issues with git, will retry it.
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61031939 Jenkins, retest this please
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61032240 [Test build #22505 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22505/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd). * This patch merges cleanly.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61035841 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22503/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61035835 [Test build #22503 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22503/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61037379 [Test build #22505 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22505/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,`
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-61037383 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22505/ Test PASSed.
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r19453138
--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   *   `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported file system URI
+   * as a byte array. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   *   `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --
I'd still remove this. It's confusing to see the API in just one language, and with Java 8, the extra class will be a one-liner.
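Matei's point is that a separate `binaryArrays` is just `binaryFiles` plus a per-file stream-to-bytes conversion. With the merged Scala API that conversion could likely be written as `sc.binaryFiles(path).mapValues(_.toArray())`, assuming your Spark version's `PortableDataStream` exposes `toArray()`. The self-contained sketch below shows roughly what such a conversion involves, using only `java.io`; `StreamToBytes.readAll` is a hypothetical name, not part of Spark.

```scala
import java.io.{ByteArrayOutputStream, InputStream}

object StreamToBytes {
  // Hypothetical helper: drain an InputStream into a byte array.
  // The caller remains responsible for closing `in`.
  def readAll(in: InputStream, chunkSize: Int = 4096): Array[Byte] = {
    require(chunkSize > 0, "chunkSize must be positive")
    val out = new ByteArrayOutputStream()
    val chunk = new Array[Byte](chunkSize)
    var n = in.read(chunk)
    // read returns -1 at end of stream; otherwise copy only the bytes actually read.
    while (n != -1) {
      out.write(chunk, 0, n)
      n = in.read(chunk)
    }
    out.toByteArray
  }
}
```

Reading in fixed-size chunks means the file length does not need to be known up front; the whole file still ends up in memory, which is one reason the doc comments prefer small files.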
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-60709302 Thanks for the update, Kevin. Note that there are still a few comments from me on https://github.com/apache/spark/pull/1658/files; do you mind dealing with those?
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r19453170
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
(Same @@ -0,0 +1,57 @@ hunk as quoted in kmader's comment above, ending at:)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
--- End diff --
BTW, this comment was important too: what is the meaning of this parameter?
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59882709 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21972/consoleFull) for PR 1658 at commit [`8ac288b`](https://github.com/apache/spark/commit/8ac288bc09e779f1b4c96dcb497ee4eca962439f). * This patch merges cleanly.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59887629 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21972/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59887621 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21972/consoleFull) for PR 1658 at commit [`8ac288b`](https://github.com/apache/spark/commit/8ac288bc09e779f1b4c96dcb497ee4eca962439f). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,`
Github user kmader commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r19133684
--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
(Same @@ -220,6 +227,83 @@ hunk as quoted in mateiz's comment above, ending at:)
+  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --
In Scala it is easy to add separately; in Java you need to create an anonymous class, which is more of a hassle.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-60015558 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22011/consoleFull) for PR 1658 at commit [`6379be4`](https://github.com/apache/spark/commit/6379be487cfa91097f3591dfd05b8e87e09c2399). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,`
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-60015564 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22011/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59805377 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/consoleFull) for PR 1658 at commit [`92bda0d`](https://github.com/apache/spark/commit/92bda0daf2fffeea0f1de9199fc71fe978a165c7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59805569 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21922/ Test FAILed.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59805565 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/consoleFull) for PR 1658 at commit [`92bda0d`](https://github.com/apache/spark/commit/92bda0daf2fffeea0f1de9199fc71fe978a165c7). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user kmader commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59832070 So I made the requested changes and added a few more tests, but the tests appear not to have run, for a strange reason: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/console. The build runs out of memory with Maven but works fine in IntelliJ, and then I do not get any feedback on the style. Is there a single Maven phase I can run to get that?
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59866772 There might've been some Jenkins issues recently; going to restart it.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59866801 Jenkins, retest this please
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59866791 BTW, for the style you can run `sbt/sbt scalastyle` locally if you want. Not sure there's a command in Maven.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59867124 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21950/consoleFull) for PR 1658 at commit [`92bda0d`](https://github.com/apache/spark/commit/92bda0daf2fffeea0f1de9199fc71fe978a165c7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-59867194 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21950/ Test FAILed.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18610904 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging { } /** + * Get an RDD for a Hadoop-readable dataset as byte-streams for each file + * (useful for binary data) + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + */ + def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): + RDD[(String, Array[Byte])] = { +val job = new NewHadoopJob(hadoopConfiguration) +NewFileInputFormat.addInputPath(job, new Path(path)) +val updateConf = job.getConfiguration +new RawFileRDD( + this, + classOf[ByteInputFormat], + classOf[String], + classOf[Array[Byte]], + updateConf, + minPartitions).setName(path) + } + + /** + * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file + * (useful for binary data) + * + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Care must be taken to close the files afterwards --- End diff -- Hey Kevin, is this `@note` still relevant? Using `addOnCompleteCallback` you might be able to avoid this.
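The suggestion amounts to tying each opened stream's lifetime to the task instead of to the caller. A minimal sketch in plain Java, assuming a hypothetical `TaskCompletionRegistry` that stands in for the task context (it is not Spark's `TaskContext` API): opening a stream registers a callback that closes it, and marking the task complete runs every registered callback.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task context: it collects cleanup callbacks and
// runs them when the task finishes, so callers never close streams themselves.
class TaskCompletionRegistry {
    private final List<Runnable> callbacks = new ArrayList<>();

    void addOnCompleteCallback(Runnable callback) {
        callbacks.add(callback);
    }

    // Runs callbacks in reverse registration order (last opened, first closed).
    void markTaskCompleted() {
        for (int i = callbacks.size() - 1; i >= 0; i--) {
            callbacks.get(i).run();
        }
        callbacks.clear();
    }
}

class CompletionCallbackDemo {
    // A stream that records whether close() was called, for demonstration only.
    static class TrackedStream extends DataInputStream {
        boolean closed = false;

        TrackedStream(byte[] data) {
            super(new ByteArrayInputStream(data));
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Opens a stream and hands its close() to the task, instead of relying on a
    // "care must be taken to close the files afterwards" note.
    static TrackedStream openWithCleanup(byte[] data, TaskCompletionRegistry task) {
        TrackedStream in = new TrackedStream(data);
        task.addOnCompleteCallback(() -> {
            try {
                in.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        return in;
    }

    public static void main(String[] args) throws IOException {
        TaskCompletionRegistry task = new TaskCompletionRegistry();
        TrackedStream in = openWithCleanup(new byte[] {1, 2, 3}, task);
        System.out.println(in.readByte()); // 1
        System.out.println(in.closed);     // false
        task.markTaskCompleted();
        System.out.println(in.closed);     // true
    }
}
```

The design choice is that the reader of the stream no longer owns cleanup; whoever owns the task lifecycle does, which is what would make the `@note` unnecessary.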
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18610930 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -510,6 +510,53 @@ class SparkContext(config: SparkConf) extends Logging { minPartitions).setName(path) } + + /** + * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file + * (useful for binary data) + * + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Care must be taken to close the files afterwards + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + */ + @DeveloperApi --- End diff -- There's no need to have `@DeveloperApi` on this, as PortableDataStream is something we want to support.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18610978 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.input + +import scala.collection.JavaConversions._ +import com.google.common.io.{ ByteStreams, Closeables } +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.{ FSDataInputStream, Path } +import org.apache.spark.annotation.DeveloperApi +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream } + +/** + * A general format for reading whole files in as streams, byte arrays, + * or other functions to be added + */ +private[spark] abstract class StreamFileInputFormat[T] + extends CombineFileInputFormat[String, T] { + override protected def isSplitable(context: JobContext, file: Path): Boolean = false + /** + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. + */ + def setMaxSplitSize(context: JobContext, minPartitions: Int) { +val files = listStatus(context) +val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen +}.sum + +val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong +super.setMaxSplitSize(maxSplitSize) + } + + def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T] + +} + +/** + * A class that allows DataStreams to be serialized and moved around by not creating them + * until they need to be read + * @note TaskAttemptContext is not serializable resulting in the confBytes construct + * @note CombineFileSplit is not serializable resulting in the splitBytes construct + */ +@DeveloperApi --- End diff -- No need for `@DeveloperApi` on this, we want to support it. 
It was only needed on the various input formats.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611084 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.input + +import scala.collection.JavaConversions._ +import com.google.common.io.{ ByteStreams, Closeables } +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.{ FSDataInputStream, Path } +import org.apache.spark.annotation.DeveloperApi +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream } + +/** + * A general format for reading whole files in as streams, byte arrays, + * or other functions to be added + */ +private[spark] abstract class StreamFileInputFormat[T] + extends CombineFileInputFormat[String, T] { + override protected def isSplitable(context: JobContext, file: Path): Boolean = false + /** + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. 
+ */ + def setMaxSplitSize(context: JobContext, minPartitions: Int) { +val files = listStatus(context) +val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen +}.sum + +val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong +super.setMaxSplitSize(maxSplitSize) + } + + def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T] + +} + +/** + * A class that allows DataStreams to be serialized and moved around by not creating them + * until they need to be read + * @note TaskAttemptContext is not serializable resulting in the confBytes construct + * @note CombineFileSplit is not serializable resulting in the splitBytes construct + */ +@DeveloperApi --- End diff -- Actually, one thing that would help is to make this a `trait` (i.e. interface in Java), so users can't instantiate it, and then have a `private[spark] class PortableDataStreamImpl` for the implementation. We don't want to expose the constructor if we can.
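The trait-plus-private-implementation idea can be sketched in Java with a public interface, a package-private class, and an internal factory. All names here (`PortableStream`, `PortableStreamImpl`, `PortableStreams.create`) are hypothetical, and package-private visibility is only a rough analogue of Scala's `private[spark]`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical public-facing type: user code can call these methods but has
// no constructor to call.
interface PortableStream {
    String getPath();
    byte[] toArray();
}

// Package-private implementation (roughly the Java analogue of private[spark]),
// so its constructor is not part of the public surface.
final class PortableStreamImpl implements PortableStream {
    private final String path;
    private final byte[] contents;

    PortableStreamImpl(String path, byte[] contents) {
        this.path = path;
        this.contents = contents.clone(); // defensive copy
    }

    @Override
    public String getPath() {
        return path;
    }

    @Override
    public byte[] toArray() {
        return contents.clone(); // callers cannot mutate internal state
    }
}

// Internal factory: only code inside the package decides how instances are built.
final class PortableStreams {
    private PortableStreams() {}

    static PortableStream create(String path, byte[] contents) {
        return new PortableStreamImpl(path, contents);
    }

    public static void main(String[] args) {
        PortableStream s = create("file:/tmp/part-0", "spark".getBytes(StandardCharsets.UTF_8));
        System.out.println(s.getPath() + " " + s.toArray().length); // file:/tmp/part-0 5
    }
}
```

Keeping the constructor hidden means its parameters (split, context, index in the quoted code) can change later without breaking user code.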
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611129 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.input + +import scala.collection.JavaConversions._ +import com.google.common.io.{ ByteStreams, Closeables } +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.{ FSDataInputStream, Path } +import org.apache.spark.annotation.DeveloperApi +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream } + +/** + * A general format for reading whole files in as streams, byte arrays, + * or other functions to be added + */ +private[spark] abstract class StreamFileInputFormat[T] + extends CombineFileInputFormat[String, T] { + override protected def isSplitable(context: JobContext, file: Path): Boolean = false + /** + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. 
+ */ + def setMaxSplitSize(context: JobContext, minPartitions: Int) { +val files = listStatus(context) +val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen +}.sum + +val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong +super.setMaxSplitSize(maxSplitSize) + } + + def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T] + +} + +/** + * A class that allows DataStreams to be serialized and moved around by not creating them + * until they need to be read + * @note TaskAttemptContext is not serializable resulting in the confBytes construct + * @note CombineFileSplit is not serializable resulting in the splitBytes construct + */ +@DeveloperApi +class PortableDataStream(@transient isplit: CombineFileSplit, + @transient context: TaskAttemptContext, index: Integer) + extends Serializable { + // transient forces file to be reopened after being serialization + // it is also used for non-serializable classes + + @transient + private var fileIn: DataInputStream = null.asInstanceOf[DataInputStream] + @transient + private var isOpen = false + + private val confBytes = { +val baos = new ByteArrayOutputStream() +context.getConfiguration.write(new DataOutputStream(baos)) +baos.toByteArray + } + + private val splitBytes = { +val baos = new ByteArrayOutputStream() +isplit.write(new DataOutputStream(baos)) +baos.toByteArray + } + + @transient + private lazy val split = { +val bais = new ByteArrayInputStream(splitBytes) +val nsplit = new CombineFileSplit() +nsplit.readFields(new DataInputStream(bais)) +nsplit + } + + @transient + private lazy val conf = { +val bais = new ByteArrayInputStream(confBytes) +val nconf = new Configuration() +nconf.readFields(new DataInputStream(bais)) +nconf + } + /** + * Calculate the path name independently of opening the file + */ + @transient + private lazy val path = { +val pathp = split.getPath(index) +pathp.toString + } + + /** + * create a new DataInputStream from the split and 
context + */ + def open(): DataInputStream = { +if (!isOpen) { + val pathp = split.getPath(index) + val fs =
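The `confBytes`/`splitBytes` construct stores a `Writable`'s serialized bytes in an ordinary field and rebuilds the object lazily behind a `@transient` field, so only a `byte[]` has to survive Java serialization. A minimal Java sketch of the same idea, assuming a hypothetical `FakeSplit` in place of `CombineFileSplit` and a local `SimpleWritable` interface that mirrors the two methods of Hadoop's `Writable`:

```java
import java.io.*;

// Same two methods as Hadoop's org.apache.hadoop.io.Writable, redeclared here
// so the sketch has no dependencies.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical payload that is NOT java.io.Serializable (think CombineFileSplit).
class FakeSplit implements SimpleWritable {
    String path = "";
    long length;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(path);
        out.writeLong(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        path = in.readUTF();
        length = in.readLong();
    }
}

// Serializable holder: only splitBytes travels; the split is rebuilt lazily.
class SplitHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] splitBytes;
    private transient FakeSplit split; // null again after deserialization

    SplitHolder(FakeSplit original) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            original.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        splitBytes = baos.toByteArray();
    }

    FakeSplit split() {
        if (split == null) {
            FakeSplit rebuilt = new FakeSplit();
            try {
                rebuilt.readFields(new DataInputStream(new ByteArrayInputStream(splitBytes)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            split = rebuilt;
        }
        return split;
    }
}

class SerializationDemo {
    // Java-serializes the holder, reads it back, then rebuilds the split.
    static FakeSplit shipAndRebuild(FakeSplit original) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(new SplitHolder(original));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
            return ((SplitHolder) ois.readObject()).split();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeSplit original = new FakeSplit();
        original.path = "file:/tmp/part-0";
        original.length = 42L;
        FakeSplit rebuilt = shipAndRebuild(original);
        System.out.println(rebuilt.path + " " + rebuilt.length); // file:/tmp/part-0 42
    }
}
```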
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611204 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,219 @@ +/* --- End diff -- This file contains several different classes, none of which is named RawFileInput; it would be better to move them to separate files, each named after its class. That makes them easier to find later.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611224 --- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +/** Allows better control of the partitioning + * + */ +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext} +import org.apache.spark.input.StreamFileInputFormat + +private[spark] class BinaryFileRDD[T]( + sc : SparkContext, + inputFormatClass: Class[_ <: StreamFileInputFormat[T]], + keyClass: Class[String], + valueClass: Class[T], + @transient conf: Configuration, + minPartitions: Int) --- End diff -- Format is slightly wrong here: the constructor args should only be indented with 4 spaces.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611326 --- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +/** Allows better control of the partitioning + * + */ +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext} +import org.apache.spark.input.StreamFileInputFormat + +private[spark] class BinaryFileRDD[T]( + sc : SparkContext, + inputFormatClass: Class[_ <: StreamFileInputFormat[T]], + keyClass: Class[String], + valueClass: Class[T], + @transient conf: Configuration, + minPartitions: Int) + extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) { + + + override def getPartitions: Array[Partition] = { +val inputFormat = inputFormatClass.newInstance +inputFormat match { + case configurable: Configurable => +configurable.setConf(conf) + case _ => +} +val jobContext = newJobContext(conf, jobId) +inputFormat.setMaxSplitSize(jobContext, minPartitions) --- End diff -- Is this actually a max split size? It seems you're passing an int that means something else, but I might've misunderstood.
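On what `setMaxSplitSize` receives: in the `RawFileInput.scala` hunks quoted earlier, the helper divides the total length by `files.length`, so its `minPartitions` argument is never read. Below is a sketch of the presumably intended reading, which turns a partition-count hint into a per-split byte cap, shown next to the quoted computation. The greedy `pack` function is a simplified, hypothetical approximation of how `CombineFileInputFormat` fills splits (it ignores node and rack locality); all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSizing {
    // Presumed intent: convert a minimum-partitions hint into a byte cap per split.
    static long maxSplitSizeFromMinPartitions(long[] fileLengths, int minPartitions) {
        long total = 0;
        for (long len : fileLengths) total += len;
        int parts = Math.max(1, minPartitions); // guard against a hint of 0
        return (long) Math.ceil(total * 1.0 / parts);
    }

    // What the quoted diff computes: the average file size; minPartitions is unused.
    static long maxSplitSizeAsQuoted(long[] fileLengths) {
        long total = 0;
        for (long len : fileLengths) total += len;
        return (long) Math.ceil(total * 1.0 / fileLengths.length);
    }

    // Simplified greedy packing (no locality): whole files are added to the
    // current split, which is closed once its size reaches maxSize.
    static List<List<Integer>> pack(long[] fileLengths, long maxSize) {
        List<List<Integer>> splits = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (int i = 0; i < fileLengths.length; i++) {
            current.add(i);
            currentSize += fileLengths[i];
            if (currentSize >= maxSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
    }

    public static void main(String[] args) {
        long[] files = {100, 100, 100, 100};
        System.out.println(pack(files, maxSplitSizeFromMinPartitions(files, 2)).size()); // 2
        System.out.println(pack(files, maxSplitSizeAsQuoted(files)).size());             // 4
    }
}
```

With four 100-byte files, a hint of 2 gives a 200-byte cap and two splits, while the quoted formula gives a 100-byte cap and one split per file regardless of the hint.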
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611451 --- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java --- @@ -836,6 +839,84 @@ public void sequenceFile() { Assert.assertEquals(pairs, readRDD.collect()); } +@Test +public void binaryFiles() throws Exception { +// Reusing the wholeText files example +byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); + + +String tempDirName = tempDir.getAbsolutePath(); +File file1 = new File(tempDirName + "/part-0"); + +FileOutputStream fos1 = new FileOutputStream(file1); + +FileChannel channel1 = fos1.getChannel(); +ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1); +channel1.write(bbuf); +channel1.close(); +JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3); +List<Tuple2<String, PortableDataStream>> result = readRDD.collect(); +for (Tuple2<String, PortableDataStream> res : result) { +Assert.assertArrayEquals(content1, res._2().toArray()); +} +} + +@Test +public void binaryFilesCaching() throws Exception { +// Reusing the wholeText files example +byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); + + +String tempDirName = tempDir.getAbsolutePath(); +File file1 = new File(tempDirName + "/part-0"); + +FileOutputStream fos1 = new FileOutputStream(file1); + +FileChannel channel1 = fos1.getChannel(); +ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1); +channel1.write(bbuf); +channel1.close(); + +JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3).cache(); +readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() { +@Override +public void call(Tuple2<String, PortableDataStream> stringPortableDataStreamTuple2) throws Exception { +stringPortableDataStreamTuple2._2().toArray(); // force the file to read +} +}); + +List<Tuple2<String, PortableDataStream>> result = readRDD.collect(); +for (Tuple2<String, PortableDataStream> res : result) { +Assert.assertArrayEquals(content1, res._2().toArray()); 
+} +} + +@Test +public void binaryRecords() throws Exception { +// Reusing the wholeText files example +byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8"); +int numOfCopies = 10; +String tempDirName = tempDir.getAbsolutePath(); +File file1 = new File(tempDirName + "/part-0"); + +FileOutputStream fos1 = new FileOutputStream(file1); + +FileChannel channel1 = fos1.getChannel(); + +for (int i=0;i<numOfCopies;i++) { +ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1); +channel1.write(bbuf); +} +channel1.close(); + +JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName,content1.length); --- End diff -- Add spaces after commas (affects several places in here).
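The `binaryRecords` test writes the same record `numOfCopies` times and expects to read back records of exactly `content1.length` bytes each. The splitting it relies on can be sketched in plain Java. Rejecting input whose length is not a multiple of the record length is an assumption of this sketch, not necessarily what `FixedLengthBinaryInputFormat` does; each record is copied out with `Arrays.copyOfRange`, so no caller sees a longer, shared backing buffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FixedLengthRecords {
    // Splits a flat byte array into records of exactly recordLength bytes.
    // Each record is a fresh, exactly sized copy of its slice of the input.
    static List<byte[]> split(byte[] data, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        if (data.length % recordLength != 0) {
            throw new IllegalArgumentException("data length is not a multiple of recordLength");
        }
        List<byte[]> records = new ArrayList<>();
        for (int off = 0; off < data.length; off += recordLength) {
            records.add(Arrays.copyOfRange(data, off, off + recordLength));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] record = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
        int numOfCopies = 10;
        byte[] file = new byte[record.length * numOfCopies];
        for (int i = 0; i < numOfCopies; i++) {
            System.arraycopy(record, 0, file, i * record.length, record.length);
        }
        List<byte[]> records = split(file, record.length);
        System.out.println(records.size());                        // 10
        System.out.println(Arrays.equals(records.get(3), record)); // true
    }
}
```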
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611774 --- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala --- @@ -224,6 +226,128 @@ class FileSuite extends FunSuite with LocalSparkContext { assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)")) } + test("binary file input as byte array") { +sc = new SparkContext("local", "test") +val outFile = new File(tempDir, "record-bytestream-0.bin") +val outFileName = outFile.getAbsolutePath() + +// create file +val testOutput = Array[Byte](1,2,3,4,5,6) +val bbuf = java.nio.ByteBuffer.wrap(testOutput) +// write data to file +val file = new java.io.FileOutputStream(outFile) +val channel = file.getChannel +channel.write(bbuf) +channel.close() +file.close() + +val inRdd = sc.binaryFiles(outFileName) +val (infile: String, indata: PortableDataStream) = inRdd.first + +// Make sure the name and array match +assert(infile.contains(outFileName)) // a prefix may get added +assert(indata.toArray === testOutput) + } + + test("portabledatastream caching tests") { +sc = new SparkContext("local", "test") +val outFile = new File(tempDir, "record-bytestream-0.bin") +val outFileName = outFile.getAbsolutePath() + +// create file +val testOutput = Array[Byte](1,2,3,4,5,6) +val bbuf = java.nio.ByteBuffer.wrap(testOutput) +// write data to file +val file = new java.io.FileOutputStream(outFile) +val channel = file.getChannel +channel.write(bbuf) +channel.close() +file.close() + +val inRdd = sc.binaryFiles(outFileName).cache() +inRdd.foreach{ + curData: (String, PortableDataStream) => + curData._2.toArray() // force the file to read +} +val mappedRdd = inRdd.map{ + curData: (String, PortableDataStream) => +(curData._2.getPath(),curData._2) +} +val (infile: String, indata: PortableDataStream) = mappedRdd.first --- End diff -- Use `collect` instead of `first`, because `first` might go through a different code path (computing the RDD locally instead of reusing cached data). 
You can do `.collect.head`.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18611791

--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName)
+    val (infile: String, indata: PortableDataStream) = inRdd.first
+
+    // Make sure the name and array match
+    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName).cache()
+    inRdd.foreach{
+      curData: (String, PortableDataStream) =>
+        curData._2.toArray() // force the file to read
+    }
+    val mappedRdd = inRdd.map{
--- End diff --

Small style issue, always have space before {
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18612223

--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported file system URI
+   * as a byte array. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --

Don't add a default value here, it won't be usable from Java
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18612265

--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --

Same here, no default value. Instead add a version of the method with only one argument, and one with two.
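Java has no default arguments, so a Scala method declared with `minPartitions: Int = defaultMinPartitions` still has to be called with both arguments from Java. The shape the review asks for is an explicit one-argument overload next to the two-argument form. Below is a minimal plain-Java sketch of that shape, runnable without Spark; `BinaryFilesApi`, `DEFAULT_MIN_PARTITIONS`, and the returned description string are hypothetical stand-ins for the wrapper class, `defaultMinPartitions`, and `JavaPairRDD`:

```java
// Hypothetical stand-in for a Java-facing wrapper; not Spark's JavaSparkContext.
class BinaryFilesApi {
    // Hypothetical value standing in for defaultMinPartitions.
    static final int DEFAULT_MIN_PARTITIONS = 2;

    // One-argument overload: gives Java callers the "default" a Scala default argument cannot.
    String binaryFiles(String path) {
        return binaryFiles(path, DEFAULT_MIN_PARTITIONS);
    }

    // Two-argument overload: the full form, which the short form delegates to.
    String binaryFiles(String path, int minPartitions) {
        // A real wrapper would build and return an RDD; a description string keeps this runnable.
        return "binaryFiles(" + path + ", minPartitions=" + minPartitions + ")";
    }
}
```

From Java, both `api.binaryFiles("hdfs://a-hdfs-path")` and `api.binaryFiles("hdfs://a-hdfs-path", 3)` then compile, and a Java test suite can call each form.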
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18612296

--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported file system URI
+   * as a byte array. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --

Also it seems this method is gone in Scala, maybe it needs to be removed in Java?
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18612346

--- Diff: core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+
+/**
+ *
+ * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable KEY and BytesWritable VALUE.
+ *
+ * KEY = record index (Long)
+ * VALUE = the record itself (BytesWritable)
+ *
+ */
+private[spark] class FixedLengthBinaryRecordReader
+  extends RecordReader[LongWritable, BytesWritable] {
+
+  override def close() {
+    if (fileInputStream != null) {
+      fileInputStream.close()
+    }
+  }
+
+  override def getCurrentKey: LongWritable = {
+    recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+    recordValue
+  }
+
+  override def getProgress: Float = {
+    splitStart match {
+      case x if x == splitEnd => 0.0.toFloat
+      case _ => Math.min(
+        ((currentPosition - splitStart) / (splitEnd - splitStart)).toFloat, 1.0
+      ).toFloat
+    }
+  }
+
+  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext) {
+
+    // the file input
+    val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+    // the byte position this fileSplit starts at
+    splitStart = fileSplit.getStart
+
+    // splitEnd byte marker that the fileSplit ends at
+    splitEnd = splitStart + fileSplit.getLength
+
+    // the actual file we will be reading from
+    val file = fileSplit.getPath
+
+    // job configuration
+    val job = context.getConfiguration
+
+    // check compression
+    val codec = new CompressionCodecFactory(job).getCodec(file)
+    if (codec != null) {
+      throw new IOException("FixedLengthRecordReader does not support reading compressed files")
+    }
+
+    // get the record length
+    recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+    // get the filesystem
+    val fs = file.getFileSystem(job)
+
+    // open the File
+    fileInputStream = fs.open(file)
+
+    // seek to the splitStart position
+    fileInputStream.seek(splitStart)
+
+    // set our current position
+    currentPosition = splitStart
+
+  }
+
+  override def nextKeyValue(): Boolean = {
+
+    if (recordKey == null) {
+      recordKey = new LongWritable()
+    }
+
+    // the key is a linear index of the record, given by the
+    // position the record starts divided by the record length
+    recordKey.set(currentPosition / recordLength)
+
+    // the recordValue to place the bytes into
+    if (recordValue == null) {
+      recordValue = new BytesWritable(new Array[Byte](recordLength))
+    }
+
+    // read a record if the currentPosition is less than the split end
+    if (currentPosition < splitEnd) {
+
+      // setup a buffer to store the record
+      val buffer = recordValue.getBytes
+
+      fileInputStream.read(buffer, 0, recordLength)
+
+      // update our current position
+      currentPosition = currentPosition + recordLength
+
+      // return true
+      return true
+    }
+
+    false
+  }
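The reader above keys each record by its index (`currentPosition / recordLength`) and consumes `recordLength` bytes per call until it passes the end of the split. Below is a pure-JDK sketch of the same loop over an in-memory byte array, assuming the split starts on a record boundary and holds whole records; `FixedLengthRecords` and `Rec` are hypothetical names. Two choices differ from the quoted code. It uses `DataInputStream.readFully`, because `InputStream.read(buf, off, len)` may return fewer than `len` bytes (`FSDataInputStream` extends `DataInputStream`, so `readFully` is available there too). It also computes progress with floating-point division, since dividing two `Long`s before `.toFloat`, as the quoted `getProgress` does, yields 0 until the whole split has been read.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of fixed-length record splitting over an in-memory byte array;
// it is not Spark's FixedLengthBinaryRecordReader.
class FixedLengthRecords {
    // One record: its linear index (the key) and its bytes (the value).
    static final class Rec {
        final long key;
        final byte[] value;

        Rec(long key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Reads bytes [splitStart, splitEnd) of data as records of recordLength bytes.
    // Assumes splitStart is a multiple of recordLength and the split holds whole records.
    static List<Rec> read(byte[] data, long splitStart, long splitEnd, int recordLength) {
        List<Rec> out = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.skipBytes((int) splitStart); // stand-in for seek(splitStart)
            long currentPosition = splitStart;
            while (currentPosition < splitEnd) {
                // A fresh buffer per record, because records are kept in a list here.
                byte[] buffer = new byte[recordLength];
                // readFully either fills the whole buffer or throws EOFException.
                in.readFully(buffer);
                out.add(new Rec(currentPosition / recordLength, buffer));
                currentPosition += recordLength;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    // Fraction of the split consumed, computed with floating-point division.
    static float progress(long currentPosition, long splitStart, long splitEnd) {
        if (splitStart == splitEnd) {
            return 0.0f;
        }
        return Math.min((float) (currentPosition - splitStart) / (splitEnd - splitStart), 1.0f);
    }
}
```

For an 18-byte input read as 6-byte records, the split `[6, 18)` yields two records with keys 1 and 2, and `progress(12, 6, 18)` is 0.5.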
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18612406

--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName)
+    val (infile: String, indata: PortableDataStream) = inRdd.first
+
+    // Make sure the name and array match
+    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName).cache()
+    inRdd.foreach{
+      curData: (String, PortableDataStream) =>
+        curData._2.toArray() // force the file to read
+    }
+    val mappedRdd = inRdd.map{
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(),curData._2)
+    }
+    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+
+    // Try reading the output back as an object file
+
+    assert(indata.toArray === testOutput)
+  }
--- End diff --

Apart from the cache() test, try adding one where we call persist(StorageLevel.DISK_ONLY) to check that these are also stored correctly to disk. And add one where we use Kryo serialization too.
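Storing a cached partition with `persist(StorageLevel.DISK_ONLY)` sends each value through the configured serializer, so the test the reviewer asks for boils down to checking that a value survives a serialization round trip intact. Below is a plain-JDK sketch of that check, using `java.io` object serialization rather than Spark's serializer or Kryo; `FileRecord` and `RoundTrip` are hypothetical names, with `FileRecord` standing in for a (path, content) pair:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical value type standing in for a (path, content) pair held in a cached RDD.
class FileRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    final String path;
    final byte[] content;

    FileRecord(String path, byte[] content) {
        this.path = path;
        this.content = content;
    }
}

class RoundTrip {
    // Writes obj with JDK serialization and reads it back, loosely mimicking what a
    // serialized (for example disk-backed) cache does; Kryo is not used here.
    static <T extends Serializable> T roundTrip(T obj, Class<T> type) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return type.cast(in.readObject());
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

The assertions mirror the Scala test: the copy is a distinct object, but its path and bytes match the original.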
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-58429329

Hey Kevin, sorry for the delay in getting back to this. I just made a few more comments. I think this is getting pretty close, hopefully we can put it in 1.2.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18613097

--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName)
+    val (infile: String, indata: PortableDataStream) = inRdd.first
+
+    // Make sure the name and array match
+    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName).cache()
+    inRdd.foreach{
+      curData: (String, PortableDataStream) =>
+        curData._2.toArray() // force the file to read
+    }
+    val mappedRdd = inRdd.map{
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(),curData._2)
+    }
+    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+
+    // Try reading the output back as an object file
+
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream flatmap tests") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val numOfCopies = 3
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName)
+    val mappedRdd = inRdd.map{
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(),curData._2)
+    }
+    val copyRdd = mappedRdd.flatMap{
+      curData: (String, PortableDataStream) =>
+        for(i <- 1 to numOfCopies) yield (i,curData._2)
+    }
+
+    val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
+
+    // Try reading the output back as an object file
+    assert(copyArr.length == numOfCopies)
+    copyArr.foreach{
+      cEntry: (Int, PortableDataStream) =>
+        assert(cEntry._2.toArray === testOutput)
+    }
+
+  }
+
+  test("fixed record length binary file as byte array") {
+    // a fixed length of 6 bytes
+
+    sc = new SparkContext("local", "test")
+
+    val outFile = new File(tempDir, "record-bytestream-0.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1,2,3,4,5,6)
+    val testOutputCopies = 10
+
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    for(i <- 1 to testOutputCopies) {
+      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+      channel.write(bbuf)
+    }
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryRecords(outFileName, testOutput.length)
+    // make sure there are enough elements
+    assert(inRdd.count == testOutputCopies)
+
+    // now just compare the first one
+    val indata: Array[Byte] = inRdd.first
+    assert(indata === testOutput)
+  }
--- End diff --

Add a test where you try to read records with 0 or negative size too, which should raise an exception in the driver program
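One way to make a zero or negative record length fail in the driver, rather than inside a task, is to validate it eagerly before any job is built; in the Scala code under review, `require(recordLength > 0, ...)` at the top of `binaryRecords` would have that effect, since `require` throws `IllegalArgumentException`. Below is a minimal plain-Java sketch of the check and of the test the reviewer describes; `RecordLengthCheck`, `requirePositive`, and `rejects` are hypothetical helpers:

```java
// Hypothetical helper, meant to be called on the driver before any job is submitted.
final class RecordLengthCheck {
    private RecordLengthCheck() {}

    // Returns recordLength unchanged, or throws if it is zero or negative.
    static int requirePositive(int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException(
                "recordLength must be positive, got " + recordLength);
        }
        return recordLength;
    }

    // True if requirePositive rejects the value; convenient for tests.
    static boolean rejects(int recordLength) {
        try {
            requirePositive(recordLength);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

In a ScalaTest suite the equivalent test would typically wrap the `binaryRecords` call in `intercept[IllegalArgumentException] { ... }`.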
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18613144

--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +839,84 @@ public void sequenceFile() {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @Test
+  public void binaryFiles() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-0");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
+    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+    for (Tuple2<String, PortableDataStream> res : result) {
+      Assert.assertArrayEquals(content1, res._2().toArray());
+    }
+  }
+
+  @Test
+  public void binaryFilesCaching() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
--- End diff --

Try not to have 2 blank lines in a row, except between classes if you have multiple classes in one file
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18613166

--- Diff: core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+
+/**
+ *
+ * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable KEY and BytesWritable VALUE.
+ *
+ * KEY = record index (Long)
+ * VALUE = the record itself (BytesWritable)
+ *
+ */
+private[spark] class FixedLengthBinaryRecordReader
+  extends RecordReader[LongWritable, BytesWritable] {
+
+  override def close() {
+    if (fileInputStream != null) {
+      fileInputStream.close()
+    }
+  }
+
+  override def getCurrentKey: LongWritable = {
+    recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+    recordValue
+  }
+
+  override def getProgress: Float = {
+    splitStart match {
+      case x if x == splitEnd => 0.0.toFloat
+      case _ => Math.min(
+        ((currentPosition - splitStart) / (splitEnd - splitStart)).toFloat, 1.0
+      ).toFloat
+    }
+  }
+
+  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext) {
+
+    // the file input
+    val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+    // the byte position this fileSplit starts at
+    splitStart = fileSplit.getStart
+
+    // splitEnd byte marker that the fileSplit ends at
+    splitEnd = splitStart + fileSplit.getLength
+
+    // the actual file we will be reading from
+    val file = fileSplit.getPath
+
+    // job configuration
+    val job = context.getConfiguration
+
+    // check compression
+    val codec = new CompressionCodecFactory(job).getCodec(file)
+    if (codec != null) {
+      throw new IOException("FixedLengthRecordReader does not support reading compressed files")
+    }
+
+    // get the record length
+    recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+    // get the filesystem
+    val fs = file.getFileSystem(job)
+
+    // open the File
+    fileInputStream = fs.open(file)
+
+    // seek to the splitStart position
+    fileInputStream.seek(splitStart)
+
+    // set our current position
+    currentPosition = splitStart
+
--- End diff --

Avoid blank lines at end of methods
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18613254

--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +839,84 @@ public void sequenceFile() {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @Test
+  public void binaryFiles() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-0");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
+    JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+    for (Tuple2<String, PortableDataStream> res : result) {
+      Assert.assertArrayEquals(content1, res._2().toArray());
+    }
--- End diff --

Test the versions of the methods that take only one argument too (the main purpose of the Java API suite is to make sure all these methods are callable from Java, this is why it's useful to add calls to them even if they're not doing much)
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18613290

--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+ *
--- End diff --

This comment seems unrelated, why is it up here?
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user kmader commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18335807 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.input + +import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Closeables} +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit +import org.apache.hadoop.mapreduce.RecordReader +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.{FSDataInputStream, Path} +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader +import java.io.DataInputStream + + +/** + * A general format for reading whole files in as streams, byte arrays, + * or other functions to be added + */ +abstract class StreamFileInputFormat[T] + extends CombineFileInputFormat[String,T] { + override protected def isSplitable(context: JobContext, file: Path): Boolean = false + /** + * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API. 
+ */ + def setMaxSplitSize(context: JobContext, minPartitions: Int) { +val files = listStatus(context) +val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen +}.sum + +val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong +super.setMaxSplitSize(maxSplitSize) + } + + def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): + RecordReader[String,T] + +} + +/** + * A class that allows DataStreams to be serialized and moved around by not creating them + * until they need to be read + */ +class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer) + extends Serializable { + + private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream] + private var isOpen = false + /** + * Calculate the path name independently of opening the file + */ + private lazy val path = { +val pathp = split.getPath(index) +pathp.toString + } + + /** + * create a new DataInputStream from the split and context + */ + def open(): FSDataInputStream = { +val pathp = split.getPath(index) +val fs = pathp.getFileSystem(context.getConfiguration) +fileIn = fs.open(pathp) +isOpen=true +fileIn + } + + /** + * close the file (if it is already open) + */ + def close() = { +if (isOpen) { + try { +fileIn.close() +isOpen=false + } catch { +case ioe: java.io.IOException => // do nothing + } +} + } + def getPath(): String = path +} + +/** + * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] + * to reading files out as streams + */ +abstract class StreamBasedRecordReader[T]( + split: CombineFileSplit, + context: TaskAttemptContext, + index: Integer) + extends RecordReader[String, T] { + + + + // True means the current file has been processed, then skip it.
+ private var processed = false + + private var key = "" + private var value: T = null.asInstanceOf[T] + + + override def initialize(split: InputSplit, context: TaskAttemptContext) = {} + override def close() = {} + + override def getProgress = if (processed) 1.0f else 0.0f + + override def getCurrentKey = key + + override def getCurrentValue = value + + + + override
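`setMaxSplitSize` in the quoted diff sums the lengths of all non-directory inputs and divides by the number of listed entries, so as written the `minPartitions` argument does not affect the result. The Java sketch below reproduces that arithmetic next to a variant that divides by `minPartitions` instead, which is what the parameter name suggests. The variant and both method names are hypothetical illustrations, not the code under review.

```java
class SplitSize {
    // Mirrors the quoted formula: ceil(totalLen / number of listed entries).
    // Directories contribute 0 bytes but still count in the denominator.
    static long perFileSplitSize(long[] lengths, boolean[] isDir) {
        long totalLen = 0;
        for (int i = 0; i < lengths.length; i++) {
            totalLen += isDir[i] ? 0L : lengths[i];
        }
        return (long) Math.ceil(totalLen * 1.0 / lengths.length);
    }

    // Hypothetical variant: size splits so that roughly minPartitions splits result.
    // A non-positive minPartitions falls back to 1 to avoid dividing by zero.
    static long minPartitionsSplitSize(long totalLen, int minPartitions) {
        int divisor = minPartitions <= 0 ? 1 : minPartitions;
        return (long) Math.ceil(totalLen * 1.0 / divisor);
    }

    public static void main(String[] args) {
        long[] lengths = {100, 250, 0};
        boolean[] isDir = {false, false, true};
        System.out.println(perFileSplitSize(lengths, isDir)); // 117, i.e. ceil(350 / 3)
        System.out.println(minPartitionsSplitSize(350, 4));   // 88, i.e. ceil(350 / 4)
    }
}
```

With the quoted formula, asking for more partitions leaves the split size unchanged; with the variant, a larger `minPartitions` yields smaller splits.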
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57623981 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21200/consoleFull) for PR 1658 at commit [`a32fef7`](https://github.com/apache/spark/commit/a32fef7b4905ef098be9e4f73e15ebdfea6a545b). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57632461 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21200/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57632451 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21200/consoleFull) for PR 1658 at commit [`a32fef7`](https://github.com/apache/spark/commit/a32fef7b4905ef098be9e4f73e15ebdfea6a545b). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,`
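The signature reported here marks the split `@transient`, which excludes it from Java serialization. This fits the goal stated in the class's doc comment: allow data streams "to be serialized and moved around by not creating them until they need to be read". The same pattern can be sketched in plain Java. Keep only what is needed to reopen the file, hold the open stream in a `transient` field, and open it lazily. `LazyFileStream` is a hypothetical name, and it stores a local path string where the real class uses a Hadoop split and task context.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class LazyFileStream implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only the path string is written when the object is serialized.
    private final String path;
    // Never serialized; null until open() is called in the current JVM.
    private transient InputStream in;

    LazyFileStream(String path) {
        this.path = path;
    }

    String getPath() {
        return path;
    }

    // Opens the file on first use; after deserialization `in` starts out null again.
    InputStream open() throws IOException {
        if (in == null) {
            in = Files.newInputStream(Paths.get(path));
        }
        return in;
    }

    // Reads the whole file, independent of any stream returned by open().
    byte[] toArray() throws IOException {
        return Files.readAllBytes(Paths.get(path));
    }

    // Closes the stream if one is open; unlike the quoted Scala close(), errors propagate.
    void close() throws IOException {
        if (in != null) {
            try {
                in.close();
            } finally {
                in = null;
            }
        }
    }

    // Serializes and deserializes `s`, as shipping it to another JVM would.
    static LazyFileStream roundTrip(LazyFileStream s) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        }
        try (ObjectInputStream objIn =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyFileStream) objIn.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("pds", ".bin");
        Files.write(tmp, new byte[] {1, 2, 3});
        LazyFileStream s = new LazyFileStream(tmp.toString());
        s.open(); // the open stream is transient, so serialization still succeeds
        LazyFileStream copy = roundTrip(s);
        s.close();
        System.out.println(copy.toArray().length); // 3
    }
}
```

Without `transient` on `in`, `writeObject` would fail with a `NotSerializableException` once a stream had been opened. This is the kind of failure the `@transient` annotations guard against.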
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user kmader commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18267344 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,221 @@
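`StreamBasedRecordReader`, quoted in kmader's earlier `RawFileInput.scala` diff, is built to emit one record per file. A `processed` flag marks the file as done, and `getProgress` reports either 0 or 1. The quoted context is truncated before `nextKeyValue`, so the sketch below is an assumption about that contract, written without Hadoop. `WholeFileReader` is a hypothetical class whose methods only mimic the shape of `nextKeyValue` / `getCurrentKey` / `getCurrentValue` / `getProgress`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class WholeFileReader {
    private final Path file;
    // True once the single record for this file has been produced.
    private boolean processed = false;
    private String key = "";
    private byte[] value = null;

    WholeFileReader(Path file) {
        this.file = file;
    }

    // Produces exactly one record (path -> whole file contents), then reports end of input.
    boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;
        }
        key = file.toString();
        value = Files.readAllBytes(file);
        processed = true;
        return true;
    }

    String getCurrentKey() { return key; }

    byte[] getCurrentValue() { return value; }

    // Progress is all-or-nothing because each file is a single record.
    float getProgress() { return processed ? 1.0f : 0.0f; }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("rec", ".bin");
        Files.write(tmp, new byte[] {9, 8, 7});
        WholeFileReader r = new WholeFileReader(tmp);
        int count = 0;
        while (r.nextKeyValue()) {
            count++;
        }
        System.out.println(count + " record(s), progress " + r.getProgress());
        // 1 record(s), progress 1.0
    }
}
```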
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user kmader commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18267674 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging { } /** + * Get an RDD for a Hadoop-readable dataset as byte-streams for each file + * (useful for binary data) + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + */ + def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): + RDD[(String, Array[Byte])] = { +val job = new NewHadoopJob(hadoopConfiguration) +NewFileInputFormat.addInputPath(job, new Path(path)) +val updateConf = job.getConfiguration +new BinaryFileRDD( + this, + classOf[ByteInputFormat], + classOf[String], + classOf[Array[Byte]], + updateConf, + minPartitions).setName(path) + } + + /** + * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file + * (useful for binary data) + * + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Care must be taken to close the files afterwards + * @note Small files are preferred, large file is also allowable, but may cause bad performance. 
+ */ + @DeveloperApi + def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): + RDD[(String, PortableDataStream)] = { +val job = new NewHadoopJob(hadoopConfiguration) +NewFileInputFormat.addInputPath(job, new Path(path)) +val updateConf = job.getConfiguration +new BinaryFileRDD( + this, + classOf[StreamInputFormat], + classOf[String], + classOf[PortableDataStream], + updateConf, + minPartitions).setName(path) + } + + /** + * Load data from a flat binary file, assuming each record is a set of numbers + * with the specified numerical format (see ByteBuffer), and the number of + * bytes per record is constant (see FixedLengthBinaryInputFormat) + * + * @param path Directory to the input data files + * @return An RDD of data with values, RDD[(Array[Byte])] + */ + def binaryRecords(path: String): RDD[Array[Byte]] = { --- End diff -- Yes this makes much more sense, I had just copied the code from @freeman-lab, but I made it into a parameter now.
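According to the reply above, the record length of `binaryRecords` became a parameter. Its doc comment describes records whose byte count "is constant", so reading such data amounts to cutting the input into consecutive `recordLength`-byte slices. The in-memory Java sketch below assumes the input length is an exact multiple of `recordLength` and rejects anything else. The class `FixedLengthRecords` and the helper `splitRecords` are hypothetical. The sketch does not reproduce how `FixedLengthBinaryInputFormat` handles split boundaries or trailing bytes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FixedLengthRecords {
    // Cuts `data` into consecutive records of exactly `recordLength` bytes.
    // Each record is copied into its own array of exactly recordLength bytes.
    static List<byte[]> splitRecords(byte[] data, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        if (data.length % recordLength != 0) {
            throw new IllegalArgumentException(
                "input length " + data.length + " is not a multiple of " + recordLength);
        }
        List<byte[]> records = new ArrayList<>();
        for (int off = 0; off < data.length; off += recordLength) {
            records.add(Arrays.copyOfRange(data, off, off + recordLength));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5, 6};
        for (byte[] r : splitRecords(data, 2)) {
            System.out.println(Arrays.toString(r));
        }
        // [1, 2]
        // [3, 4]
        // [5, 6]
    }
}
```

Passing the length as an argument, rather than hard-coding it, lets the same reader handle any fixed record size.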
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57440499 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21103/consoleFull) for PR 1658 at commit [`238c83c`](https://github.com/apache/spark/commit/238c83cc9eeab7012aad1a3e2660aae31073a56d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57440605 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21103/consoleFull) for PR 1658 at commit [`238c83c`](https://github.com/apache/spark/commit/238c83cc9eeab7012aad1a3e2660aae31073a56d). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)` * `abstract class StreamBasedRecordReader[T](` * `abstract class BinaryRecordReader[T](`
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57440612 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21103/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57451347 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21108/consoleFull) for PR 1658 at commit [`19812a8`](https://github.com/apache/spark/commit/19812a83df8a4852412feb7dec7f42126b0b139e). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57451427 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21108/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57451425 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21108/consoleFull) for PR 1658 at commit [`19812a8`](https://github.com/apache/spark/commit/19812a83df8a4852412feb7dec7f42126b0b139e). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)` * `abstract class StreamBasedRecordReader[T](` * `abstract class BinaryRecordReader[T](`
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57452596 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21109/consoleFull) for PR 1658 at commit [`4163e38`](https://github.com/apache/spark/commit/4163e38bccca33608fc4a241760e86d4862793b5). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57457522 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21109/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57457518 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21109/consoleFull) for PR 1658 at commit [`4163e38`](https://github.com/apache/spark/commit/4163e38bccca33608fc4a241760e86d4862793b5). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,` * `abstract class StreamBasedRecordReader[T](` * `abstract class BinaryRecordReader[T](`
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57458416 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21110/consoleFull) for PR 1658 at commit [`0588737`](https://github.com/apache/spark/commit/05887379eafdc359206753a68571aaf3fb2dd7a6). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57465007 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21110/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57464997 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21110/consoleFull) for PR 1658 at commit [`0588737`](https://github.com/apache/spark/commit/05887379eafdc359206753a68571aaf3fb2dd7a6). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,` * `abstract class StreamBasedRecordReader[T](` * `abstract class BinaryRecordReader[T](`
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57469082 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21114/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57475609 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21117/consoleFull) for PR 1658 at commit [`c27a8f1`](https://github.com/apache/spark/commit/c27a8f144eeb4e1c03d614691efae0241cca6fab). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57489219 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21117/consoleFull) for PR 1658 at commit [`c27a8f1`](https://github.com/apache/spark/commit/c27a8f144eeb4e1c03d614691efae0241cca6fab). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PortableDataStream(@transient isplit: CombineFileSplit,` * `abstract class StreamBasedRecordReader[T](` * `abstract class BinaryRecordReader[T](`
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-57489237 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21117/
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18300208 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ * A general format for reading whole files in as streams, byte arrays,
+ * or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+    RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+    val pathp = split.getPath(index)
+    pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+    val pathp = split.getPath(index)
+    val fs = pathp.getFileSystem(context.getConfiguration)
+    fileIn = fs.open(pathp)
+    isOpen=true
+    fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+    if (isOpen) {
+      try {
+        fileIn.close()
+        isOpen=false
+      } catch {
+        case ioe: java.io.IOException => // do nothing
+      }
+    }
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+  split: CombineFileSplit,
+  context: TaskAttemptContext,
+  index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override
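The quoted `StreamBasedRecordReader` keeps a `processed` flag so that each file produces exactly one record, with progress jumping from 0 to 1. A minimal Java sketch of that one-record-per-file idea, assuming local files read through `java.nio.file` rather than a Hadoop `FileSystem`; `WholeFileReader` is a hypothetical name and is not part of Spark or Hadoop:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical, Hadoop-free sketch: yields exactly one (path, contents) record
// per file, then reports completion. Method names echo the RecordReader
// protocol, but this class does not implement Hadoop's RecordReader.
final class WholeFileReader {
  private final Path file;
  private boolean processed = false; // true once the single record has been emitted
  private String key = "";
  private byte[] value = null;

  WholeFileReader(Path file) {
    this.file = file;
  }

  /** Loads the whole file as one value; returns false on every later call. */
  boolean nextKeyValue() throws IOException {
    if (processed) {
      return false;
    }
    key = file.toString();
    value = Files.readAllBytes(file);
    processed = true;
    return true;
  }

  String getCurrentKey() {
    return key;
  }

  byte[] getCurrentValue() {
    return value;
  }

  float getProgress() {
    return processed ? 1.0f : 0.0f;
  }
}
```

Because the whole file becomes one value, this pattern suits many small files; a very large file is loaded into memory in one piece.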
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r18315773
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707338
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
...
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
--- End diff --
Instead of returning this Hadoop data type, can we return a java.io.DataInputStream? It's easier to maintain in the future.
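The suggestion above can be illustrated outside Hadoop: keep only a serializable description of the file (here just a path string) and hand callers a plain `java.io.DataInputStream` when they ask for one. A minimal sketch, assuming local files and Java 9+ (`InputStream.readAllBytes`); `LazyDataStream` and its methods are hypothetical names, not the PR's API:

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical sketch: a serializable handle that stores only a path string
// and opens the file lazily, exposing a plain java.io.DataInputStream rather
// than a Hadoop-specific stream type. Local files stand in for HDFS here.
final class LazyDataStream implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String path; // the only state that gets serialized

  LazyDataStream(String path) {
    this.path = path;
  }

  String getPath() {
    return path;
  }

  /** Opens a fresh stream on each call; the caller must close it. */
  DataInputStream open() throws IOException {
    return new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(path))));
  }

  /** Reads the whole file into memory, closing the stream it opened. */
  byte[] toArray() throws IOException {
    try (DataInputStream in = open()) {
      return in.readAllBytes(); // InputStream.readAllBytes requires Java 9+
    }
  }
}
```

Opening a new stream per call, instead of holding one open stream as a field, is what lets the same handle be shipped to another machine and read more than once.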
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707347
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
...
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+  split: CombineFileSplit,
+  context: TaskAttemptContext,
+  index: Integer)
--- End diff --
Formatting here is kind of wrong; the arguments should be indented only 4 spaces.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707360
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
...
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
--- End diff --
I think you can just write `= null`.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707399
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
...
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
--- End diff --
IMO we should call this BinaryData or DataStream instead of PortableDataStream, because the user doesn't really care that it's portable. I prefer BinaryData slightly more, but I'd also be okay with DataStream.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707460
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, Array[Byte])] = {
--- End diff --
What if we just added a toArray to PortableDataStream, and had only one method for reading these? Then you could do `sc.binaryFiles(...).map(_.toArray)` if you want to get byte arrays. Or would this cause a regression? Basically my suggestion is to have binaryFiles, which will return an RDD of PortableDataStream, and binaryRecords, which will return an RDD of byte arrays of the same length (since I imagine there's no point streaming a record).
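The proposed `binaryRecords` contract (every element is a byte array of the same length) can be sketched on an in-memory buffer. A minimal Java sketch, assuming the input length is an exact multiple of the record length; `FixedLengthRecords.split` is a hypothetical helper, not a Spark method. Each record is copied into its own array, so no element carries bytes that belong to a neighboring record:

```java
import java.util.Arrays;

// Hypothetical helper illustrating the binaryRecords idea: cut a buffer into
// records of one fixed length, each copied into its own exactly sized array.
final class FixedLengthRecords {
  private FixedLengthRecords() {}

  static byte[][] split(byte[] data, int recordLength) {
    if (recordLength <= 0) {
      throw new IllegalArgumentException("recordLength must be positive");
    }
    if (data.length % recordLength != 0) {
      // A trailing partial record is rejected rather than silently dropped.
      throw new IllegalArgumentException(
          "data length " + data.length + " is not a multiple of " + recordLength);
    }
    byte[][] records = new byte[data.length / recordLength][];
    for (int i = 0; i < records.length; i++) {
      int from = i * recordLength;
      records[i] = Arrays.copyOfRange(data, from, from + recordLength);
    }
    return records;
  }
}
```

For reference, later Spark releases kept the split proposed here: `SparkContext.binaryFiles` returns `(path, PortableDataStream)` pairs whose `toArray()` materializes the bytes, and `SparkContext.binaryRecords(path, recordLength)` returns fixed-length byte arrays.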
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-55990853
Jenkins, add to whitelist and test this please
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707495
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
...
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
--- End diff --
Put spaces around binary operators like `*` and `/`
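The line under review computes the maximum split size as the ceiling of the total non-directory file length divided by the number of listed entries. A Java sketch of the same arithmetic, written with the operator spacing the comment asks for; `SplitSizing.maxSplitSize` is a hypothetical name, and, as in the quoted Scala, directories contribute zero bytes but still count toward the divisor:

```java
// Hypothetical sketch mirroring Math.ceil(totalLen * 1.0 / files.length)
// from the quoted diff. Directories add 0 bytes but are still counted.
final class SplitSizing {
  private SplitSizing() {}

  static long maxSplitSize(long[] fileLengths, boolean[] isDir) {
    if (fileLengths.length == 0 || fileLengths.length != isDir.length) {
      throw new IllegalArgumentException("need a non-empty listing with one isDir flag per entry");
    }
    long totalLen = 0L;
    for (int i = 0; i < fileLengths.length; i++) {
      totalLen += isDir[i] ? 0L : fileLengths[i];
    }
    return (long) Math.ceil(totalLen * 1.0 / fileLengths.length);
  }
}
```

Unlike the quoted Scala, the sketch rejects an empty listing instead of dividing by zero. Note also that, as quoted, `minPartitions` is accepted by `setMaxSplitSize` but does not enter the formula.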
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-55990912
@kmader sorry for the delay on this, but the new version looks pretty good. Made a few more comments on the API. The other thing I see is that the code style is not quite matching the rest of the project in some places -- you can do sbt scalastyle to check it, or Jenkins will also check it. Will point out the bigger ones.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707506
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707520 --- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ * A general format for reading whole files in as streams, byte arrays,
+ * or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+    RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+    val pathp = split.getPath(index)
+    pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+    val pathp = split.getPath(index)
+    val fs = pathp.getFileSystem(context.getConfiguration)
+    fileIn = fs.open(pathp)
+    isOpen=true
+    fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+    if (isOpen) {
+      try {
+        fileIn.close()
+        isOpen=false
+      } catch {
+        case ioe: java.io.IOException => // do nothing
+      }
+    }
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override
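The `PortableDataStream` above keeps only what it needs to locate the file and opens the stream when `open()` is called, which is what lets it be shipped to executors and read later. A minimal plain-JDK sketch of that "serialize the location, open on demand" idea follows. It assumes a local file path stands in for the `CombineFileSplit`/index pair and `java.nio.file.Files` stands in for the Hadoop `FileSystem`; `LazyFileStream` and `LazyFileStreamDemo` are hypothetical names, not Spark classes. In this sketch the stream field is also marked `transient`, so an already-open stream is never serialized.

```java
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

// Hypothetical illustration only; not Spark's PortableDataStream.
class LazyFileStream implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String path;         // the only state that is serialized
    private transient InputStream in;  // not serialized; created by open()

    LazyFileStream(String path) {
        this.path = path;
    }

    String getPath() {
        return path;
    }

    /** Opens (or reopens) the file, closing any stream opened earlier. */
    InputStream open() throws IOException {
        close();
        in = Files.newInputStream(Paths.get(path));
        return in;
    }

    /** Closes the stream if one is open, ignoring IOExceptions like the quoted close(). */
    void close() {
        if (in != null) {
            try {
                in.close();
            } catch (IOException ignored) {
                // do nothing
            }
            in = null;
        }
    }

    /** Reads the whole file into memory, then closes the stream. */
    byte[] readAll() throws IOException {
        InputStream s = open();
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = s.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } finally {
            close();
        }
    }
}

class LazyFileStreamDemo {
    /** Writes content to a temp file, sends a LazyFileStream through Java
     *  serialization, and reads the copy twice to check it can be re-read. */
    static byte[] roundTrip(byte[] content) {
        try {
            Path file = Files.createTempFile("lazy-stream", ".bin");
            try {
                Files.write(file, content);
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
                    oos.writeObject(new LazyFileStream(file.toString()));
                }
                LazyFileStream copy;
                try (ObjectInputStream ois = new ObjectInputStream(
                        new ByteArrayInputStream(buffer.toByteArray()))) {
                    copy = (LazyFileStream) ois.readObject();
                }
                byte[] first = copy.readAll();
                byte[] second = copy.readAll();  // reopening must yield the same bytes
                if (!Arrays.equals(first, second)) {
                    throw new IllegalStateException("re-read returned different bytes");
                }
                return first;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Reading the deserialized copy twice is also the shape of the "re-read" check requested later in this thread for data streams.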
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-55990964 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20521/consoleFull) for PR 1658 at commit [`932a206`](https://github.com/apache/spark/commit/932a2066edc45c666d94253b2c3e5f7763638b81). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707533

--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +838,29 @@ public void sequenceFile() {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @Test
+  public void binaryFiles() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-0");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+
+
+    JavaPairRDD<String, byte[]> readRDD = sc.binaryFiles(tempDirName,3);
+    List<Tuple2<String, byte[]>> result = readRDD.collect();
+    for (Tuple2<String, byte[]> res : result) {
+      Assert.assertArrayEquals(content1, res._2());
+    }
+  }
--- End diff --

Make sure to add tests for the new methods too (binaryRecords and data streams). For data streams, add a test where we cache them and go over some RDD multiple times, to make sure they can be re-read.
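A `binaryRecords` test would check that a flat file of N × recordLength bytes comes back as N records of exactly recordLength bytes, in order. A minimal plain-Java sketch of that expected behaviour, which a test could compare the RDD's collected output against; `FixedLengthRecords.split` is a hypothetical helper, not Spark code, and rejecting a trailing partial record is this sketch's own assumption rather than documented behaviour of `FixedLengthBinaryInputFormat`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical reference helper (not Spark code) for checking binaryRecords output.
class FixedLengthRecords {
    /** Splits a flat buffer into consecutive records of recordLength bytes each.
     *  Assumption: a length that is not a multiple of recordLength is an error. */
    static List<byte[]> split(byte[] data, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        if (data.length % recordLength != 0) {
            throw new IllegalArgumentException(
                data.length + " bytes is not a multiple of recordLength " + recordLength);
        }
        List<byte[]> records = new ArrayList<>();
        for (int off = 0; off < data.length; off += recordLength) {
            // copyOfRange returns a new array of exactly recordLength bytes
            records.add(Arrays.copyOfRange(data, off, off + recordLength));
        }
        return records;
    }
}
```

Returning exact-length copies also matters for such a test: `assertArrayEquals` against a padded buffer would fail even when the record contents are right.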
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17707574

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String): RDD[Array[Byte]] = {
--- End diff --

IMO we should pass the record length here rather than waiting for users to set it in hadoopConfiguration. You can make a new Configuration in this method and set it. I don't see any use case where the user would prefer to set it in a global configuration object.
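The suggestion amounts to taking recordLength as a parameter and setting it on a per-call copy of the configuration, leaving the shared hadoopConfiguration untouched. Hadoop's `Configuration` has a copy constructor (`new Configuration(other)`) and `setInt(name, value)`, so in Scala this would roughly be `val conf = new Configuration(hadoopConfiguration)` followed by `conf.setInt(..., recordLength)`. The Java sketch below shows only the copy-then-set pattern. It uses `java.util.Properties` as a stand-in so it runs without Hadoop, `RecordLengthConf` is a hypothetical name, and the `"recordLength"` key is taken from the PySpark example later in this thread.

```java
import java.util.Properties;

// Illustration only: java.util.Properties stands in for Hadoop's Configuration.
class RecordLengthConf {
    static final String RECORD_LENGTH_KEY = "recordLength";

    /** Returns a per-call copy of the shared settings with the record length set;
     *  the shared object itself is never modified. */
    static Properties withRecordLength(Properties shared, int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        Properties local = new Properties();
        local.putAll(shared);  // copies the explicitly set entries
        local.setProperty(RECORD_LENGTH_KEY, Integer.toString(recordLength));
        return local;
    }
}
```

Because each call works on its own copy, two jobs that read files with different record lengths cannot interfere through a shared global setting.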
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-55993418

[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20521/consoleFull) for PR 1658 at commit [`932a206`](https://github.com/apache/spark/commit/932a2066edc45c666d94253b2c3e5f7763638b81).

* This patch **fails** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17709043

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String): RDD[Array[Byte]] = {
--- End diff --

@mateiz I agree that makes sense. Originally I used the global configuration so I could call it from PySpark, as in ``sc.newAPIHadoopRDD('FixedLengthBinaryInputFormat',..., conf={'recordLength': '100'})``. But we can just add the new ``binaryRecords`` to PySpark directly (probably in a separate PR), and make the record length an argument as you suggest.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user jrabary commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-55714177

Hi all, I'm trying to use this patch to load a set of JPEG images, but the path (key) of the output is empty:

    val image = sc.binaryFiles("data/*.jpg")
    image.take(1) foreach println

    (,[B@15435acb)

How can I correct this?
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user kmader commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-55769371

Thanks @jrabary for this find. The problem was in the new method for handling PortableDataStreams, which didn't calculate the name correctly. I think I have it fixed now:

```
val images = sc.binaryFiles("figure/*.png")
println(images.first)
```

Output:

```
(figure/unnamed-chunk-2.png,[B@2f1a30ca)
```
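In both outputs, `[B@...` is just the default `toString` of a JVM byte array (the type name `[B` plus an identity hash), not the file contents, so it changes from run to run. A small Java sketch for printing something more informative when inspecting `(path, bytes)` pairs; `ByteArrayDisplay.describe` is a hypothetical helper, not part of Spark.

```java
import java.util.Arrays;

// Hypothetical helper: println on a byte[] shows only its type and identity hash
// (e.g. "[B@2f1a30ca"), so print the length and the first few bytes instead.
class ByteArrayDisplay {
    static String describe(String path, byte[] bytes, int maxShown) {
        int shown = Math.max(0, Math.min(bytes.length, maxShown));
        byte[] head = Arrays.copyOf(bytes, shown);
        return "(" + path + ", " + bytes.length + " bytes, starts with "
            + Arrays.toString(head) + ")";
    }
}
```

In the Scala shell, a quick equivalent check on the RDD itself would be something like `images.map { case (path, bytes) => (path, bytes.length) }.first`.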
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user kmader commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-54744540 Hey @mateiz, sorry, I had other projects to work on. I have made the changes and named the new class `PortableDataStream`.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-54694543 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-54699324 Hey @kmader, just curious, did you think about this BinaryData object idea? If you don't have time to do it, someone else can also pick up from where you left off.
[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...
Github user freeman-lab commented on the pull request: https://github.com/apache/spark/pull/1658#issuecomment-52149331 @kmader great to incorporate the FixedRecord stuff into the PR, thanks! I like @mateiz's suggestion for naming the two methods. I was starting to work on ``saveAsBinaryRecords``, so I could put that together in a separate PR. Maybe @kmader already has some save methods for the ``binaryFiles`` case?
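Nothing in this thread fixes the signature of `saveAsBinaryRecords`, so the sketch below only illustrates the invariant such a writer needs: every record must have the same length, so that the output can be split back into records by recordLength alone. `FixedLengthWriter` is a hypothetical plain-Java class, not a Spark API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical writer (not a Spark API) for a flat fixed-length binary format.
class FixedLengthWriter {
    /** Writes the records back to back after checking they all have recordLength bytes. */
    static void write(List<byte[]> records, int recordLength, OutputStream out) throws IOException {
        for (byte[] r : records) {  // validate first so nothing partial is written
            if (r.length != recordLength) {
                throw new IllegalArgumentException(
                    "record has " + r.length + " bytes, expected " + recordLength);
            }
        }
        for (byte[] r : records) {
            out.write(r);
        }
    }

    /** Convenience wrapper that collects the output in memory. */
    static byte[] toBytes(List<byte[]> records, int recordLength) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            write(records, recordLength, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected for an in-memory stream
        }
        return out.toByteArray();
    }
}
```

Splitting the resulting bytes every recordLength bytes gives back the original records, which is the round trip a fixed-length reader relies on.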