Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/1658#discussion_r17709043

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   /**
+   * Get an RDD for a Hadoop-readable dataset as byte streams for each file
+   * (useful for binary data).
+   *
+   * @param minPartitions A suggested minimum number of partitions for the input data.
+   *
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+    RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as a PortableDataStream for each file
+   * (useful for binary data).
+   *
+   * @param minPartitions A suggested minimum number of partitions for the input data.
+   *
+   * @note Care must be taken to close the files afterwards.
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+    RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer) and that the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat).
+   *
+   * @param path Directory of the input data files
+   * @return An RDD of records as byte arrays, RDD[Array[Byte]]
+   */
+  def binaryRecords(path: String): RDD[Array[Byte]] = {
--- End diff --

@mateiz I agree, that makes sense. Originally I read the record length from the global configuration so I could call it from PySpark, as in ``sc.newAPIHadoopRDD('FixedLengthBinaryInputFormat',..., conf={'recordLength': '100'})``. But we can just add the new ``binaryRecords`` method to PySpark directly (probably in a separate PR) and make the record length an argument, as you suggest.
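
For what it's worth, here is a rough sketch of the argument-based version, meant to live inside ``SparkContext``. The ``recordLength`` key and ``FixedLengthBinaryInputFormat`` follow the naming in this PR; the exact wiring is just an assumption for discussion, not the final implementation:

```scala
import java.util.Arrays

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}

import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD

/**
 * Sketch: load a flat binary file of fixed-length records, taking the record
 * length as an argument instead of reading it from the global configuration.
 */
def binaryRecords(path: String, recordLength: Int): RDD[Array[Byte]] = {
  // Copy this context's Hadoop configuration and set the record length under
  // the key the input format reads ("recordLength" in this PR).
  val conf = new Configuration(hadoopConfiguration)
  conf.set("recordLength", recordLength.toString)
  val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
    path,
    classOf[FixedLengthBinaryInputFormat],
    classOf[LongWritable],
    classOf[BytesWritable],
    conf)
  // Drop the offsets and copy each BytesWritable into a standalone byte array.
  br.map { case (_, v) => Arrays.copyOf(v.getBytes, v.getLength) }
}
```

The PySpark wrapper could then just forward ``path`` and ``recordLength`` to this method through the Java gateway in the follow-up PR.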