Re: withColumn gives "Can only zip RDDs with same number of elements in each partition" but not with a LIMIT on the dataframe

2016-12-20 Thread Richard Startin
I think limit repartitions your data into a single partition if called as a non-terminal operator. Hence zip works after limit because you only have one partition.
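
You can check that quickly from the pyspark shell, something like this (reusing the hc and query from your snippet; 1000 is an arbitrary limit and df_all is just a throwaway name):

df_all = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue")
print(df_all.rdd.getNumPartitions())              # roughly one partition per input split of the Hive scan
print(df_all.limit(1000).rdd.getNumPartitions())  # I'd expect 1 here if limit really collapses to a single partition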

In practice, I have found joins to be much more widely applicable than zip, because zip strictly requires both sides to have identical partitioning (the same number of partitions and the same number of elements in each).
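
As a rough sketch of what I mean (pyspark shell with sc available; the data is made up): zip pairs elements positionally, partition by partition, so the two sides must line up exactly, whereas a join only needs a key.

a = sc.parallelize(range(10), 4)
b = sc.parallelize(range(11), 4)   # same number of partitions, one element more
# a.zip(b).count() would fail with the exact error you hit, because at least one
# partition of b ends up with more elements than the matching partition of a.
joined = a.map(lambda x: (x, "a")).join(b.map(lambda y: (y, "b")))   # no such constraint
print(joined.take(5))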

https://richardstartin.com

On 20 Dec 2016, at 16:04, Jack Wenger <jack.wenge...@gmail.com> wrote:

Hello,

I'm facing a strange behaviour with Spark 1.5.0 (Cloudera 5.5.1).
I'm loading data from Hive with HiveContext (~42M records) and then trying to add a new column with "withColumn" and a UDF.
Finally, I'm supposed to create a new Hive table from this dataframe.


Here is the code:

________________________________


DATETIME_TO_COMPARE = "-12-31 23:59:59.99"

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# 1 if col equals DATETIME_TO_COMPARE, 0 otherwise
myFunction = udf(lambda col: 0 if col != DATETIME_TO_COMPARE else 1, IntegerType())

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue")

df2 = df1.withColumn("myNewCol", myFunction(df1.col3))
df2.registerTempTable("df2")

hc.sql("create table my_db.new_table as select * from df2")

________________________________


But I get this error:


py4j.protocol.Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 186, lxpbda25.ra1.intra.groupama.fr): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:832)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




What is surprising is that if I modify the select statement by adding a LIMIT 1 (which is more than twice the number of records in my table), then it works:

________________________________

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue LIMIT 1")

________________________________

In both cases, if I run a count() on df1, I'm getting the same number: 42 593 052.

Is it a bug or am I missing something?
If it is not a bug, what am I doing wrong?


Thank you!


Jack

