Just one more question: is Dataset supposed to be able to cast data to an Avro type? For a very simple format (a string and a long), I can cast it to a tuple or a case class, but not to an Avro type (which also contains only a string and a long).
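For reference, a minimal sketch of what I'm doing (MyAvroType stands in for my Avro-generated class; the parquet file's schema is struct<auctionId:string,ts:bigint>, and the SparkContext/SQLContext setup is the same as in the full program quoted below):

case class Auction(auctionId: String, ts: Long)

val df = sql.read.parquet("path/to/simple/data")

// Both of these work:
val asTuple = df.as[(String, Long)]
val asCaseClass = df.as[Auction]

// This fails with the AnalysisException below. As far as I understand,
// Encoders.kryo maps the whole object to a single binary column (hence
// "Target schema: struct<value:binary>"), so it can never line up with
// the two-column input schema:
implicit val encoder = Encoders.kryo[MyAvroType]
val asAvro = df.as[MyAvroType]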
The error is like this for this very simple type:

=== Result of Batch Resolution ===
!'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0, string])) null else input[0, string].toString, if (isnull(input[1, bigint])) null else input[1, bigint], StructField(auctionId,StringType,true), StructField(ts,LongType,true)), auctionId#0, ts#1L) AS #2]
 Project [createexternalrow(if (isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L)) null else ts#1L, StructField(auctionId,StringType,true), StructField(ts,LongType,true)) AS #2]
+- LocalRelation [auctionId#0,ts#1L]
+- LocalRelation [auctionId#0,ts#1L]

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number of fields does not line up.
 - Input schema: struct<auctionId:string,ts:bigint>
 - Target schema: struct<value:binary>;
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
    at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
    at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
    at Datasets$delayedInit$body.apply(Datasets.scala:23)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at Datasets$.main(Datasets.scala:23)
    at Datasets.main(Datasets.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

2016-05-22 22:02 GMT+02:00 Michael Armbrust <mich...@databricks.com>:

> That's definitely a bug. If you can come up with a small reproduction it
> would be great if you could open a JIRA.
>
> On May 22, 2016 12:21 PM, "Han JU" <ju.han.fe...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>> similar, if not exactly the same.
>> The file is a parquet file containing avro objects.
>>
>> Thanks!
>>
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 160: No applicable constructor/method found for actual parameters "org.apache.spark.sql.catalyst.InternalRow"; candidates are: "public static java.nio.ByteBuffer java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer java.nio.ByteBuffer.wrap(byte[], int, int)"
>> /* 001 */
>> /* 002 */ public java.lang.Object generate(Object[] references) {
>> /* 003 */   return new SpecificSafeProjection(references);
>> /* 004 */ }
>> /* 005 */
>> /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>> /* 007 */
>> /* 008 */   private Object[] references;
>> /* 009 */   private MutableRow mutableRow;
>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance serializer;
>> /* 011 */
>> /* 012 */
>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>> /* 014 */     this.references = references;
>> /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
>> /* 016 */     serializer = (org.apache.spark.serializer.KryoSerializerInstance) new org.apache.spark.serializer.KryoSerializer(new org.apache.spark.SparkConf()).newInstance();
>> /* 017 */   }
>> /* 018 */
>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>> /* 020 */     InternalRow i = (InternalRow) _i;
>> /* 021 */     /* decodeusingserializer(input[0, struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:str... */
>> /* 022 */     /* input[0, struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:struct<id:string,seatbid:... */
>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1 ? null : (tv.teads.model.rtb.RtbResponseEvent) serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>> /* 026 */     if (isNull1) {
>> /* 027 */       mutableRow.setNullAt(0);
>> /* 028 */     } else {
>> /* 029 */
>> /* 030 */       mutableRow.update(0, value);
>> /* 031 */     }
>> /* 032 */
>> /* 033 */     return mutableRow;
>> /* 034 */   }
>> /* 035 */ }
>> /* 036 */
>>
>> at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>> at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>> at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>> at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
>> at org.spark_project.guava.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
>> at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2349)
>> at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>> at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>> at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:764)
>> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:186)
>> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:748)
>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:745)
>> at org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:61)
>> at org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:60)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 2016-05-20 22:51 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>>
>>> What is the error? I would definitely expect it to work with kryo at
>>> least.
>>>
>>>
>>> On Fri, May 20, 2016 at 2:37 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
>>>> However, it does not seem to work with Avro data types:
>>>>
>>>> object Datasets extends App {
>>>>   val conf = new SparkConf()
>>>>   conf.setAppName("Dataset")
>>>>   conf.setMaster("local[2]")
>>>>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>>>>   conf.setIfMissing("spark.kryo.registrator", classOf[DatasetKryoRegistrator].getName)
>>>>
>>>>   val sc = new SparkContext(conf)
>>>>   val sql = new SQLContext(sc)
>>>>   import sql.implicits._
>>>>
>>>>   implicit val encoder = Encoders.kryo[MyAvroType]
>>>>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>>>>
>>>>   // BUG here: this mapPartitions triggers the failure
>>>>   val sizes = data.mapPartitions { iter =>
>>>>     List(iter.size).iterator
>>>>   }.collect().toList
>>>>
>>>>   println(sizes)
>>>> }
>>>>
>>>> class DatasetKryoRegistrator extends KryoRegistrator {
>>>>   override def registerClasses(kryo: Kryo) {
>>>>     kryo.register(
>>>>       classOf[MyAvroType],
>>>>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>>>>   }
>>>> }
>>>>
>>>> I'm using chill-avro's kryo serializer for Avro types and I've tried
>>>> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
>>>> works. The error seems to be that the generated code does not compile with
>>>> janino.
>>>>
>>>> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>>>>
>>>> --
>>>> *JU Han*
>>>>
>>>> Software Engineer @ Teads.tv
>>>>
>>>> +33 0619608888
>>>
>>>
>>
>>
>> --
>> *JU Han*
>>
>> Software Engineer @ Teads.tv
>>
>> +33 0619608888
>>
>

--
*JU Han*

Software Engineer @ Teads.tv

+33 0619608888