Re: Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions

2015-06-17 Thread Cheng Lian
Since SPARK-8406 is serious, we hope to ship 1.4.1 ASAP, possibly next 
week, but I can't promise that yet. However, you can cherry-pick the 
commit as soon as the fix is merged into branch-1.4. Sorry for the 
trouble!


Cheng

On 6/17/15 1:42 AM, Nathan McCarthy wrote:

Thanks Cheng. Nice find!

Let me know if there is anything we can do to help on this end with 
contributing a fix or testing.


Side note - any ideas on the 1.4.1 ETA? There are a few bug fixes we 
need in there.


Cheers,
Nathan

From: Cheng Lian
Date: Wednesday, 17 June 2015 6:25 pm
To: Nathan, "user@spark.apache.org"
Subject: Re: Spark 1.4 DataFrame Parquet file writing - missing random 
rows/partitions


Hi Nathan,

Thanks a lot for the detailed report, especially the information about 
the nonconsecutive part numbers. It's confirmed to be a race condition 
bug, and I've just filed https://issues.apache.org/jira/browse/SPARK-8406 
to track it. We will deliver a fix ASAP, and it will be included in 1.4.1.


Best,
Cheng

On 6/16/15 12:30 AM, Nathan McCarthy wrote:

Hi all,

Looks like DataFrame Parquet writing is very broken in Spark 1.4.0. 
We had no problems with Spark 1.3.


When trying to save a DataFrame with *569610608* rows:

dfc.write.format("parquet").save("/data/map_parquet_file")

We get random results between runs. Caching the data frame in memory 
makes no difference. It looks like the write misses some of the RDD 
partitions. We have an RDD with *6750* partitions. When we write out, 
we get fewer files than the number of partitions, and when we read the 
data back in and run a count, we get a smaller number of rows.
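
As a quick sanity check (just a rough sketch, not something we have run 
in anger - it assumes a spark-shell session with dfc and the usual sc 
in scope, and the same output path as above; the value names are only 
for illustration), the partition count can be compared against the 
number of part files that actually landed on HDFS:

import org.apache.hadoop.fs.{FileSystem, Path}

// number of partitions we expect to be written out as part files
val expectedParts = dfc.rdd.partitions.length

// number of part files actually present in the output directory
val fs = FileSystem.get(sc.hadoopConfiguration)
val actualParts = fs.listStatus(new Path("/data/map_parquet_file"))
  .count(_.getPath.getName.startsWith("part-"))

println(s"expected $expectedParts part files, found $actualParts")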


I’ve tried counting the rows in several different ways. All return the 
same result, *560214031* rows, missing about 9.4 million rows (~1.6%).


qc.read.parquet("/data/map_parquet_file").count
qc.read.parquet("/data/map_parquet_file").rdd.count
qc.read.parquet("/data/map_parquet_file").mapPartitions { itr =>
  var c = 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator
}.reduce(_ + _)


Looking at the files on HDFS, there are /6643/ .parquet files, i.e. 107 
partitions missing (about 1.6%).


Then writing out the same cached DF again to a new path gives *6717* 
files on HDFS (about 33 files missing, or ~0.5%):


dfc.write.parquet("/data/map_parquet_file_2")

And we get *566670107* rows back (about 3 million missing, ~0.5%):

qc.read.parquet("/data/map_parquet_file_2").count

Writing the same DF out to JSON writes the expected number (*6750*) 
of part files and returns the right number of rows, /569610608/:


dfc.write.format("json").save("/data/map_parquet_file_3")
qc.read.format("json").load("/data/map_parquet_file_3").count

One thing to note is that the Parquet part files on HDFS do not have 
the normal sequential part numbers that we see for the JSON output and 
for Parquet output in Spark 1.3.


part-r-06151.gz.parquet  part-r-118401.gz.parquet  part-r-146249.gz.parquet  part-r-196755.gz.parquet  part-r-35811.gz.parquet  part-r-55628.gz.parquet  part-r-73497.gz.parquet  part-r-97237.gz.parquet
part-r-06161.gz.parquet  part-r-118406.gz.parquet  part-r-146254.gz.parquet  part-r-196763.gz.parquet  part-r-35826.gz.parquet  part-r-55647.gz.parquet  part-r-73500.gz.parquet  _SUCCESS
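
Along the same lines, another small sketch (same assumptions as before: 
sc and the output path; names are illustrative only) pulls the numbers 
out of the part file names, which makes it easy to see at a glance how 
non-sequential they are compared with the JSON output:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val partNumbers = fs.listStatus(new Path("/data/map_parquet_file"))
  .map(_.getPath.getName)
  .collect { case n if n.startsWith("part-r-") =>
    // e.g. "part-r-06151.gz.parquet" -> 6151
    n.stripPrefix("part-r-").takeWhile(_.isDigit).toInt
  }
  .sorted

println(s"${partNumbers.length} part files, numbered " +
  s"${partNumbers.head} to ${partNumbers.last}")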


We are using MapR 4.0.2 for HDFS.

Any ideas?

Cheers,
Nathan