Re: How to implement a "saveAsBinaryFile" function?
I think you could also try saveAsHadoopFile with a custom output format, e.g.:
https://github.com/amutu/tdw/blob/master/qe/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/protobuf/mapred/ProtobufOutputFormat.java

On Thu, 16 Jan 2020 at 09:34, Duan,Bing wrote:
> ...
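A minimal sketch of what such a custom output format could look like (the class name, output path, and usage are illustrative, not the linked implementation; this uses the old `mapred` API, which is what `saveAsHadoopFile` expects). It writes each record's bytes verbatim, with no separators or escaping:

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, Reporter}
import org.apache.hadoop.util.Progressable

// Hypothetical raw-bytes output format: each value is written as-is.
class RawBytesOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
  override def getRecordWriter(
      ignored: FileSystem,
      conf: JobConf,
      name: String,
      progress: Progressable): RecordWriter[NullWritable, BytesWritable] = {
    val path = FileOutputFormat.getTaskOutputPath(conf, name)
    val out  = path.getFileSystem(conf).create(path, progress)
    new RecordWriter[NullWritable, BytesWritable] {
      override def write(key: NullWritable, value: BytesWritable): Unit =
        // Write only the valid prefix; BytesWritable's backing array may be larger.
        out.write(value.getBytes, 0, value.getLength)
      override def close(reporter: Reporter): Unit = out.close()
    }
  }
}
```

Usage against an `RDD[Array[Byte]]` (output path is a placeholder):

```scala
rdd.map(b => (NullWritable.get, new BytesWritable(b)))
   .saveAsHadoopFile("/tmp/raw-bytes-out",
     classOf[NullWritable], classOf[BytesWritable], classOf[RawBytesOutputFormat])
```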
Re: How to implement a "saveAsBinaryFile" function?
Hi Fokko, Maxim, Long:

Thanks! This reading occurs in a custom datasource, as below:

override def createRelation(…) {
  …
  blocks.map(block => block.bytes).saveAsTextFile(parameters("path"))
  ...
}

I am new to Spark and will try the methods you provided.

Best!

Bing

On Jan 17, 2020, at 4:28 AM, Maxim Gekk <maxim.g...@databricks.com> wrote:
> ...
Re: How to implement a "saveAsBinaryFile" function?
Hi Bing,

You can try the Text datasource. It shouldn't modify strings:

scala> Seq(""""20192_1",1,24,0,2,"S66.000x001"""").toDS.write.text("tmp/text.txt")

$ cat tmp/text.txt/part-0-256d960f-9f85-47fe-8edd-8428276eb3c6-c000.txt
"20192_1",1,24,0,2,"S66.000x001"

Maxim Gekk
Software Engineer
Databricks B.V.

On Thu, Jan 16, 2020 at 10:02 PM Long, Andrew wrote:
> ...
Re: How to implement a "saveAsBinaryFile" function?
Hey Bing,

There are a couple of different approaches you could take. The quickest and easiest would be to use the existing APIs:

val bytes: RDD[Array[Byte]] = ...

bytes.foreachPartition { partition =>
  // WARNING: anything used in here will need to be serializable.
  // There's some magic to serializing the Hadoop conf; see the Hadoop
  // wrapper class in the source.
  val writer = FileSystem.get(new Configuration()).create(new Path("s3://..."))
  partition.foreach(b => writer.write(b))
  writer.close()
}

The more complicated but prettier approach would be to implement a custom datasource.

From: "Duan,Bing"
Date: Thursday, January 16, 2020 at 12:35 AM
To: "dev@spark.apache.org"
Subject: How to implement a "saveAsBinaryFile" function?
> ...
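A fuller sketch of this foreachPartition approach, writing each partition to its own file so concurrent writers don't collide (the function name, output directory layout, and part-file naming are illustrative assumptions, not an existing Spark API):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: persist an RDD[Array[Byte]] as raw per-partition files.
def saveAsBinaryFile(rdd: RDD[Array[Byte]], dir: String): Unit = {
  rdd.foreachPartition { partition =>
    // Construct the Configuration inside the closure: it is not serializable,
    // so it must be created on the executor, not captured from the driver.
    val conf = new Configuration()
    val path = new Path(s"$dir/part-${TaskContext.getPartitionId()}")
    val out  = path.getFileSystem(conf).create(path)
    try partition.foreach(b => out.write(b))
    finally out.close()
  }
}
```

Note this writes records back-to-back with no delimiters, which is fine for length-prefixed formats like protobuf streams but means record boundaries must be recoverable from the payload itself.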
Re: How to implement a "saveAsBinaryFile" function?
Hi Bing,

Good question, and the answer is: it depends on your use case. If you really just want to write raw bytes, you could create a .foreach where you open an OutputStream and write to some file. But this is probably not what you want, and in practice it is not very handy, since you want to keep the records.

My suggestion would be to write it as Parquet or Avro, into a binary field. Avro has the bytes primitive, which converts in Spark to Array[Byte]: https://avro.apache.org/docs/1.9.1/spec.html
Similarly, Parquet has BYTE_ARRAY: https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0

In the words of Linus Torvalds, *Talk is cheap, show me the code*:

MacBook-Pro-van-Fokko:~ fokkodriesprong$ spark-shell
...
Welcome to Spark version 2.4.4
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_172)
scala> val data: Array[Array[Byte]] = Array(
     |   Array(0x19.toByte, 0x25.toByte)
     | )
data: Array[Array[Byte]] = Array(Array(25, 37))

scala> val rdd = sc.parallelize(data, 1)
rdd: org.apache.spark.rdd.RDD[Array[Byte]] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val df = rdd.toDF("byte")
df: org.apache.spark.sql.DataFrame = [byte: binary]

scala> df.write.parquet("/tmp/bytes/")

MacBook-Pro-van-Fokko:~ fokkodriesprong$ ls -lah /tmp/bytes/
total 24
drwxr-xr-x   6 fokkodriesprong  wheel  192B 16 jan 11:01 .
drwxrwxrwt  16 root             wheel  512B 16 jan 11:01 ..
-rw-r--r--   1 fokkodriesprong  wheel    8B 16 jan 11:01 ._SUCCESS.crc
-rw-r--r--   1 fokkodriesprong  wheel   12B 16 jan 11:01 .part-0-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet.crc
-rw-r--r--   1 fokkodriesprong  wheel    0B 16 jan 11:01 _SUCCESS
-rw-r--r--   1 fokkodriesprong  wheel  384B 16 jan 11:01 part-0-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet

MacBook-Pro-van-Fokko:~ fokkodriesprong$ parquet-tools schema /tmp/bytes/part-0-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet
message spark_schema {
  optional binary byte;
}

Hope this helps.

Cheers, Fokko

On Thu, 16 Jan 2020 at 09:34, Duan,Bing wrote:
> ...
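The same round trip can be sketched outside the shell as a standalone job (the object name and output path are illustrative); reading the Parquet back returns the binary column byte-for-byte:

```scala
import org.apache.spark.sql.SparkSession

object BytesRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("bytes-round-trip").getOrCreate()
    import spark.implicits._

    val data = Seq(Array(0x19.toByte, 0x25.toByte))
    val df = spark.sparkContext.parallelize(data, 1).toDF("byte")
    df.write.mode("overwrite").parquet("/tmp/bytes/")  // placeholder path

    // Reading back yields the original Array[Byte] values unmodified.
    val back = spark.read.parquet("/tmp/bytes/")
      .collect()
      .map(_.getAs[Array[Byte]]("byte"))
    assert(back.head.sameElements(data.head))

    spark.stop()
  }
}
```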
How to implement a "saveAsBinaryFile" function?
Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles function into an RDD[Array[Byte]], and it works fine. But when I save it back to the filesystem with saveAsTextFile, the quotation marks are escaped, like this:

"\"20192_1\"",1,24,0,2,"\"S66.000x001\""

which should be:

"20192_1",1,24,0,2,"S66.000x001"

Could anyone give me a tip on how to implement a function like saveAsBinaryFile to persist the RDD[Array[Byte]]?

Bests!

Bing