If you are using the kryo encoder, you can only use it to map to/from
kryo-encoded binary data.  This is because Spark does not understand kryo's
encoding; it's just using it as an opaque blob of bytes.
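
To make the point above concrete, here is a minimal sketch, assuming Spark
2.0's SparkSession API. The Event class is a hypothetical stand-in for an
Avro-generated type, and the tuple-then-map step is only one possible way to
get from columnar data to such objects, not something prescribed in this
thread.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical stand-in for an Avro-generated class (not from the thread).
class Event(val auctionId: String, val ts: Long)

object KryoEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("kryo-encoder-sketch")
      .getOrCreate()
    import spark.implicits._

    // A kryo encoder stores each object in a single binary column,
    // which is the "Target schema" reported in the AnalysisException below.
    val kryoEncoder: Encoder[Event] = Encoders.kryo[Event]
    println(kryoEncoder.schema.simpleString) // struct<value:binary>

    // Columnar data with two fields, like the (string, long) case in the thread.
    val rows = Seq(("a1", 1L), ("a2", 2L)).toDF("auctionId", "ts")

    // rows.as[Event](kryoEncoder) would be rejected: two columns cannot be
    // lined up with one binary column. Decoding to a tuple first and building
    // the objects in a map, with the kryo encoder passed explicitly, avoids that.
    val events: Dataset[Event] =
      rows.as[(String, Long)].map { case (id, ts) => new Event(id, ts) }(kryoEncoder)

    events.printSchema()
    println(events.count())

    spark.stop()
  }
}
```

The resulting Dataset[Event] still has only the binary value column, so
Spark cannot see or filter on the fields inside it.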

On Mon, May 23, 2016 at 1:28 AM, Han JU <ju.han.fe...@gmail.com> wrote:

> Just one more question: is Dataset supposed to be able to cast data to an
> Avro type? For a very simple format (a string and a long), I can cast it to
> a tuple or case class, but not to an Avro type (which also contains only a
> string and a long).
>
> The error is like this for this very simple type:
>
> === Result of Batch Resolution ===
> !'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0,
> string])) null else input[0, string].toString, if (isnull(input[1,
> bigint])) null else input[1, bigint],
> StructField(auctionId,StringType,true), StructField(ts,LongType,true)),
> auctionId#0, ts#1L) AS #2]   Project [createexternalrow(if
> (isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L))
> null else ts#1L, StructField(auctionId,StringType,true),
> StructField(ts,LongType,true)) AS #2]
>  +- LocalRelation [auctionId#0,ts#1L]
>
>
>                                                     +- LocalRelation
> [auctionId#0,ts#1L]
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
> map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number
> of fields does not line up.
>  - Input schema: struct<auctionId:string,ts:bigint>
>  - Target schema: struct<value:binary>;
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org
> $apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
> at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
> at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
> at Datasets$delayedInit$body.apply(Datasets.scala:23)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at Datasets$.main(Datasets.scala:23)
> at Datasets.main(Datasets.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> 2016-05-22 22:02 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>
>> That's definitely a bug.  If you can come up with a small reproduction it
>> would be great if you could open a JIRA.
>> On May 22, 2016 12:21 PM, "Han JU" <ju.han.fe...@gmail.com> wrote:
>>
>>> Hi Michael,
>>>
>>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>>> similar if not exactly the same.
>>> The file is a parquet file containing avro objects.
>>>
>>> Thanks!
>>>
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>>> candidates are: "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>> /* 001 */
>>> /* 002 */ public java.lang.Object generate(Object[] references) {
>>> /* 003 */   return new SpecificSafeProjection(references);
>>> /* 004 */ }
>>> /* 005 */
>>> /* 006 */ class SpecificSafeProjection extends
>>> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>>> /* 007 */
>>> /* 008 */   private Object[] references;
>>> /* 009 */   private MutableRow mutableRow;
>>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance
>>> serializer;
>>> /* 011 */
>>> /* 012 */
>>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>>> /* 014 */     this.references = references;
>>> /* 015 */     mutableRow = (MutableRow) references[references.length -
>>> 1];
>>> /* 016 */     serializer =
>>> (org.apache.spark.serializer.KryoSerializerInstance) new
>>> org.apache.spark.serializer.KryoSerializer(new
>>> org.apache.spark.SparkConf()).newInstance();
>>> /* 017 */   }
>>> /* 018 */
>>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>>> /* 020 */     InternalRow i = (InternalRow) _i;
>>> /* 021 */     /* decodeusingserializer(input[0,
>>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:str...
>>> */
>>> /* 022 */     /* input[0,
>>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:struct<id:string,seatbid:...
>>> */
>>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1
>>> ? null : (tv.teads.model.rtb.RtbResponseEvent)
>>> serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>>> /* 026 */     if (isNull1) {
>>> /* 027 */       mutableRow.setNullAt(0);
>>> /* 028 */     } else {
>>> /* 029 */
>>> /* 030 */       mutableRow.update(0, value);
>>> /* 031 */     }
>>> /* 032 */
>>> /* 033 */     return mutableRow;
>>> /* 034 */   }
>>> /* 035 */ }
>>> /* 036 */
>>>
>>> at
>>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>>> at
>>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>>> at
>>> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> at
>>> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2349)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at
>>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:764)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:186)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:748)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:745)
>>> at
>>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:61)
>>> at
>>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:60)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2016-05-20 22:51 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> What is the error?  I would definitely expect it to work with kryo at
>>>> least.
>>>>
>>>>
>>>> On Fri, May 20, 2016 at 2:37 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
>>>>> However, it does not seem to work with Avro data types:
>>>>>
>>>>>
>>>>> import com.esotericsoftware.kryo.Kryo
>>>>> import com.twitter.chill.avro.AvroSerializer
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>> import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
>>>>> import org.apache.spark.sql.{Encoders, SQLContext}
>>>>>
>>>>> // MyAvroType is the Avro-generated class being read.
>>>>> object Datasets extends App {
>>>>>   val conf = new SparkConf()
>>>>>   conf.setAppName("Dataset")
>>>>>   conf.setMaster("local[2]")
>>>>>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>>>>>   conf.setIfMissing("spark.kryo.registrator", classOf[DatasetKryoRegistrator].getName)
>>>>>
>>>>>   val sc = new SparkContext(conf)
>>>>>   val sql = new SQLContext(sc)
>>>>>   import sql.implicits._
>>>>>
>>>>>   implicit val encoder = Encoders.kryo[MyAvroType]
>>>>>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>>>>>
>>>>>   var c = 0
>>>>>   // BUG here
>>>>>   val sizes = data.mapPartitions { iter =>
>>>>>     List(iter.size).iterator
>>>>>   }.collect().toList
>>>>>
>>>>>   println(c)
>>>>> }
>>>>>
>>>>>
>>>>> // Registers the chill-avro serializer for the Avro type with kryo.
>>>>> class DatasetKryoRegistrator extends KryoRegistrator {
>>>>>   override def registerClasses(kryo: Kryo): Unit = {
>>>>>     kryo.register(
>>>>>       classOf[MyAvroType],
>>>>>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> I'm using chill-avro's kryo serializer for Avro types and I've tried
>>>>> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
>>>>> works. The error seems to be that the generated code does not compile
>>>>> with janino.
>>>>>
>>>>> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>>>>>
>>>>> --
>>>>> *JU Han*
>>>>>
>>>>> Software Engineer @ Teads.tv
>>>>>
>>>>> +33 0619608888
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>
>
>
