Thanks for your reply!

Actually, it is OK when I use RDD.zip() like this:

def zipDatasets(m: Dataset[String], n: Dataset[Int]) = {
  m.sparkSession.createDataset(m.rdd.zip(n.rdd))
}


But in my project, the element types of the Datasets are designated by the caller, so I
introduced type parameters X and Y:


def zipDatasets[X: Encoder, Y: Encoder](m: Dataset[X], n: Dataset[Y]) = {
  m.sparkSession.createDataset(m.rdd.zip(n.rdd))
}


This reports an error, because RDD.zip() needs ClassTag information for Y, which the
compiler cannot provide here.
Now I have no idea how to fix it.
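For reference, one possible fix (an untested sketch) is to give Y both an Encoder and a ClassTag context bound, and to build the pair encoder explicitly with Spark's Encoders.tuple, since import spark.implicits._ cannot derive an Encoder[(X, Y)] when X and Y are abstract type parameters:

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import scala.reflect.ClassTag

// Sketch: RDD.zip needs the ClassTag[Y]; createDataset needs Encoder[(X, Y)],
// which we construct explicitly from the two element encoders.
def zipDatasets[X: Encoder, Y: Encoder: ClassTag](m: Dataset[X], n: Dataset[Y]): Dataset[(X, Y)] = {
  implicit val pairEnc: Encoder[(X, Y)] =
    Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
  m.sparkSession.createDataset(m.rdd.zip(n.rdd))
}
```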

Regards,
bluejoe

From:  Anastasios Zouzias
Reply-To:  <zouz...@gmail.com>
Date:  Thursday, September 14, 2017, 2:10 AM
To:  bluejoe
Cc:  user
Subject:  Re: compile error: No classtag available while calling RDD.zip()

Hi there,

If it is OK with you to work with DataFrames, you can do

https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, LongType}

val df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0),
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append "rowid" column of type Long
val schema = df.schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(rddWithId.map { case (row, index) =>
  Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

// Show results
dfZippedWithId.show

Best,
Anastasios



On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 <blue...@cnic.cn> wrote:
Hello,
Since Dataset has no zip(...) method, I wrote the following code to zip two datasets:

1       def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
2               val rdd = m.rdd.zip(n.rdd);
3               import spark.implicits._
4               spark.createDataset(rdd);
5       }

However, for the m.rdd.zip(…) call, a compile error is reported:   No ClassTag
available for Y

I know this error goes away if I declare Y with a ClassTag context bound like this:

1       def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …

But this will make line 5 report a new error:
        Unable to find encoder for type stored in a Dataset.

Now I have no idea how to solve this problem. How can I declare Y as both an Encoder
and a ClassTag?
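(For what it's worth, Scala allows a type parameter to carry several context bounds at once; a minimal sketch of the syntax, which desugars into two implicit parameters:)

```scala
import org.apache.spark.sql.Encoder
import scala.reflect.ClassTag

// Y: Encoder: ClassTag asks the compiler for both an implicit
// Encoder[Y] and an implicit ClassTag[Y] at the call site.
def foo[X: Encoder, Y: Encoder: ClassTag](): Unit = ()
```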

Many thanks!

Best regards,
bluejoe
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-- 
-- Anastasios Zouzias
