Hi Bing,

Good question, and the answer is: it depends on your use case.

If you really just want to write raw bytes, you could use a .foreach (or
.foreachPartition) where you open an OutputStream and write each record to
some file. But this is probably not what you want, and in practice it is not
very handy, since you probably want to keep the records intact.
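
If you do go that route, it could look roughly like the sketch below. This is
only an illustration, not something Spark gives you out of the box: it assumes
an RDD[Array[Byte]] called rdd (like the one further down), writes one local
file per partition, and the output path is made up for the example. Note that
nothing marks where one record ends and the next begins, which is exactly why
I would not recommend it.

import java.io.{BufferedOutputStream, FileOutputStream}
import java.util.UUID

// Illustrative only: dump the raw bytes of every record in each partition
// into a local file on the executor. Record boundaries are lost.
rdd.foreachPartition { records =>
  val out = new BufferedOutputStream(
    new FileOutputStream(s"/tmp/raw-bytes-${UUID.randomUUID()}.bin"))
  try {
    records.foreach(bytes => out.write(bytes))
  } finally {
    out.close()
  }
}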

My suggestion would be to write it as Parquet or Avro, storing the data in a
binary field. Avro has the bytes primitive, which maps in Spark to
Array[Byte]: https://avro.apache.org/docs/1.9.1/spec.html Similarly, Parquet
has the BYTE_ARRAY type:
https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0

In the words of Linus Torvalds: *Talk is cheap, show me the code*:

MacBook-Pro-van-Fokko:~ fokkodriesprong$ spark-shell
20/01/16 10:58:44 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://172.20.10.3:4040
Spark context available as 'sc' (master = local[*], app id =
local-1579168731763).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data: Array[Array[Byte]] = Array(
     |   Array(0x19.toByte, 0x25.toByte)
     | )
data: Array[Array[Byte]] = Array(Array(25, 37))

scala> val rdd = sc.parallelize(data, 1);
rdd: org.apache.spark.rdd.RDD[Array[Byte]] = ParallelCollectionRDD[0] at
parallelize at <console>:26

scala> rdd.toDF("byte")
res1: org.apache.spark.sql.DataFrame = [byte: binary]

scala> val df = rdd.toDF("byte")
df: org.apache.spark.sql.DataFrame = [byte: binary]

scala> df.write.parquet("/tmp/bytes/")



MacBook-Pro-van-Fokko:~ fokkodriesprong$ ls -lah /tmp/bytes/
total 24
drwxr-xr-x   6 fokkodriesprong  wheel   192B 16 jan 11:01 .
drwxrwxrwt  16 root             wheel   512B 16 jan 11:01 ..
-rw-r--r--   1 fokkodriesprong  wheel     8B 16 jan 11:01 ._SUCCESS.crc
-rw-r--r--   1 fokkodriesprong  wheel    12B 16 jan 11:01
.part-00000-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet.crc
-rw-r--r--   1 fokkodriesprong  wheel     0B 16 jan 11:01 _SUCCESS
-rw-r--r--   1 fokkodriesprong  wheel   384B 16 jan 11:01
part-00000-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet

MacBook-Pro-van-Fokko:~ fokkodriesprong$ parquet-tools schema
/tmp/bytes/part-00000-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet

message spark_schema {
  optional binary byte;
}
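
Reading it back gives you the Array[Byte] again, and the Avro route is just as
short. A rough sketch, continuing from the session above; the Avro line assumes
the spark-avro module is on the classpath (e.g. started with
--packages org.apache.spark:spark-avro_2.11:2.4.4), and the paths are just
examples:

// Read the Parquet back; the binary column comes out as Array[Byte] again.
val roundTrip = spark.read.parquet("/tmp/bytes/")
roundTrip.printSchema()  // byte: binary (nullable = true)
val bytes = roundTrip.collect().map(_.getAs[Array[Byte]]("byte"))

// The Avro variant looks the same; the bytes primitive ends up in a binary field.
df.write.format("avro").save("/tmp/bytes-avro/")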

Hope this helps.

Cheers, Fokko


On Thu, 16 Jan 2020 at 09:34, Duan,Bing <duanb...@baidu.com> wrote:

> Hi all:
>
> I read binary data (protobuf format) from the filesystem with the
> binaryFiles function into an RDD[Array[Byte]], and it works fine. But when
> I save it back to the filesystem with saveAsTextFile, the quotation marks
> get escaped, like this: "\"201900002_1\"",1,24,0,2,"\"S66.000x001\"",
> which should be "201900002_1",1,24,0,2,"S66.000x001".
>
> Could anyone give me a tip on how to implement something
> like saveAsBinaryFile to persist the RDD[Array[Byte]]?
>
> Bests!
>
> Bing
>
