Venkat,

No problem!
> So, creating a custom InputFormat or using sc.binaryFiles alone is not the
> right solution. We also need the modified version of RDD.pipe to support
> binary data? Is my understanding correct?

Yep! That is correct. The custom InputFormat allows Spark to load binary formatted data from disk/HDFS/S3/etc., but the default RDD.pipe then reads and writes text over the pipe, so you also need the custom mapPartitions call.

> If yes, this can be added as a new enhancement Jira request?

The code that I have right now is fairly specific to my application, but if there is interest, I would be glad to port it into Spark core.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Jan 22, 2015, at 7:11 AM, Venkat, Ankam <ankam.ven...@centurylink.com> wrote:

> Thanks, Frank, for your response.
>
> So, creating a custom InputFormat or using sc.binaryFiles alone is not the right solution. We also need the modified version of RDD.pipe to support binary data? Is my understanding correct?
>
> If yes, this can be added as a new enhancement Jira request?
>
> Nick: What's your take on this?
>
> Regards,
> Venkat Ankam
>
> From: Frank Austin Nothaft [mailto:fnoth...@berkeley.edu]
> Sent: Wednesday, January 21, 2015 12:30 PM
> To: Venkat, Ankam
> Cc: Nick Allen; user@spark.apache.org
> Subject: Re: How to 'Pipe' Binary Data in Apache Spark
>
> Hi Venkat/Nick,
>
> The Spark RDD.pipe method pipes text data into a subprocess and then receives text data back from that process. Once you have the binary data loaded into an RDD properly, to pipe binary data to/from a subprocess (i.e., so that the data in the pipes is binary, not text), you need to implement your own, modified version of RDD.pipe. The implementation of RDD.pipe spawns one process per partition (IIRC), plus threads for writing to and reading from the process (and one for the process's stderr). When writing via RDD.pipe, Spark calls *.toString on each object and pushes that text representation down the pipe. There is an example of how to pipe binary data from within a mapPartitions call using the Scala API in lines 107-177 of this file. That specific code contains some nastiness around the packaging of downstream libraries that we rely on in that project, so I'm not sure it is the cleanest way, but it is a workable way.
>
> Regards,
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
>
> On Jan 21, 2015, at 9:17 AM, Venkat, Ankam <ankam.ven...@centurylink.com> wrote:
>
> > I am trying to solve a similar problem. I am using option #2 as suggested by Nick.
> >
> > I have created an RDD with sc.binaryFiles for a list of .wav files, but I am not able to pipe it to the external programs.
> >
> > For example:
> >
> >   >>> sq = sc.binaryFiles("wavfiles")   # all .wav files stored in the "wavfiles" directory on HDFS
> >   >>> sq.keys().collect()               # works fine; shows the list of file names
> >   >>> sq.values().collect()             # works fine; shows the content of the files
> >   >>> sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t', 'wav', '-', '-n', 'stats'])).collect()
> >   AttributeError: 'function' object has no attribute 'read'
> >
> > The pipe call does not work; I have tried different options.
> >
> > Any suggestions?
> >
> > Regards,
> > Venkat Ankam
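The AttributeError above comes from RDD.pipe expecting an external command, not a Python callable: PySpark runs shlex.split on the argument, and shlex tries to call .read() on anything that is not a string. Even with a valid command string, pipe would still push each element's text representation down the pipe, which is exactly what Frank's mapPartitions approach avoids. A minimal PySpark sketch of that approach follows; it is a sketch, not Frank's actual code, and it assumes that sox's 'stats' effect prints its report on stderr and that spawning one sox process per file is acceptable:

    import subprocess

    def pipe_wavs_to_sox(partition):
        # partition is an iterator of (filename, contents-as-bytes) pairs
        # produced by sc.binaryFiles
        for name, data in partition:
            proc = subprocess.Popen(
                ['/usr/local/bin/sox', '-t', 'wav', '-', '-n', 'stats'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate(data)  # write the raw bytes down the pipe
            # assumption: the 'stats' effect writes its report to stderr
            yield (name, err.decode('utf-8', 'replace'))

    # sc is the SparkContext provided by the pyspark shell
    stats = sc.binaryFiles("wavfiles").mapPartitions(pipe_wavs_to_sox)
    print(stats.collect())

Note that communicate() buffers each file's output in memory; a production version would more likely keep one long-lived process per partition with separate reader/writer threads, along the lines of the Scala example Frank references.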
> From: Nick Allen [mailto:n...@nickallen.org]
> Sent: Friday, January 16, 2015 11:46 AM
> To: user@spark.apache.org
> Subject: Re: How to 'Pipe' Binary Data in Apache Spark
>
> I just wanted to reiterate the solution for the benefit of the community.
>
> The problem is not my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh!) There are a couple of options for moving forward:
>
> 1. Implement a custom 'InputFormat' that understands the binary input data (per Sean Owen).
>
> 2. Use 'SparkContext.binaryFiles' to read in each entire binary file as a single record. This will impact performance, as it prevents the use of more than one mapper on a file's data.
>
> For #1, in my specific case I can find only one project, from RIPE-NCC (https://github.com/RIPE-NCC/hadoop-pcap), that does this. Unfortunately, it appears to support only a limited set of network protocols.
>
> On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen <n...@nickallen.org> wrote:
>
> Per your last comment, it appears I need something like this:
>
> https://github.com/RIPE-NCC/hadoop-pcap
>
> Thanks a ton. That got me oriented in the right direction.
>
> On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen <so...@cloudera.com> wrote:
>
> Well, it looks like you're reading some kind of binary file as text. That isn't going to work, in Spark or elsewhere, as binary data is not even necessarily a valid encoding of a string, and there are no line breaks to delimit lines and thus elements of the RDD.
>
> Your input has some record structure (or else it's not really useful to put it into an RDD). You can encode it as a SequenceFile and read it with objectFile.
>
> You could also write a custom InputFormat that knows how to parse pcap records directly.
>
> On Fri, Jan 16, 2015 at 3:09 PM, Nick Allen <n...@nickallen.org> wrote:
>
> > I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe that binary data to an external program that will translate it to string/text data. Unfortunately, it seems that Spark is mangling the binary data before it gets passed to the external program.
> >
> > This code is representative of what I am trying to do. What am I doing wrong? How can I pipe binary data in Spark? Maybe it is getting corrupted when I read it in initially with 'textFile'?
> >
> >   bin = sc.textFile("binary-data.dat")
> >   csv = bin.pipe("/usr/bin/binary-to-csv.sh")
> >   csv.saveAsTextFile("text-data.csv")
> >
> > Specifically, I am trying to use Spark to transform pcap (packet capture) data to text/csv so that I can perform an analysis on it.
> >
> > Thanks!
> >
> > --
> > Nick Allen <n...@nickallen.org>
>
> --
> Nick Allen <n...@nickallen.org>

--
Nick Allen <n...@nickallen.org>
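Pulling the thread's conclusions together for the original pcap question, a minimal end-to-end sketch might look like the following. It uses Nick's option 2 (sc.binaryFiles, one record per file) together with the subprocess pattern in place of RDD.pipe. This is a sketch under stated assumptions: binary-to-csv.sh is the script from the original post, and it is assumed here to read pcap bytes on stdin and write CSV lines on stdout.

    import subprocess
    from pyspark import SparkContext

    sc = SparkContext(appName="pcap-to-csv")

    def to_csv(partition):
        # partition is an iterator of (file path, file contents as bytes)
        for path, data in partition:
            proc = subprocess.Popen(['/usr/bin/binary-to-csv.sh'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)
            out, _ = proc.communicate(data)  # raw bytes in, text/csv out
            for line in out.decode('utf-8', 'replace').splitlines():
                yield line

    csv = sc.binaryFiles("binary-data.dat").mapPartitions(to_csv)
    csv.saveAsTextFile("text-data.csv")

Because binaryFiles yields each file whole, this avoids the mangling that textFile caused, at the cost Nick notes: no more than one mapper per file.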