[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2015-12-27 Thread saeedehA
Github user saeedehA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-167496362
  
Hello,
I'm sorry, I'm a beginner in PySpark, but I need to read a .bin file as a byte
array, and I don't understand what I have to do. Could someone tell me, step by
step, what to import and which function to call? Thanks a lot.
(Sorry, I can't find anything else that helps me, except here.)
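For reference, the entry point this question is about looks roughly like the following in Scala (a minimal sketch, assuming an existing `SparkContext` named `sc` and an illustrative input path; PySpark's `SparkContext` exposes the same `binaryFiles` and `binaryRecords` methods):

```scala
// Read every file under the path as one record: (file path, PortableDataStream).
val streams = sc.binaryFiles("hdfs://some-path/*.bin")

// Materialize each stream into a byte array (toArray reads the whole file).
val bytes = streams.map { case (path, stream) => (path, stream.toArray()) }

// Alternative for fixed-length records: one Array[Byte] per 1024-byte record.
val records = sc.binaryRecords("hdfs://some-path/data.bin", 1024)
```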





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-12-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r22288031
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -510,6 +510,52 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }
 
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @param recordLength The length at which to split the records
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String, recordLength: Int,
+      conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = {
+    conf.setInt("recordLength",recordLength)
+    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
+      classOf[FixedLengthBinaryInputFormat],
+      classOf[LongWritable],
+      classOf[BytesWritable],
+      conf=conf)
+    val data = br.map{ case (k, v) => v.getBytes}
--- End diff --

It turns out that `getBytes` returns a padded byte array, so I think  you 
may need to copy / slice out the subarray with the data using `v.getLength`; 
see [HADOOP-6298: BytesWritable#getBytes is a bad name that leads to 
programming mistakes](https://issues.apache.org/jira/browse/HADOOP-6298) for 
more details.

Using `getBytes` without `getLength` has caused bugs in Spark in the past: 
#2712.

Is the use of `getBytes` in this patch a bug?  Or is it somehow safe due to 
our use of FixedLengthBinaryInputFormat?  If it is somehow safe, we should have 
a comment which explains this so that readers who know about the `getBytes` 
issue aren't confused (or better yet, an `assert` that `getBytes` returns an 
array of the expected length).
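For illustration, the defensive pattern being described would look something like this (a sketch against the `br` RDD from the diff above, not the fix that was actually committed):

```scala
// BytesWritable.getBytes may return a padded backing array, so copy only the
// first getLength bytes of each record.
val data = br.map { case (_, v) =>
  java.util.Arrays.copyOfRange(v.getBytes, 0, v.getLength)
}
```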





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-11-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1658





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-11-01 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61379142
  
Thanks @kmader, I merged this now. I manually amended the patch a bit to 
fix style issues (there were still a bunch of commas without spaces, etc), and 
I also changed the name of the recordLength property in Hadoop JobConfs to 
start with org.apache.spark so that it's less likely to clash with other Hadoop 
properties. Finally I marked this API as `@Experimental` for now since it's new 
in this release, though we can probably make it non-experimental in 1.3.
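For context, namespacing the property means the record length is set under a Spark-prefixed key rather than a bare name, roughly like this (a sketch; the exact key string chosen in the amended patch is an assumption):

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical namespaced key, instead of a bare "recordLength" that could
// clash with unrelated Hadoop properties.
val conf = new Configuration()
conf.setInt("org.apache.spark.input.FixedLengthBinaryInputFormat.recordLength", 1024)
```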





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-60979360
  
@kmader btw if you don't have time to deal with these comments, let me 
know; I might be able to take the patch from where it is and implement them.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread kmader
Github user kmader commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r19582168
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+  *
+  */
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext}
+import org.apache.spark.input.StreamFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+                                       sc : SparkContext,
+                                       inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+                                       keyClass: Class[String],
+                                       valueClass: Class[T],
+                                       @transient conf: Configuration,
+                                       minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
--- End diff --

Sorry, this function was named incorrectly; it ultimately calls ```setMaxSplitSize``` after calculating the maximum split size based on the number of partitions. I have now renamed it accordingly.
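In other words, the helper turns a requested partition count into a maximum split size; the computation is roughly (a sketch of the intent, not the merged code):

```scala
// Choose a maximum split size so that totalLen bytes of input produce about
// minPartitions combined splits.
def maxSplitSizeFor(totalLen: Long, minPartitions: Int): Long =
  math.ceil(totalLen * 1.0 / math.max(minPartitions, 1)).toLong
```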





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61030284
  
[Test build #22503 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22503/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61031018
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22502/
Test FAILed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61031930
  
Thanks for the update, Kevin. Looks like Jenkins had some issues with git, 
will retry it.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61031939
  
Jenkins, retest this please





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61032240
  
[Test build #22505 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22505/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61035841
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22503/
Test PASSed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61035835
  
[Test build #22503 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22503/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61037379
  
[Test build #22505 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22505/consoleFull) for PR 1658 at commit [`3c49a30`](https://github.com/apache/spark/commit/3c49a305033d76e07bca60a72600c0db544407dd).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class PortableDataStream(@transient isplit: CombineFileSplit,`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-61037383
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22505/
Test PASSed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-27 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r19453138
  
--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported file system URI
+   * as a byte array. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --

I'd still remove this. It's confusing to see the API in just one language, 
and with Java 8, the extra class will be a one-liner.
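The point being that callers can already get byte arrays from `binaryFiles` with a one-line transformation, roughly (a sketch, assuming an existing `SparkContext` named `sc` and `PortableDataStream.toArray()`):

```scala
// Equivalent of the proposed binaryArrays, built on top of binaryFiles:
val byteArrays = sc.binaryFiles("hdfs://a-hdfs-path").map { case (path, stream) =>
  (path, stream.toArray())
}
```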





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-27 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-60709302
  
Thanks for the update, Kevin. Note that there are still a few comments from 
me on https://github.com/apache/spark/pull/1658/files, do you mind dealing with 
those?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-27 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r19453170
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+  *
+  */
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext}
+import org.apache.spark.input.StreamFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+                                       sc : SparkContext,
+                                       inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+                                       keyClass: Class[String],
+                                       valueClass: Class[T],
+                                       @transient conf: Configuration,
+                                       minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
--- End diff --

BTW this comment was important too, what is the meaning of this parameter?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59882709
  
[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21972/consoleFull) for PR 1658 at commit [`8ac288b`](https://github.com/apache/spark/commit/8ac288bc09e779f1b4c96dcb497ee4eca962439f).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59887629
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21972/
Test PASSed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59887621
  
[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21972/consoleFull) for PR 1658 at commit [`8ac288b`](https://github.com/apache/spark/commit/8ac288bc09e779f1b4c96dcb497ee4eca962439f).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class PortableDataStream(@transient isplit: CombineFileSplit,`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-21 Thread kmader
Github user kmader commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r19133684
  
--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
+   * record and returned in a key-value pair, where the key is the path of each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported file system URI
+   * as a byte array. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --

In Scala it is easy to add separately; in Java you need to create an anonymous class, which is more of a hassle.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-60015558
  
[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22011/consoleFull) for PR 1658 at commit [`6379be4`](https://github.com/apache/spark/commit/6379be487cfa91097f3591dfd05b8e87e09c2399).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class PortableDataStream(@transient isplit: CombineFileSplit,`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-60015564
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22011/
Test PASSed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59805377
  
[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/consoleFull) for PR 1658 at commit [`92bda0d`](https://github.com/apache/spark/commit/92bda0daf2fffeea0f1de9199fc71fe978a165c7).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59805569
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21922/
Test FAILed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59805565
  
[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/consoleFull) for PR 1658 at commit [`92bda0d`](https://github.com/apache/spark/commit/92bda0daf2fffeea0f1de9199fc71fe978a165c7).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread kmader
Github user kmader commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59832070
  
So I made the requested changes and added a few more tests, but the tests appear not to have run, for a strange reason: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/console. The build runs out of memory with Maven but works fine in IntelliJ, and I don't get any feedback on the style. Is there a single Maven phase I can run to get that?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59866772
  
There might've been some Jenkins issues recently; going to restart it.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59866801
  
Jenkins, retest this please





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59866791
  
BTW for the style, you can do `sbt/sbt scalastyle` locally if you want. Not sure there's a command in Maven.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59867124
  
[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21950/consoleFull) for PR 1658 at commit [`92bda0d`](https://github.com/apache/spark/commit/92bda0daf2fffeea0f1de9199fc71fe978a165c7).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-59867194
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21950/
Test FAILed.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18610904
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new RawFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
--- End diff --

Hey Kevin, is this `@note` still relevant? Using `addOnCompleteCallback`, you might be able to avoid this.
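The idea being hinted at is roughly the following (a sketch, not the merged code; `addOnCompleteCallback` was the `TaskContext` API available at the time, and the helper name is illustrative):

```scala
import java.io.DataInputStream
import org.apache.spark.TaskContext

// Open a stream inside a task and let Spark close it when the task completes,
// so the caller no longer has to remember to close the file.
def openWithAutoClose(context: TaskContext, open: () => DataInputStream): DataInputStream = {
  val in = open()
  context.addOnCompleteCallback(() => in.close())
  in
}
```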





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18610930
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -510,6 +510,53 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }
 
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
--- End diff --

There's no need to have `@DeveloperApi` on this, as PortableDataStream is 
something we want to support.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18610978
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ ByteStreams, Closeables }
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{ FSDataInputStream, Path }
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream }
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+private[spark] abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String, T] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @note TaskAttemptContext is not serializable resulting in the confBytes construct
+ * @note CombineFileSplit is not serializable resulting in the splitBytes construct
+ */
+@DeveloperApi
--- End diff --

No need for `@DeveloperApi` on this, we want to support it. It was only 
needed on the various input formats.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611084
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ ByteStreams, Closeables }
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{ FSDataInputStream, Path }
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream }
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+private[spark] abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String, T] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @note TaskAttemptContext is not serializable resulting in the confBytes construct
+ * @note CombineFileSplit is not serializable resulting in the splitBytes construct
+ */
+@DeveloperApi
--- End diff --

Actually, one thing that would help is to make this a `trait` (i.e. 
interface in Java), so users can't instantiate it, and then have a 
`private[spark] class PortableDataStreamImpl` for the implementation. We don't 
want to expose the constructor if we can.
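Sketched out, the suggested split looks something like this (names follow the comment; `PortableDataStreamImpl` is hypothetical and this is not what was ultimately merged):

```scala
package org.apache.spark.input

import java.io.DataInputStream

// Public, non-instantiable surface: users program against the trait only.
trait PortableDataStream extends Serializable {
  def open(): DataInputStream
  def getPath(): String
}

// Implementation kept private to Spark so the constructor is not exposed.
private[spark] class PortableDataStreamImpl(path: String, openFn: () => DataInputStream)
  extends PortableDataStream {
  override def open(): DataInputStream = openFn()
  override def getPath(): String = path
}
```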





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611129
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ ByteStreams, Closeables }
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{ FSDataInputStream, Path }
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream }
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+private[spark] abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String, T] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @note TaskAttemptContext is not serializable resulting in the confBytes construct
+ * @note CombineFileSplit is not serializable resulting in the splitBytes construct
+ */
+@DeveloperApi
+class PortableDataStream(@transient isplit: CombineFileSplit,
+  @transient context: TaskAttemptContext, index: Integer)
+  extends Serializable {
+  // transient forces file to be reopened after being serialization
+  // it is also used for non-serializable classes
+
+  @transient
+  private var fileIn: DataInputStream = null.asInstanceOf[DataInputStream]
+  @transient
+  private var isOpen = false
+
+  private val confBytes = {
+    val baos = new ByteArrayOutputStream()
+    context.getConfiguration.write(new DataOutputStream(baos))
+    baos.toByteArray
+  }
+
+  private val splitBytes = {
+    val baos = new ByteArrayOutputStream()
+    isplit.write(new DataOutputStream(baos))
+    baos.toByteArray
+  }
+
+  @transient
+  private lazy val split = {
+    val bais = new ByteArrayInputStream(splitBytes)
+    val nsplit = new CombineFileSplit()
+    nsplit.readFields(new DataInputStream(bais))
+    nsplit
+  }
+
+  @transient
+  private lazy val conf = {
+    val bais = new ByteArrayInputStream(confBytes)
+    val nconf = new Configuration()
+    nconf.readFields(new DataInputStream(bais))
+    nconf
+  }
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  @transient
+  private lazy val path = {
+    val pathp = split.getPath(index)
+    pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): DataInputStream = {
+    if (!isOpen) {
+      val pathp = split.getPath(index)
+      val fs = 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611204
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,219 @@
+/*
--- End diff --

This file contains several different classes, none of which is named RawFileInput -- it would be better to move them to separate files, each named after the class. That makes them easier to find later.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611224
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+  *
+  */
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, 
SparkContext}
+import org.apache.spark.input.StreamFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+   sc : SparkContext,
+   inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+   keyClass: Class[String],
+   valueClass: Class[T],
+   @transient conf: Configuration,
+   minPartitions: Int)
--- End diff --

Format is slightly wrong here, the constructor args should only be indented 
with 4 spaces
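
For reference, a sketch of the four-space constructor-argument indentation being asked for, using the same parameters as the diff above (illustrative only, not the patch's final code):

```scala
private[spark] class BinaryFileRDD[T](
    sc: SparkContext,
    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
    keyClass: Class[String],
    valueClass: Class[T],
    @transient conf: Configuration,
    minPartitions: Int)
  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf)
```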





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611326
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+  *
+  */
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, 
SparkContext}
+import org.apache.spark.input.StreamFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+   sc : SparkContext,
+   inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+   keyClass: Class[String],
+   valueClass: Class[T],
+   @transient conf: Configuration,
+   minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, 
valueClass, conf) {
+
+
+  override def getPartitions: Array[Partition] = {
+val inputFormat = inputFormatClass.newInstance
+inputFormat match {
+  case configurable: Configurable =>
+configurable.setConf(conf)
+  case _ =>
+}
+val jobContext = newJobContext(conf, jobId)
+inputFormat.setMaxSplitSize(jobContext, minPartitions)
--- End diff --

Is this actually a max split size? It seems you're passing an int that 
means something else, but I might've misunderstood 
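
To make the question concrete: in the quoted setMaxSplitSize, the minPartitions argument is never used; the split size is derived from the file count alone. A rough sketch of how the argument could drive the split size instead (an assumption about the intent, not the patch's actual code; it relies on the JavaConversions import already present in this file to map over listStatus):

```scala
// Sketch only: cap each combined split at totalLen / minPartitions bytes so the
// CombineFileInputFormat yields roughly minPartitions splits.
def setMaxSplitSize(context: JobContext, minPartitions: Int) {
  val files = listStatus(context)
  val totalLen = files.map { file => if (file.isDir) 0L else file.getLen }.sum
  val maxSplitSize = Math.ceil(totalLen * 1.0 / Math.max(minPartitions, 1)).toLong
  super.setMaxSplitSize(maxSplitSize)
}
```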





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611451
  
--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +839,84 @@ public void sequenceFile() {
 Assert.assertEquals(pairs, readRDD.collect());
   }
 
+@Test
+public void binaryFiles() throws Exception {
+// Reusing the wholeText files example
+byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+String tempDirName = tempDir.getAbsolutePath();
+File file1 = new File(tempDirName + "/part-0");
+
+FileOutputStream fos1 = new FileOutputStream(file1);
+
+FileChannel channel1 = fos1.getChannel();
+ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+channel1.write(bbuf);
+channel1.close();
+JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+for (Tuple2<String, PortableDataStream> res : result) {
+Assert.assertArrayEquals(content1, res._2().toArray());
+}
+}
+
+@Test
+public void binaryFilesCaching() throws Exception {
+// Reusing the wholeText files example
+byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+String tempDirName = tempDir.getAbsolutePath();
+File file1 = new File(tempDirName + "/part-0");
+
+FileOutputStream fos1 = new FileOutputStream(file1);
+
+FileChannel channel1 = fos1.getChannel();
+ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+channel1.write(bbuf);
+channel1.close();
+
+JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3).cache();
+readRDD.foreach(new VoidFunction<Tuple2<String, PortableDataStream>>() {
+@Override
+public void call(Tuple2<String, PortableDataStream> stringPortableDataStreamTuple2) throws Exception {
+stringPortableDataStreamTuple2._2().toArray(); // force the file to read
+}
+});
+
+List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+for (Tuple2<String, PortableDataStream> res : result) {
+Assert.assertArrayEquals(content1, res._2().toArray());
+}
+}
+
+@Test
+public void binaryRecords() throws Exception {
+// Reusing the wholeText files example
+byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8");
+int numOfCopies = 10;
+String tempDirName = tempDir.getAbsolutePath();
+File file1 = new File(tempDirName + "/part-0");
+
+FileOutputStream fos1 = new FileOutputStream(file1);
+
+FileChannel channel1 = fos1.getChannel();
+
+for (int i=0;i<numOfCopies;i++) {
+ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+channel1.write(bbuf);
+}
+channel1.close();
+
+JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName,content1.length);
--- End diff --

Add spaces after commas (affects several places in here)





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611774
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with 
LocalSparkContext {
 assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName)
+val (infile: String, indata: PortableDataStream) = inRdd.first
+
+// Make sure the name and array match
+assert(infile.contains(outFileName)) // a prefix may get added
+assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName).cache()
+inRdd.foreach{
+  curData: (String, PortableDataStream) =>
+   curData._2.toArray() // force the file to read
+}
+val mappedRdd = inRdd.map{
+  curData: (String, PortableDataStream) =>
+(curData._2.getPath(),curData._2)
+}
+val (infile: String, indata: PortableDataStream) = mappedRdd.first
--- End diff --

Use collect instead of first because first might go through a different 
code path (computing the RDD locally instead of reusing cached data). You can 
do .collect.head
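
A minimal sketch of the suggested change, using the same variables as the diff above:

```scala
// collect() materializes the cached partitions on the driver, whereas first()
// may recompute a partition locally and bypass the cache under test.
val (infile: String, indata: PortableDataStream) = mappedRdd.collect().head
```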





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18611791
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with 
LocalSparkContext {
 assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName)
+val (infile: String, indata: PortableDataStream) = inRdd.first
+
+// Make sure the name and array match
+assert(infile.contains(outFileName)) // a prefix may get added
+assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName).cache()
+inRdd.foreach{
+  curData: (String, PortableDataStream) =>
+   curData._2.toArray() // force the file to read
+}
+val mappedRdd = inRdd.map{
--- End diff --

Small style issue, always have space before {





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18612223
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends 
JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
 new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system 
(available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is 
read as a single
+   * record and returned in a key-value pair, where the key is the path of 
each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,PortableDataStream] = new 
JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported 
file system URI
+   * as a byte array. Each file is read as a single record and returned in 
a
+   * key-value pair, where the key is the path of each file, the value is 
the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = 
defaultMinPartitions):
--- End diff --

Don't add a default value here, it won't be usable from Java





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18612265
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends 
JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
 new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system 
(available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is 
read as a single
+   * record and returned in a key-value pair, where the key is the path of 
each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
--- End diff --

Same here, no default value. Instead add a version of the method with only 
one argument, and one with two.
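
A sketch of the Java-callable pair of overloads being suggested (a sketch only, scaladoc omitted; names taken from the diff above):

```scala
// Variant with an explicit minPartitions, callable from Java.
def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String, PortableDataStream] =
  new JavaPairRDD(sc.binaryFiles(path, minPartitions))

// Variant that falls back to the context's default, also callable from Java.
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
  new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
```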





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18612296
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---
@@ -220,6 +227,83 @@ class JavaSparkContext(val sc: SparkContext) extends 
JavaSparkContextVarargsWork
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
 new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system 
(available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is 
read as a single
+   * record and returned in a key-value pair, where the key is the path of 
each file,
+   * the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  JavaPairRDD[String,PortableDataStream] = new 
JavaPairRDD(sc.binaryFiles(path,minPartitions))
+
+  /**
+   * Read a directory of files as DataInputStream from HDFS,
+   * a local file system (available on all nodes), or any Hadoop-supported 
file system URI
+   * as a byte array. Each file is read as a single record and returned in 
a
+   * key-value pair, where the key is the path of each file, the value is 
the content of each.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-0
+   *   hdfs://a-hdfs-path/part-1
+   *   ...
+   *   hdfs://a-hdfs-path/part-n
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-0, its content)
+   *   (a-hdfs-path/part-1, its content)
+   *   ...
+   *   (a-hdfs-path/part-n, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   */
+  def binaryArrays(path: String, minPartitions: Int = 
defaultMinPartitions):
--- End diff --

Also it seems this method is gone in Scala, maybe it needs to be removed in 
Java?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18612346
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala 
---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+
+/**
+ *
+ * FixedLengthBinaryRecordReader is returned by 
FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable KEY and 
BytesWritable VALUE.
+ *
+ * KEY = record index (Long)
+ * VALUE = the record itself (BytesWritable)
+ *
+ */
+private[spark] class FixedLengthBinaryRecordReader
+  extends RecordReader[LongWritable, BytesWritable] {
+
+  override def close() {
+if (fileInputStream != null) {
+  fileInputStream.close()
+}
+  }
+
+  override def getCurrentKey: LongWritable = {
+recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+recordValue
+  }
+
+  override def getProgress: Float = {
+splitStart match {
+  case x if x == splitEnd => 0.0.toFloat
+  case _ => Math.min(
+((currentPosition - splitStart) / (splitEnd - 
splitStart)).toFloat, 1.0
+  ).toFloat
+}
+  }
+
+  override def initialize(inputSplit: InputSplit, context: 
TaskAttemptContext) {
+
+// the file input
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+// the byte position this fileSplit starts at
+splitStart = fileSplit.getStart
+
+// splitEnd byte marker that the fileSplit ends at
+splitEnd = splitStart + fileSplit.getLength
+
+// the actual file we will be reading from
+val file = fileSplit.getPath
+
+// job configuration
+val job = context.getConfiguration
+
+// check compression
+val codec = new CompressionCodecFactory(job).getCodec(file)
+if (codec != null) {
+  throw new IOException("FixedLengthRecordReader does not support reading compressed files")
+}
+
+// get the record length
+recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+// get the filesystem
+val fs = file.getFileSystem(job)
+
+// open the File
+fileInputStream = fs.open(file)
+
+// seek to the splitStart position
+fileInputStream.seek(splitStart)
+
+// set our current position
+currentPosition = splitStart
+
+  }
+
+  override def nextKeyValue(): Boolean = {
+
+if (recordKey == null) {
+  recordKey = new LongWritable()
+}
+
+// the key is a linear index of the record, given by the
+// position the record starts divided by the record length
+recordKey.set(currentPosition / recordLength)
+
+// the recordValue to place the bytes into
+if (recordValue == null) {
+  recordValue = new BytesWritable(new Array[Byte](recordLength))
+}
+
+// read a record if the currentPosition is less than the split end
+if (currentPosition < splitEnd) {
+
+  // setup a buffer to store the record
+  val buffer = recordValue.getBytes
+
+  fileInputStream.read(buffer, 0, recordLength)
+
+  // update our current position
+  currentPosition = currentPosition + recordLength
+
+  // return true
+  return true
+}
+
+false
+  }
 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18612406
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with 
LocalSparkContext {
 assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName)
+val (infile: String, indata: PortableDataStream) = inRdd.first
+
+// Make sure the name and array match
+assert(infile.contains(outFileName)) // a prefix may get added
+assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName).cache()
+inRdd.foreach{
+  curData: (String, PortableDataStream) =>
+   curData._2.toArray() // force the file to read
+}
+val mappedRdd = inRdd.map{
+  curData: (String, PortableDataStream) =>
+(curData._2.getPath(),curData._2)
+}
+val (infile: String, indata: PortableDataStream) = mappedRdd.first
+
+// Try reading the output back as an object file
+
+assert(indata.toArray === testOutput)
+  }
--- End diff --

Apart from the cache() test, try adding one where we call 
persist(StorageLevel.DISK_ONLY) to check that these are also stored correctly 
to disk. And add one where we use Kryo serialization too.
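
A hedged sketch of what a DISK_ONLY variant might look like, reusing the file-writing setup from the cache() test above (assumes `import org.apache.spark.storage.StorageLevel`); a Kryo variant would additionally build the context with `spark.serializer` set to `org.apache.spark.serializer.KryoSerializer`:

```scala
test("portabledatastream persist disk storage") {
  sc = new SparkContext("local", "test")
  // ... write testOutput to outFileName exactly as in the caching test above ...
  val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
  inRdd.foreach(pair => pair._2.toArray())  // force the streams to be read and stored to disk
  val (_, indata) = inRdd.collect().head
  assert(indata.toArray === testOutput)
}
```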





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-58429329
  
Hey Kevin, sorry for the delay in getting back to this. I just made a few 
more comments. I think this is getting pretty close, hopefully we can put it in 
1.2.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18613097
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -224,6 +226,128 @@ class FileSuite extends FunSuite with 
LocalSparkContext {
 assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName)
+val (infile: String, indata: PortableDataStream) = inRdd.first
+
+// Make sure the name and array match
+assert(infile.contains(outFileName)) // a prefix may get added
+assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName).cache()
+inRdd.foreach{
+  curData: (String, PortableDataStream) =>
+   curData._2.toArray() // force the file to read
+}
+val mappedRdd = inRdd.map{
+  curData: (String, PortableDataStream) =>
+(curData._2.getPath(),curData._2)
+}
+val (infile: String, indata: PortableDataStream) = mappedRdd.first
+
+// Try reading the output back as an object file
+
+assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream flatmap tests") {
+sc = new SparkContext("local", "test")
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val numOfCopies = 3
+val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+channel.write(bbuf)
+channel.close()
+file.close()
+
+val inRdd = sc.binaryFiles(outFileName)
+val mappedRdd = inRdd.map{
+  curData: (String, PortableDataStream) =>
+(curData._2.getPath(),curData._2)
+}
+val copyRdd = mappedRdd.flatMap{
+  curData: (String, PortableDataStream) =>
+for(i <- 1 to numOfCopies) yield (i,curData._2)
+}
+
+val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
+
+// Try reading the output back as an object file
+assert(copyArr.length == numOfCopies)
+copyArr.foreach{
+  cEntry: (Int, PortableDataStream) =>
+assert(cEntry._2.toArray === testOutput)
+}
+
+  }
+
+  test("fixed record length binary file as byte array") {
+// a fixed length of 6 bytes
+
+sc = new SparkContext("local", "test")
+
+val outFile = new File(tempDir, "record-bytestream-0.bin")
+val outFileName = outFile.getAbsolutePath()
+
+// create file
+val testOutput = Array[Byte](1,2,3,4,5,6)
+val testOutputCopies = 10
+
+// write data to file
+val file = new java.io.FileOutputStream(outFile)
+val channel = file.getChannel
+for(i <- 1 to testOutputCopies) {
+  val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+  channel.write(bbuf)
+}
+channel.close()
+file.close()
+
+val inRdd = sc.binaryRecords(outFileName, testOutput.length)
+// make sure there are enough elements
+assert(inRdd.count == testOutputCopies)
+
+// now just compare the first one
+val indata: Array[Byte] = inRdd.first
+assert(indata === testOutput)
+  }
--- End diff --

Add a test where you try to read records with 0 or negative size too, which 
should raise an exception in the driver program
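
A sketch of such a test (assuming the record-length validation is reachable from the driver, either in binaryRecords itself or on the first action; outFileName is written as in the fixed-length test above):

```scala
test("binaryRecords with zero or negative record length") {
  sc = new SparkContext("local", "test")
  intercept[Exception] {
    sc.binaryRecords(outFileName, 0).count()
  }
  intercept[Exception] {
    sc.binaryRecords(outFileName, -1).count()
  }
}
```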



[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18613144
  
--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +839,84 @@ public void sequenceFile() {
 Assert.assertEquals(pairs, readRDD.collect());
   }
 
+@Test
+public void binaryFiles() throws Exception {
+// Reusing the wholeText files example
+byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+String tempDirName = tempDir.getAbsolutePath();
+File file1 = new File(tempDirName + "/part-0");
+
+FileOutputStream fos1 = new FileOutputStream(file1);
+
+FileChannel channel1 = fos1.getChannel();
+ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+channel1.write(bbuf);
+channel1.close();
+JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+for (Tuple2<String, PortableDataStream> res : result) {
+Assert.assertArrayEquals(content1, res._2().toArray());
+}
+}
+
+@Test
+public void binaryFilesCaching() throws Exception {
+// Reusing the wholeText files example
+byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
--- End diff --

Try not to have 2 blank lines in a row, except between classes if you have 
multiple classes in one file





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18613166
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala 
---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+
+/**
+ *
+ * FixedLengthBinaryRecordReader is returned by 
FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable KEY and 
BytesWritable VALUE.
+ *
+ * KEY = record index (Long)
+ * VALUE = the record itself (BytesWritable)
+ *
+ */
+private[spark] class FixedLengthBinaryRecordReader
+  extends RecordReader[LongWritable, BytesWritable] {
+
+  override def close() {
+if (fileInputStream != null) {
+  fileInputStream.close()
+}
+  }
+
+  override def getCurrentKey: LongWritable = {
+recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+recordValue
+  }
+
+  override def getProgress: Float = {
+splitStart match {
+  case x if x == splitEnd => 0.0.toFloat
+  case _ => Math.min(
+((currentPosition - splitStart) / (splitEnd - 
splitStart)).toFloat, 1.0
+  ).toFloat
+}
+  }
+
+  override def initialize(inputSplit: InputSplit, context: 
TaskAttemptContext) {
+
+// the file input
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+// the byte position this fileSplit starts at
+splitStart = fileSplit.getStart
+
+// splitEnd byte marker that the fileSplit ends at
+splitEnd = splitStart + fileSplit.getLength
+
+// the actual file we will be reading from
+val file = fileSplit.getPath
+
+// job configuration
+val job = context.getConfiguration
+
+// check compression
+val codec = new CompressionCodecFactory(job).getCodec(file)
+if (codec != null) {
+  throw new IOException("FixedLengthRecordReader does not support reading compressed files")
+}
+
+// get the record length
+recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+// get the filesystem
+val fs = file.getFileSystem(job)
+
+// open the File
+fileInputStream = fs.open(file)
+
+// seek to the splitStart position
+fileInputStream.seek(splitStart)
+
+// set our current position
+currentPosition = splitStart
+
--- End diff --

Avoid blank lines at end of methods





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18613254
  
--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +839,84 @@ public void sequenceFile() {
 Assert.assertEquals(pairs, readRDD.collect());
   }
 
+@Test
+public void binaryFiles() throws Exception {
+// Reusing the wholeText files example
+byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
+String tempDirName = tempDir.getAbsolutePath();
+File file1 = new File(tempDirName + "/part-0");
+
+FileOutputStream fos1 = new FileOutputStream(file1);
+
+FileChannel channel1 = fos1.getChannel();
+ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+channel1.write(bbuf);
+channel1.close();
+JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+for (Tuple2<String, PortableDataStream> res : result) {
+Assert.assertArrayEquals(content1, res._2().toArray());
+}
--- End diff --

Test the versions of the methods that take only one argument too (the main 
purpose of the Java API suite is to make sure all these methods are callable 
from Java, this is why it's useful to add calls to them even if they're not 
doing much)





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-08 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18613290
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+  *
--- End diff --

This comment seems unrelated, why is it up here?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-02 Thread kmader
Github user kmader commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18335807
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) 
= {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57623981
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21200/consoleFull)
 for   PR 1658 at commit 
[`a32fef7`](https://github.com/apache/spark/commit/a32fef7b4905ef098be9e4f73e15ebdfea6a545b).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57632461
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21200/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57632451
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21200/consoleFull)
 for   PR 1658 at commit 
[`a32fef7`](https://github.com/apache/spark/commit/a32fef7b4905ef098be9e4f73e15ebdfea6a545b).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(@transient isplit: CombineFileSplit,`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread kmader
Github user kmader commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18267344
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) 
= {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread kmader
Github user kmader commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18267674
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, Array[Byte])] = {
+val job = new NewHadoopJob(hadoopConfiguration)
+NewFileInputFormat.addInputPath(job, new Path(path))
+val updateConf = job.getConfiguration
+new BinaryFileRDD(
+  this,
+  classOf[ByteInputFormat],
+  classOf[String],
+  classOf[Array[Byte]],
+  updateConf,
+  minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for 
each file
+   * (useful for binary data)
+   *
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = 
defaultMinPartitions):
+  RDD[(String, PortableDataStream)] = {
+val job = new NewHadoopJob(hadoopConfiguration)
+NewFileInputFormat.addInputPath(job, new Path(path))
+val updateConf = job.getConfiguration
+new BinaryFileRDD(
+  this,
+  classOf[StreamInputFormat],
+  classOf[String],
+  classOf[PortableDataStream],
+  updateConf,
+  minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of 
numbers
+   * with the specified numerical format (see ByteBuffer), and the number 
of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String): RDD[Array[Byte]] = {
--- End diff --

Yes, this makes much more sense. I had just copied the code from @freeman-lab, but I have now made it into a parameter.
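For readers following along, here is a minimal usage sketch of the parameterized binaryRecords discussed above; the SparkContext setup, the input path, and the 12-byte record length are illustrative assumptions, not values taken from this patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical example: every record in the input file is assumed to be exactly 12 bytes.
    val sc = new SparkContext(new SparkConf().setAppName("binaryRecordsSketch").setMaster("local[2]"))
    val records: RDD[Array[Byte]] = sc.binaryRecords("hdfs:///data/fixed-width.bin", 12)
    // Each element is one 12-byte record; e.g. count them or decode each one downstream.
    println(records.count())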





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57440499
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21103/consoleFull)
 for   PR 1658 at commit 
[`238c83c`](https://github.com/apache/spark/commit/238c83cc9eeab7012aad1a3e2660aae31073a56d).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57440605
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21103/consoleFull)
 for   PR 1658 at commit 
[`238c83c`](https://github.com/apache/spark/commit/238c83cc9eeab7012aad1a3e2660aae31073a56d).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57440612
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21103/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57451347
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21108/consoleFull)
 for   PR 1658 at commit 
[`19812a8`](https://github.com/apache/spark/commit/19812a83df8a4852412feb7dec7f42126b0b139e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57451427
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21108/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57451425
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21108/consoleFull)
 for   PR 1658 at commit 
[`19812a8`](https://github.com/apache/spark/commit/19812a83df8a4852412feb7dec7f42126b0b139e).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(@transient isplit: CombineFileSplit, 
@transient context: TaskAttemptContext, index: Integer)`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57452596
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21109/consoleFull)
 for   PR 1658 at commit 
[`4163e38`](https://github.com/apache/spark/commit/4163e38bccca33608fc4a241760e86d4862793b5).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57457522
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21109/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57457518
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21109/consoleFull)
 for   PR 1658 at commit 
[`4163e38`](https://github.com/apache/spark/commit/4163e38bccca33608fc4a241760e86d4862793b5).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(@transient isplit: CombineFileSplit,`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57458416
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21110/consoleFull)
 for   PR 1658 at commit 
[`0588737`](https://github.com/apache/spark/commit/05887379eafdc359206753a68571aaf3fb2dd7a6).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57465007
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21110/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57464997
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21110/consoleFull)
 for   PR 1658 at commit 
[`0588737`](https://github.com/apache/spark/commit/05887379eafdc359206753a68571aaf3fb2dd7a6).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(@transient isplit: CombineFileSplit,`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57469082
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21114/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57475609
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21117/consoleFull)
 for   PR 1658 at commit 
[`c27a8f1`](https://github.com/apache/spark/commit/c27a8f144eeb4e1c03d614691efae0241cca6fab).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57489219
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21117/consoleFull)
 for   PR 1658 at commit 
[`c27a8f1`](https://github.com/apache/spark/commit/c27a8f144eeb4e1c03d614691efae0241cca6fab).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(@transient isplit: CombineFileSplit,`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-57489237
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21117/





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18300208
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) 
= {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-10-01 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r18315773
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) 
= {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707338
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
--- End diff --

Instead of returning this Hadoop data type, can we return a 
java.io.DataInputStream? It's easier to maintain in the future.
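A minimal sketch of what the suggested change could look like, assuming open() keeps the same body and only widens its return type (FSDataInputStream extends java.io.DataInputStream, which is already imported in this file, so the existing value can be returned unchanged); this is a sketch of the suggestion, not the merged code:

    // Return the more general java.io.DataInputStream instead of Hadoop's FSDataInputStream.
    def open(): DataInputStream = {
      val pathp = split.getPath(index)
      val fs = pathp.getFileSystem(context.getConfiguration)
      fileIn = fs.open(pathp)
      isOpen = true
      fileIn  // upcast to DataInputStream at the return site
    }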





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707347
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
--- End diff --

Formatting here is kind of wrong; the arguments should be indented only 4 spaces.
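For reference, the declaration with a four-space continuation indent would look roughly like this (indentation sketch only):

    abstract class StreamBasedRecordReader[T](
        split: CombineFileSplit,
        context: TaskAttemptContext,
        index: Integer)
      extends RecordReader[String, T]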



[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707360
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
--- End diff --

I think you can just write = null
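In other words, the initializer can use the null literal directly, since the field has a reference type (sketch of the simplification, not the committed code):

    private var fileIn: FSDataInputStream = null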





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707399
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
--- End diff --

IMO we should call this BinaryData or DataStream instead of 
PortableDataStream, because the user doesn't really care that it's portable. I 
prefer BinaryData slightly more but I'd also be okay with DataStream.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707460
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting 
number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but 
may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, Array[Byte])] = {
--- End diff --

What if we just added a toArray to PortableDataStream, and had only one 
method for reading these? Then you could do 
`sc.binaryFiles(...).map(_.toArray)` if you want to get byte arrays. Or would 
this cause a regression?

Basically my suggestion is to have binaryFiles, which will return an RDD of 
PortableDataStream, and binaryRecords, which will return an RDD of byte arrays 
of the same length (since I imagine there's no point streaming a record).
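To make the proposal concrete, here is a hedged usage sketch of the two-method API being suggested; the toArray method on PortableDataStream is only proposed at this point in the review, sc is an existing SparkContext, and the paths and record length are made-up examples:

    // Proposed shape, not the merged implementation:
    //  - binaryFiles: one (path, PortableDataStream) pair per file, read lazily
    //  - binaryRecords: fixed-length records eagerly materialized as byte arrays
    val streams = sc.binaryFiles("hdfs:///scans/")                // RDD[(String, PortableDataStream)]
    val wholeFiles = streams.map { case (p, stream) => (p, stream.toArray) }
    val records = sc.binaryRecords("hdfs:///telemetry.bin", 64)   // RDD[Array[Byte]], 64 bytes each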





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-55990853
  
Jenkins, add to whitelist and test this please





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707495
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
--- End diff --

Put spaces around binary operators like `*` and `/`
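With the requested spacing, that line would read:

    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong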





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-55990912
  
@kmader sorry for the delay on this, but the new version looks pretty good. Made a few more comments on the API. The other thing I see is that the code style does not quite match the rest of the project in some places -- you can run `sbt scalastyle` to check it, or Jenkins will also check it. I will point out the bigger ones.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707506
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) 
= {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707520
  
--- Diff: core/src/main/scala/org/apache/spark/input/RawFileInput.scala ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import java.io.DataInputStream
+
+
+/**
+ *  A general format for reading whole files in as streams, byte arrays,
+ *  or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String,T]  {
+  override protected def isSplitable(context: JobContext, file: Path): 
Boolean = false
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility 
with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+val files = listStatus(context)
+val totalLen = files.map { file =>
+  if (file.isDir) 0L else file.getLen
+}.sum
+
+val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+  RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by 
not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)
+  extends Serializable {
+
+  private var fileIn: FSDataInputStream = 
null.asInstanceOf[FSDataInputStream]
+  private var isOpen = false
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  private lazy val path = {
+val pathp = split.getPath(index)
+pathp.toString
+  }
+
+  /**
+   * create a new DataInputStream from the split and context
+   */
+  def open(): FSDataInputStream = {
+val pathp = split.getPath(index)
+val fs = pathp.getFileSystem(context.getConfiguration)
+fileIn = fs.open(pathp)
+isOpen=true
+fileIn
+  }
+
+  /**
+   * close the file (if it is already open)
+   */
+  def close() = {
+if (isOpen) {
+  try {
+fileIn.close()
+isOpen=false
+  } catch {
+case ioe: java.io.IOException => // do nothing
+  }
+}
+  }
+  def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+   split: CombineFileSplit,
+   context: TaskAttemptContext,
+   index: Integer)
+  extends RecordReader[String, T] {
+
+
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) 
= {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+
+
+  override 

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-55990964
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20521/consoleFull)
 for   PR 1658 at commit 
[`932a206`](https://github.com/apache/spark/commit/932a2066edc45c666d94253b2c3e5f7763638b81).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707533
  
--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -836,6 +838,29 @@ public void sequenceFile() {
 Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @Test
+  public void binaryFiles() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-0");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+
+    JavaPairRDD<String, byte[]> readRDD = sc.binaryFiles(tempDirName, 3);
+    List<Tuple2<String, byte[]>> result = readRDD.collect();
+    for (Tuple2<String, byte[]> res : result) {
+      Assert.assertArrayEquals(content1, res._2());
+    }
+  }
--- End diff --

Make sure to add tests for the new methods too (binaryRecords and data 
streams). For data streams, add a test where we cache them and go over some RDD 
multiple times, to make sure they can be re-read.
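A rough Scala-side sketch of the caching test being asked for here (illustrative; it assumes a ScalaTest suite with a `SparkContext` named `sc`, a `tempDirName` already populated with binary files as in the existing tests, and Guava's `ByteStreams` on the classpath):

```
// Sketch only: cache an RDD of data streams and read it twice to verify
// the streams can be re-opened after the first pass.
test("portable data streams can be cached and re-read") {
  val streams = sc.binaryFiles(tempDirName, 3).cache()

  def readAll() = streams.map { case (_, stream) =>
    val in = stream.open()
    try ByteStreams.toByteArray(in).toSeq finally stream.close()
  }.collect()

  val firstPass = readAll()
  val secondPass = readAll()   // second pass goes over the cached RDD
  assert(firstPass.toSeq === secondPass.toSeq)
}
```

The second pass runs over the cached RDD, so it checks that a cached `PortableDataStream` can still be re-opened and read.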





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17707574
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String): RDD[Array[Byte]] = {
--- End diff --

IMO we should pass the record length here rather than waiting for users to 
set it in hadoopConfiguration. You can make a new Configuration in this method 
and set it. I don't see any use case where the user would prefer to set it in a 
global configuration object.
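A sketch of what that suggestion might look like inside `SparkContext` (illustrative only: the configuration key name is a placeholder for whatever property `FixedLengthBinaryInputFormat` actually reads, and the body assumes `SparkContext`'s existing Hadoop imports such as `Configuration`, `LongWritable`, and `BytesWritable`):

```
// Sketch of the suggested signature: take recordLength directly and set it
// on a private copy of the Hadoop configuration instead of the global one.
def binaryRecords(path: String, recordLength: Int): RDD[Array[Byte]] = {
  val conf = new Configuration(hadoopConfiguration)   // copy, so the shared conf is untouched
  conf.setInt("recordLength", recordLength)           // key name illustrative
  val records = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
    path,
    classOf[FixedLengthBinaryInputFormat],
    classOf[LongWritable],
    classOf[BytesWritable],
    conf)
  // Slice out getLength bytes, since getBytes alone can return a padded buffer.
  records.map { case (_, v) => java.util.Arrays.copyOfRange(v.getBytes, 0, v.getLength) }
}
```

Copying the configuration keeps per-call record lengths from leaking into other jobs that share the same SparkContext.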





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-55993418
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20521/consoleFull)
 for   PR 1658 at commit 
[`932a206`](https://github.com/apache/spark/commit/932a2066edc45c666d94253b2c3e5f7763638b81).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class PortableDataStream(split: CombineFileSplit, context: 
TaskAttemptContext, index: Integer)`
  * `abstract class StreamBasedRecordReader[T](`
  * `abstract class BinaryRecordReader[T](`






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-17 Thread freeman-lab
Github user freeman-lab commented on a diff in the pull request:

https://github.com/apache/spark/pull/1658#discussion_r17709043
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+   * (useful for binary data)
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Care must be taken to close the files afterwards
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def binaryRecords(path: String): RDD[Array[Byte]] = {
--- End diff --

@mateiz I agree, that makes sense. Originally I used the global configuration object so I could call it from PySpark, as in 
``sc.newAPIHadoopRDD('FixedLengthBinaryInputFormat', ..., conf={'recordLength': '100'})``. But we can just add the new ``binaryRecords`` to PySpark directly 
(probably in a separate PR) and make the record length an argument, as you suggest.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-16 Thread jrabary
Github user jrabary commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-55714177
  
Hi all,
I'm trying to use this patch to load a set of JPEG images, but the path (key) of the output is empty:

 val image = sc.binaryFiles("data/*.jpg")
 image.take(1) foreach println
 (,[B@15435acb)

How can I correct this?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-16 Thread kmader
Github user kmader commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-55769371
  
Thanks @jrabary for this find; it had to do with the new method for 
handling PortableDataStreams, which didn't calculate the name correctly. I 
think I have it fixed now:

```
val images = sc.binaryFiles("figure/*.png")
println(images.first)
```

Output
```
(figure/unnamed-chunk-2.png,[B@2f1a30ca)
```






[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-07 Thread kmader
Github user kmader commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-54744540
  
Hey @mateiz, sorry, I had other projects to work on. I have made the 
changes and called the new class ```PortableDataStream```.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-54694543
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-09-05 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-54699324
  
Hey @kmader, just curious, did you think about this BinaryData object idea? 
If you don't have time to do it, someone else can also pick up from where you 
left off.





[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

2014-08-14 Thread freeman-lab
Github user freeman-lab commented on the pull request:

https://github.com/apache/spark/pull/1658#issuecomment-52149331
  
@kmader great to incorporate the FixedRecord stuff into the PR, thanks! I 
like @mateiz's suggestion for naming the two methods. I was starting to work on 
``saveAsBinaryRecords``, so I could put that together in a separate PR; maybe 
@kmader already has some save methods for the ``binaryFiles`` case?




