Re: How to implement a "saveAsBinaryFile" function?

2020-01-18 Thread jelmer
I think you could also try saveAsHadoopFile with a custom output format
like

https://github.com/amutu/tdw/blob/master/qe/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/protobuf/mapred/ProtobufOutputFormat.java
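For illustration, a minimal Scala sketch of that idea (RawBytesOutputFormat is a
hypothetical name; the linked Java class follows the same pattern of writing
each record's bytes verbatim):

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, Reporter}
import org.apache.hadoop.util.Progressable

// Hypothetical output format: writes each record's raw bytes as-is,
// with no line terminators, quoting, or escaping.
class RawBytesOutputFormat extends FileOutputFormat[NullWritable, BytesWritable] {
  override def getRecordWriter(
      ignored: FileSystem,
      job: JobConf,
      name: String,
      progress: Progressable): RecordWriter[NullWritable, BytesWritable] = {
    val path = FileOutputFormat.getTaskOutputPath(job, name)
    val out = path.getFileSystem(job).create(path, progress)
    new RecordWriter[NullWritable, BytesWritable] {
      def write(key: NullWritable, value: BytesWritable): Unit =
        out.write(value.getBytes, 0, value.getLength)
      def close(reporter: Reporter): Unit = out.close()
    }
  }
}

It could then be used along the lines of

rdd.map(b => (NullWritable.get(), new BytesWritable(b)))
   .saveAsHadoopFile("out/path", classOf[NullWritable], classOf[BytesWritable],
     classOf[RawBytesOutputFormat])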

On Thu, 16 Jan 2020 at 09:34, Duan,Bing wrote:

> Hi all:
>
> I read binary data (protobuf format) from the filesystem with the binaryFiles
> function into an RDD[Array[Byte]], and it works fine. But when I save it back
> to the filesystem with saveAsTextFile, the quotation marks get escaped, like
> this: "\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should
> be "20192_1",1,24,0,2,"S66.000x001".
>
> Could anyone give me some tips on how to implement a function
> like saveAsBinaryFile to persist the RDD[Array[Byte]]?
>
> Bests!
>
> Bing
>


Re: How to implement a "saveAsBinaryFile" function?

2020-01-17 Thread Duan,Bing
Hi Fokko, Maxim, Long:

Thanks!

This happens in a custom datasource, as below:

override def createRelation(…) {
…
blocks.map(block => block.bytes).saveAsTextFile(parameters("path"))
...
}
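(For context: saveAsTextFile renders each record via toString and writes it
through Hadoop's TextOutputFormat, so it only suits line-oriented text; raw
bytes need a writer that emits them untouched.)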

I am new to Spark; I will try the methods you all provided.

Best!

Bing.

On Jan 17, 2020, at 4:28 AM, Maxim Gekk <maxim.g...@databricks.com> wrote:

Hi Bing,

You can try the Text datasource. It shouldn't modify strings:
scala> Seq(""""20192_1",1,24,0,2,"S66.000x001"""").toDS.write.text("tmp/text.txt")
$ cat tmp/text.txt/part-0-256d960f-9f85-47fe-8edd-8428276eb3c6-c000.txt
"20192_1",1,24,0,2,"S66.000x001"

Maxim Gekk
Software Engineer
Databricks B.V. <http://databricks.com/>


On Thu, Jan 16, 2020 at 10:02 PM Long, Andrew <loand...@amazon.com.invalid> wrote:
Hey Bing,

There are a couple of different approaches you could take. The quickest and
easiest would be to use the existing APIs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val bytes = spark.range(1000)

bytes.foreachPartition(partition => {
  // WARNING: anything used in here will need to be serializable.
  // There's some magic to serializing the hadoop conf; see the hadoop wrapper
  // class in the Spark source.
  val writer = FileSystem.get(new Configuration()).create(new Path("s3://..."))
  partition.foreach(b => writer.write(b.toInt)) // write(int) emits one byte
  writer.close()
})

The more complicated but prettier approach would be to implement a custom
datasource.

From: "Duan,Bing" mailto:duanb...@baidu.com>>
Date: Thursday, January 16, 2020 at 12:35 AM
To: "dev@spark.apache.org<mailto:dev@spark.apache.org>" 
mailto:dev@spark.apache.org>>
Subject: How to implement a "saveAsBinaryFile" function?

Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles
function into an RDD[Array[Byte]], and it works fine. But when I save it back to
the filesystem with saveAsTextFile, the quotation marks get escaped, like this:
"\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should be
"20192_1",1,24,0,2,"S66.000x001".

Could anyone give me some tips on how to implement a function like
saveAsBinaryFile to persist the RDD[Array[Byte]]?

Bests!

Bing



Re: How to implement a "saveAsBinaryFile" function?

2020-01-16 Thread Maxim Gekk
Hi Bing,

You can try the Text datasource. It shouldn't modify strings:
scala> Seq(""""20192_1",1,24,0,2,"S66.000x001"""").toDS.write.text("tmp/text.txt")
$ cat tmp/text.txt/part-0-256d960f-9f85-47fe-8edd-8428276eb3c6-c000.txt
"20192_1",1,24,0,2,"S66.000x001"

Maxim Gekk

Software Engineer

Databricks B.V. <http://databricks.com/>


On Thu, Jan 16, 2020 at 10:02 PM Long, Andrew wrote:

> Hey Bing,
>
>
>
> There are a couple of different approaches you could take. The quickest and
> easiest would be to use the existing APIs:
>
>
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
>
> val bytes = spark.range(1000)
>
> bytes.foreachPartition(partition => {
>   // WARNING: anything used in here will need to be serializable.
>   // There's some magic to serializing the hadoop conf; see the hadoop
>   // wrapper class in the Spark source.
>   val writer = FileSystem.get(new Configuration()).create(new Path("s3://..."))
>   partition.foreach(b => writer.write(b.toInt)) // write(int) emits one byte
>   writer.close()
> })
>
>
>
> The more complicated but prettier approach would be to implement a
> custom datasource.
>
>
>
> *From: *"Duan,Bing" 
> *Date: *Thursday, January 16, 2020 at 12:35 AM
> *To: *"dev@spark.apache.org" 
> *Subject: *How to implement a "saveAsBinaryFile" function?
>
>
>
> Hi all:
>
>
>
> I read binary data (protobuf format) from the filesystem with the binaryFiles
> function into an RDD[Array[Byte]], and it works fine. But when I save it back
> to the filesystem with saveAsTextFile, the quotation marks get escaped, like
> this:
>
> "\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should
> be "20192_1",1,24,0,2,"S66.000x001".
>
>
>
> Could anyone give me some tips on how to implement a function
> like saveAsBinaryFile to persist the RDD[Array[Byte]]?
>
>
>
> Bests!
>
>
>
> Bing
>


Re: How to implement a "saveAsBinaryFile" function?

2020-01-16 Thread Long, Andrew
Hey Bing,

There are a couple of different approaches you could take. The quickest and
easiest would be to use the existing APIs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val bytes = spark.range(1000)

bytes.foreachPartition(partition => {
  // WARNING: anything used in here will need to be serializable.
  // There's some magic to serializing the hadoop conf; see the hadoop wrapper
  // class in the Spark source.
  val writer = FileSystem.get(new Configuration()).create(new Path("s3://..."))
  partition.foreach(b => writer.write(b.toInt)) // write(int) emits one byte
  writer.close()
})
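One caveat with this sketch: as written, every partition opens the same path, so
concurrent tasks would clobber one another. In practice each partition needs its
own file name, for example one derived from TaskContext.getPartitionId().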

The more complicated but prettier approach would be to implement a custom
datasource.

From: "Duan,Bing" 
Date: Thursday, January 16, 2020 at 12:35 AM
To: "dev@spark.apache.org" 
Subject: How to implement a "saveAsBinaryFile" function?

Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles
function into an RDD[Array[Byte]], and it works fine. But when I save it back to
the filesystem with saveAsTextFile, the quotation marks get escaped, like this:
"\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should be
"20192_1",1,24,0,2,"S66.000x001".

Could anyone give me some tips on how to implement a function like
saveAsBinaryFile to persist the RDD[Array[Byte]]?

Bests!

Bing


Re: How to implement a "saveAsBinaryFile" function?

2020-01-16 Thread Driesprong, Fokko
Hi Bing,

Good question, and the answer is: it depends on your use case.

If you really just want to write raw bytes, then you could use a .foreach where
you open an OutputStream and write the bytes to some file. But this is probably
not what you want, and in practice it is not very handy, since you want to keep
the record boundaries.

My suggestion would be to write it as Parquet or Avro, with the payload in a
binary field. Avro has the bytes primitive, which Spark converts to Array[Byte]:
https://avro.apache.org/docs/1.9.1/spec.html
Similarly, Parquet has the BYTE_ARRAY type:
https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0

In the words of Linus Torvalds: *Talk is cheap. Show me the code.*

MacBook-Pro-van-Fokko:~ fokkodriesprong$ spark-shell
20/01/16 10:58:44 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://172.20.10.3:4040
Spark context available as 'sc' (master = local[*], app id =
local-1579168731763).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data: Array[Array[Byte]] = Array(
 |   Array(0x19.toByte, 0x25.toByte)
 | )
data: Array[Array[Byte]] = Array(Array(25, 37))

scala> val rdd = sc.parallelize(data, 1);
rdd: org.apache.spark.rdd.RDD[Array[Byte]] = ParallelCollectionRDD[0] at
parallelize at <console>:26

scala> rdd.toDF("byte")
res1: org.apache.spark.sql.DataFrame = [byte: binary]

scala> val df = rdd.toDF("byte")
df: org.apache.spark.sql.DataFrame = [byte: binary]

scala> df.write.parquet("/tmp/bytes/")



MacBook-Pro-van-Fokko:~ fokkodriesprong$ ls -lah /tmp/bytes/
total 24
drwxr-xr-x   6 fokkodriesprong  wheel   192B 16 jan 11:01 .
drwxrwxrwt  16 root wheel   512B 16 jan 11:01 ..
-rw-r--r--   1 fokkodriesprong  wheel 8B 16 jan 11:01 ._SUCCESS.crc
-rw-r--r--   1 fokkodriesprong  wheel12B 16 jan 11:01
.part-0-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet.crc
-rw-r--r--   1 fokkodriesprong  wheel 0B 16 jan 11:01 _SUCCESS
-rw-r--r--   1 fokkodriesprong  wheel   384B 16 jan 11:01
part-0-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet

MacBook-Pro-van-Fokko:~ fokkodriesprong$ parquet-tools schema
/tmp/bytes/part-0-d0d684bb-2371-4947-b2f3-6fca4ead69a7-c000.snappy.parquet

message spark_schema {
  optional binary byte;
}
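For the read path, spark.read.parquet("/tmp/bytes/") gives the byte column back
as Spark's BinaryType, which maps to Array[Byte] in Scala, so the round trip is
lossless.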

Hope this helps.

Cheers, Fokko


On Thu, 16 Jan 2020 at 09:34, Duan,Bing wrote:

> Hi all:
>
> I read binary data (protobuf format) from the filesystem with the binaryFiles
> function into an RDD[Array[Byte]], and it works fine. But when I save it back
> to the filesystem with saveAsTextFile, the quotation marks get escaped, like
> this: "\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should
> be "20192_1",1,24,0,2,"S66.000x001".
>
> Could anyone give me some tips on how to implement a function
> like saveAsBinaryFile to persist the RDD[Array[Byte]]?
>
> Bests!
>
> Bing
>


How to implement a "saveAsBinaryFile" function?

2020-01-16 Thread Duan,Bing
Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles
function into an RDD[Array[Byte]], and it works fine. But when I save it back to
the filesystem with saveAsTextFile, the quotation marks get escaped, like this:
"\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should be
"20192_1",1,24,0,2,"S66.000x001".

Could anyone give me some tips on how to implement a function like
saveAsBinaryFile to persist the RDD[Array[Byte]]?

Bests!

Bing