If you are using the Kryo encoder, you can only use it to map to/from Kryo-encoded binary data. This is because Spark does not understand Kryo's encoding; it just treats it as an opaque blob of bytes.
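For example, here is a minimal sketch against the 1.6 SQLContext API (Event is a made-up stand-in for your Avro class; the path is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoders, SQLContext}

// Made-up stand-in for an Avro-generated class; any JVM type works here.
case class Event(auctionId: String, ts: Long)

object KryoEncoderSketch extends App {
  val sc  = new SparkContext(
    new SparkConf().setAppName("kryo-encoder").setMaster("local[2]"))
  val sql = new SQLContext(sc)

  implicit val kryoEncoder = Encoders.kryo[Event]

  // Kryo serializes each object into a single opaque binary column, so the
  // Dataset's schema is struct<value:binary>, not Event's two fields:
  val ds = sql.createDataset(Seq(Event("a1", 1L), Event("a2", 2L)))
  ds.printSchema()   // root
                     //  |-- value: binary (nullable = true)

  // Round-tripping works because both directions go through the blob:
  println(ds.collect().toList)

  // But .as[Event] over a file with a real two-field struct schema cannot
  // line up with the single binary column the Kryo encoder expects:
  // sql.read.parquet("path/to/data").as[Event]   // fails to resolve
}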
On Mon, May 23, 2016 at 1:28 AM, Han JU <ju.han.fe...@gmail.com> wrote:

> Just one more question: is Dataset supposed to be able to cast data to an
> Avro type? For a very simple format (a string and a long), I can cast it
> to a tuple or case class, but not an Avro type (which also contains only
> a string and a long).
>
> The error is like this for this very simple type:
>
> === Result of Batch Resolution ===
> !'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0,
> string])) null else input[0, string].toString, if (isnull(input[1,
> bigint])) null else input[1, bigint],
> StructField(auctionId,StringType,true), StructField(ts,LongType,true)),
> auctionId#0, ts#1L) AS #2]
> Project [createexternalrow(if (isnull(auctionId#0)) null else
> auctionId#0.toString, if (isnull(ts#1L)) null else ts#1L,
> StructField(auctionId,StringType,true),
> StructField(ts,LongType,true)) AS #2]
> +- LocalRelation [auctionId#0,ts#1L]
> +- LocalRelation [auctionId#0,ts#1L]
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
> map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number
> of fields does not line up.
> - Input schema: struct<auctionId:string,ts:bigint>
> - Target schema: struct<value:binary>;
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
> at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
> at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
> at Datasets$delayedInit$body.apply(Datasets.scala:23)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at Datasets$.main(Datasets.scala:23)
> at Datasets.main(Datasets.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> 2016-05-22 22:02 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>
>> That's definitely a bug. If you can come up with a small reproduction, it
>> would be great if you could open a JIRA.
>>
>> On May 22, 2016 12:21 PM, "Han JU" <ju.han.fe...@gmail.com> wrote:
>>
>>> Hi Michael,
>>>
>>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>>> similar, if not exactly the same.
>>> The file is a Parquet file containing Avro objects.
>>>
>>> Thanks!
>>>
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>>> candidates are: "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>> /* 001 */
>>> /* 002 */ public java.lang.Object generate(Object[] references) {
>>> /* 003 */   return new SpecificSafeProjection(references);
>>> /* 004 */ }
>>> /* 005 */
>>> /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>>> /* 007 */
>>> /* 008 */   private Object[] references;
>>> /* 009 */   private MutableRow mutableRow;
>>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance serializer;
>>> /* 011 */
>>> /* 012 */
>>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>>> /* 014 */     this.references = references;
>>> /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
>>> /* 016 */     serializer = (org.apache.spark.serializer.KryoSerializerInstance) new org.apache.spark.serializer.KryoSerializer(new org.apache.spark.SparkConf()).newInstance();
>>> /* 017 */   }
>>> /* 018 */
>>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>>> /* 020 */     InternalRow i = (InternalRow) _i;
>>> /* 021 */     /* decodeusingserializer(input[0, struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:str... */
>>> /* 022 */     /* input[0, struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:struct<id:string,seatbid:... */
>>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1 ? null : (tv.teads.model.rtb.RtbResponseEvent) serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>>> /* 026 */     if (isNull1) {
>>> /* 027 */       mutableRow.setNullAt(0);
>>> /* 028 */     } else {
>>> /* 029 */
>>> /* 030 */       mutableRow.update(0, value);
>>> /* 031 */     }
>>> /* 032 */
>>> /* 033 */     return mutableRow;
>>> /* 034 */   }
>>> /* 035 */ }
>>> /* 036 */
>>>
>>> at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>>> at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>>> at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>>> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
>>> at org.spark_project.guava.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
>>> at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2349)
>>> at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:764)
>>> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:186)
>>> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:748)
>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:745)
>>> at org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:61)
>>> at org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:60)
>>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2016-05-20 22:51 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> What is the error? I would definitely expect it to work with kryo at
>>>> least.
>>>>
>>>> On Fri, May 20, 2016 at 2:37 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
>>>>> However, it does not seem to work with Avro data types:
>>>>>
>>>>> object Datasets extends App {
>>>>>   val conf = new SparkConf()
>>>>>   conf.setAppName("Dataset")
>>>>>   conf.setMaster("local[2]")
>>>>>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>>>>>   conf.setIfMissing("spark.kryo.registrator", classOf[DatasetKryoRegistrator].getName)
>>>>>
>>>>>   val sc = new SparkContext(conf)
>>>>>   val sql = new SQLContext(sc)
>>>>>   import sql.implicits._
>>>>>
>>>>>   implicit val encoder = Encoders.kryo[MyAvroType]
>>>>>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>>>>>
>>>>>   // BUG here
>>>>>   val sizes = data.mapPartitions { iter =>
>>>>>     List(iter.size).iterator
>>>>>   }.collect().toList
>>>>>
>>>>>   println(sizes)
>>>>> }
>>>>>
>>>>> class DatasetKryoRegistrator extends KryoRegistrator {
>>>>>   override def registerClasses(kryo: Kryo) {
>>>>>     kryo.register(
>>>>>       classOf[MyAvroType],
>>>>>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>>>>>   }
>>>>> }
>>>>>
>>>>> I'm using chill-avro's kryo serializer for Avro types, and I've tried
>>>>> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of
>>>>> them works. The error seems to be that the generated code does not
>>>>> compile with janino.
>>>>>
>>>>> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>>>>>
>>>>> --
>>>>> *JU Han*
>>>>>
>>>>> Software Engineer @ Teads.tv
>>>>>
>>>>> +33 0619608888
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *JU Han*
>>>
>>> Software Engineer @ Teads.tv
>>>
>>> +33 0619608888
>>>
>>
>
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 0619608888
>
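A hedged sketch of the workaround implied by the first reply above (not a fix for the bug itself): since `.as[MyAvroType]` with a Kryo encoder cannot line up with the file's struct schema, rebuild the objects from rows in plain JVM code and only then attach the Kryo encoder. It assumes the chill-avro Kryo registration from the original mail is in place, and `MyAvroType` with its `newBuilder`/`setAuctionId`/`setTs` methods is a hypothetical Avro-generated class:

import org.apache.spark.sql.{Dataset, Encoders, SQLContext}

// MyAvroType and its builder/field names are hypothetical stand-ins.
def loadAsObjects(sql: SQLContext): Dataset[MyAvroType] = {
  implicit val enc = Encoders.kryo[MyAvroType]

  // Read the file with its real struct schema, rebuild each Avro object
  // by hand, and only then bring in the opaque Kryo encoding.
  val objects = sql.read.parquet("path/to/data").rdd.map { row =>
    MyAvroType.newBuilder()
      .setAuctionId(row.getAs[String]("auctionId"))
      .setTs(row.getAs[Long]("ts"))
      .build()
  }
  sql.createDataset(objects)  // schema is now struct<value:binary>
}

Because the row-to-object conversion happens outside Catalyst, the analyzer never has to map the two-field struct onto the encoder's single binary column.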