[jira] [Commented] (SPARK-21851) Spark 2.0 data corruption with cache and 200 columns

2017-08-28 Thread Anton Suchaneck (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144394#comment-16144394
 ] 

Anton Suchaneck commented on SPARK-21851:
-

Not quite production, but still relevant work. Thanks for pointing it out. And 
I sure learned my lesson to watch the JIRAs of x.0.0 versions ;) Actually, 
judging by Hortonworks 2.5 and the fact that Spark 1.6.2 is affected, you are 
screwed either way, even if you use the old Spark :-o

> Spark 2.0 data corruption with cache and 200 columns
> 
>
> Key: SPARK-21851
> URL: https://issues.apache.org/jira/browse/SPARK-21851
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Anton Suchaneck
>
> Doing a join and cache can corrupt data as shown here:
> {code}
> import pyspark.sql.functions as F
>
> num_rows = 200
>
> for num_cols in range(198, 205):
>     # create a data frame with an id column and some dummy columns
>     df1 = spark.range(num_rows, numPartitions=100)
>     for i in range(num_cols - 1):
>         df1 = df1.withColumn("a" + str(i), F.lit("a"))
>     # create a data frame with only the id column to join against
>     df2 = spark.range(num_rows, numPartitions=100)
>
>     # write and read to start "fresh"
>     df1.write.parquet("delme_1.parquet", mode="overwrite")
>     df2.write.parquet("delme_2.parquet", mode="overwrite")
>     df1 = spark.read.parquet("delme_1.parquet")
>     df2 = spark.read.parquet("delme_2.parquet")
>
>     df3 = df1.join(df2, "id", how="left").cache()   # this cache seems to make a difference
>     df4 = df3.filter("id<10")
>
>     print(len(df4.columns), df4.count(), df4.cache().count())   # second cache gives a different result
> {code}
> Output:
> {noformat}
> 198 10 10
> 199 10 10
> 200 10 10
> 201 12 12
> 202 12 12
> 203 16 16
> 204 10 12
> {noformat}
> Occasionally the middle number is 10 (the expected result) more often than 
> shown above. The last column may show different values, but 12 and 16 are 
> common. A slightly higher num_rows is sometimes needed to reproduce this 
> behaviour.
> The Spark version is 2.0.0.2.5.0.0-1245, running on a Red Hat system on a 
> multi-node YARN cluster.
> I am happy to provide more information if you let me know what would be 
> useful.
> It is not strictly `cache` that is the problem: `toPandas` and `collect` 
> exhibit the same behavior, so I basically cannot retrieve the data correctly 
> at all.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21851) Spark 2.0 data corruption with cache and 200 columns

2017-08-28 Thread Anton Suchaneck (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144376#comment-16144376
 ] 

Anton Suchaneck commented on SPARK-21851:
-

I wish upgrading were that easy when you are in industry and using Hortonworks. 
It is scary that this means a lot of users are still affected by this bug. Can 
someone confirm whether this bug affects .cache only (and not toPandas or 
collect)? Then at least I would have a way around it... until someone installs 
a newer Spark.
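
Until that is confirmed, one Spark-agnostic guard is to run the same count action several times and only trust the result when all runs agree. The helper below is a hypothetical sketch (not from the ticket); in the repro above you would pass something like `lambda: df4.count()` as the action.

```python
def stable_count(get_count, trials=3):
    """Run a count action several times and return the result only if
    every trial agrees; otherwise raise to flag possible corruption."""
    counts = [get_count() for _ in range(trials)]
    if len(set(counts)) != 1:
        raise RuntimeError("inconsistent counts across trials: %s" % counts)
    return counts[0]

# Hypothetical usage against the repro in this ticket:
#   n = stable_count(lambda: df4.count())
```

This does not fix the underlying bug, but it would at least turn silent corruption into a loud failure.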




[jira] [Commented] (SPARK-21851) Spark 2.0 data corruption with cache and 200 columns

2017-08-28 Thread Anton Suchaneck (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144108#comment-16144108
 ] 

Anton Suchaneck commented on SPARK-21851:
-

I tried on a VM with Spark 2.1 and did not see this bug. I had missed that 
Spark 2.0.0 is affected by that older ticket.
So if I'm stuck with Spark 2.0.0, will avoiding cache() circumvent this issue? 
:/
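
If cache() alone turns out to be the trigger, the usual cache-free way to pin down a DataFrame is to materialize it through storage and read it back (in Spark terms, df.write.parquet(...) followed by spark.read.parquet(...), exactly as the repro already does for its inputs). Below is a minimal, Spark-agnostic sketch of that round-trip pattern, with plain JSON standing in for Parquet; the function name and file layout are illustrative, not from the ticket.

```python
import json
import os
import tempfile

def materialize_via_disk(records, path=None):
    """Snapshot a dataset to disk and read it back, instead of relying on
    an in-memory cache; in Spark this corresponds to a write + re-read."""
    if path is None:
        path = os.path.join(tempfile.mkdtemp(), "snapshot.json")
    with open(path, "w") as f:
        json.dump(records, f)
    with open(path) as f:
        return json.load(f)
```

The trade-off is an extra round of I/O per materialization, which is usually acceptable when the alternative is silently corrupted results.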




[jira] [Updated] (SPARK-21851) Spark 2.0 data corruption with cache and 200 columns

2017-08-28 Thread Anton Suchaneck (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Suchaneck updated SPARK-21851:




[jira] [Created] (SPARK-21851) Spark 2.0 data corruption with cache and 200 columns

2017-08-28 Thread Anton Suchaneck (JIRA)
Anton Suchaneck created SPARK-21851:
---

 Summary: Spark 2.0 data corruption with cache and 200 columns
 Key: SPARK-21851
 URL: https://issues.apache.org/jira/browse/SPARK-21851
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.0.0
Reporter: Anton Suchaneck




