Just one more question: is Dataset supposed to be able to cast data to an
Avro type? For a very simple format (a string and a long), I can cast it to
a tuple or a case class, but not to an Avro type (which also contains only a
string and a long).

The error for this very simple type looks like this:

=== Result of Batch Resolution ===
!'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0,
string])) null else input[0, string].toString, if (isnull(input[1,
bigint])) null else input[1, bigint],
StructField(auctionId,StringType,true), StructField(ts,LongType,true)),
auctionId#0, ts#1L) AS #2]   Project [createexternalrow(if
(isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L))
null else ts#1L, StructField(auctionId,StringType,true),
StructField(ts,LongType,true)) AS #2]
 +- LocalRelation [auctionId#0,ts#1L]


                                                  +- LocalRelation
[auctionId#0,ts#1L]

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number
of fields does not line up.
 - Input schema: struct<auctionId:string,ts:bigint>
 - Target schema: struct<value:binary>;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
at Datasets$delayedInit$body.apply(Datasets.scala:23)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at Datasets$.main(Datasets.scala:23)
at Datasets.main(Datasets.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
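
A note on the "Target schema: struct<value:binary>" line above: an encoder
built with Encoders.kryo stores each object as a single binary column, so
it cannot line up with a two-column input schema. That is only a reading of
the message, not something confirmed in this thread. The sketch below
assumes Spark 2.x (SparkSession) and uses a hypothetical Event class as a
stand-in for the Avro-generated type. It prints the kryo encoder's schema
and shows one possible workaround: decode the rows as a tuple first, then
build the objects in a map that is given the kryo encoder explicitly.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical stand-in for the Avro-generated class (not from this thread).
class Event(val auctionId: String, val ts: Long) extends Serializable

object KryoEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("kryo-encoder-sketch")
      .getOrCreate()
    import spark.implicits._

    // A kryo encoder represents the whole object as one binary column.
    val kryoEncoder: Encoder[Event] = Encoders.kryo[Event]
    println(kryoEncoder.schema.simpleString)

    // Two-column input, shaped like the simple type described above.
    val df = Seq(("a", 1L), ("b", 2L)).toDF("auctionId", "ts")

    // Decode with the built-in tuple encoder, then construct the objects
    // in a map that uses the kryo encoder for its output.
    val events: Dataset[Event] =
      df.as[(String, Long)].map(t => new Event(t._1, t._2))(kryoEncoder)

    println(events.count())
    spark.stop()
  }
}
```

With a real Avro class, the map body would call that class's own
constructor or builder instead of the hypothetical Event constructor.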

2016-05-22 22:02 GMT+02:00 Michael Armbrust <mich...@databricks.com>:

> That's definitely a bug.  If you can come up with a small reproduction it
> would be great if you could open a JIRA.
> On May 22, 2016 12:21 PM, "Han JU" <ju.han.fe...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>> similar if not exactly the same.
>> The file is a parquet file containing avro objects.
>>
>> Thanks!
>>
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>> candidates are: "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>> /* 001 */
>> /* 002 */ public java.lang.Object generate(Object[] references) {
>> /* 003 */   return new SpecificSafeProjection(references);
>> /* 004 */ }
>> /* 005 */
>> /* 006 */ class SpecificSafeProjection extends
>> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>> /* 007 */
>> /* 008 */   private Object[] references;
>> /* 009 */   private MutableRow mutableRow;
>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance
>> serializer;
>> /* 011 */
>> /* 012 */
>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>> /* 014 */     this.references = references;
>> /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
>> /* 016 */     serializer =
>> (org.apache.spark.serializer.KryoSerializerInstance) new
>> org.apache.spark.serializer.KryoSerializer(new
>> org.apache.spark.SparkConf()).newInstance();
>> /* 017 */   }
>> /* 018 */
>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>> /* 020 */     InternalRow i = (InternalRow) _i;
>> /* 021 */     /* decodeusingserializer(input[0,
>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:str...
>> */
>> /* 022 */     /* input[0,
>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:struct<id:string,seatbid:...
>> */
>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1 ?
>> null : (tv.teads.model.rtb.RtbResponseEvent)
>> serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>> /* 026 */     if (isNull1) {
>> /* 027 */       mutableRow.setNullAt(0);
>> /* 028 */     } else {
>> /* 029 */
>> /* 030 */       mutableRow.update(0, value);
>> /* 031 */     }
>> /* 032 */
>> /* 033 */     return mutableRow;
>> /* 034 */   }
>> /* 035 */ }
>> /* 036 */
>>
>> at
>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>> at
>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>> at
>> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>> at
>> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>> at
>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
>> at
>> org.spark_project.guava.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
>> at
>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2349)
>> at
>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>> at
>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>> at
>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:764)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:186)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:748)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:745)
>> at
>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:61)
>> at
>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:60)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 2016-05-20 22:51 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>>
>>> What is the error?  I would definitely expect it to work with kryo at
>>> least.
>>>
>>>
>>> On Fri, May 20, 2016 at 2:37 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
>>>> However, it does not seem to work with Avro data types:
>>>>
>>>>
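>>>> import com.esotericsoftware.kryo.Kryo
>>>> import com.twitter.chill.avro.AvroSerializer
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>> import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
>>>> import org.apache.spark.sql.{Encoders, SQLContext}
>>>>
>>>> object Datasets extends App {
>>>>   val conf = new SparkConf()
>>>>   conf.setAppName("Dataset")
>>>>   conf.setMaster("local[2]")
>>>>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>>>>   conf.setIfMissing("spark.kryo.registrator", classOf[DatasetKryoRegistrator].getName)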
>>>>
>>>>   val sc = new SparkContext(conf)
>>>>   val sql = new SQLContext(sc)
>>>>   import sql.implicits._
>>>>
>>>>   implicit val encoder = Encoders.kryo[MyAvroType]
>>>>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>>>>
>>>>   var c = 0
>>>>   // BUG here
>>>>   val sizes = data.mapPartitions { iter =>
>>>>     List(iter.size).iterator
>>>>   }.collect().toList
>>>>
>>>>   println(c)
>>>> }
>>>>
>>>>
>>>> class DatasetKryoRegistrator extends KryoRegistrator {
>>>>   override def registerClasses(kryo: Kryo): Unit = {
>>>>     kryo.register(
>>>>       classOf[MyAvroType],
>>>>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>>>>   }
>>>> }
>>>>
>>>>
>>>> I'm using chill-avro's kryo serializer for Avro types and I've tried
>>>> `Encoders.kryo` as well as `bean` and `javaSerialization`, but none of
>>>> them works. The error seems to be that the generated code does not
>>>> compile with janino.
>>>>
>>>> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>>>>
>>>> --
>>>> *JU Han*
>>>>
>>>> Software Engineer @ Teads.tv
>>>>
>>>> +33 0619608888
>>>>
>>>
>>>
>>
>>
>>
>


-- 
*JU Han*

Software Engineer @ Teads.tv

+33 0619608888
