[jira] [Created] (SPARK-13605) Bean encoder cannot handle nonbean properties - no way to Encode nonbean Java objects with columns
Steven Lewis created SPARK-13605:

Summary: Bean encoder cannot handle non-bean properties - no way to encode non-bean Java objects with columns
Key: SPARK-13605
URL: https://issues.apache.org/jira/browse/SPARK-13605
Project: Spark
Issue Type: New Feature
Components: Java API
Affects Versions: 1.6.0
Environment: Any
Reporter: Steven Lewis
Fix For: 1.6.0

In the current environment, the only way to turn a List or JavaRDD into a Dataset with columns is to use Encoders.bean(MyBean.class). The current implementation fails if a bean property is not a basic type or itself a bean. I would like to see one of the following:

1) Default to Java serialization for any Java object implementing Serializable when using the bean encoder.
2) Allow an encoder backed by a map from classes to encoders; an ideal implementation would look up the class, then any interfaces, and then search base classes.

The following code illustrates the issue:

{code}
/**
 * This class is a good Java bean, but one field holds an object
 * which is not a bean.
 */
public class MyBean implements Serializable {
    private int m_count;
    private String m_Name;
    private MyUnBean m_UnBean;

    public MyBean(int count, String name, MyUnBean unBean) {
        m_count = count;
        m_Name = name;
        m_UnBean = unBean;
    }

    public int getCount() { return m_count; }
    public void setCount(int count) { m_count = count; }
    public String getName() { return m_Name; }
    public void setName(String name) { m_Name = name; }
    public MyUnBean getUnBean() { return m_UnBean; }
    public void setUnBean(MyUnBean unBean) { m_UnBean = unBean; }
}

/**
 * This is a Java object which is not a bean:
 * no getters or setters, but it is serializable.
 */
public class MyUnBean implements Serializable {
    public final int count;
    public final String name;

    public MyUnBean(int count, String name) {
        this.count = count;
        this.name = name;
    }
}

/**
 * This code creates a list of MyBean objects - Java beans
 * containing one field which is not a bean. It then attempts,
 * and fails, to use a bean encoder to make a Dataset.
 */
public class DatasetTest {
    public static final Random RND = new Random();
    public static final int LIST_SIZE = 100;

    public static String makeName() {
        return Integer.toString(RND.nextInt());
    }

    public static MyUnBean makeUnBean() {
        return new MyUnBean(RND.nextInt(), makeName());
    }

    public static MyBean makeBean() {
        return new MyBean(RND.nextInt(), makeName(), makeUnBean());
    }

    /**
     * Make a list of MyBeans.
     */
    public static List<MyBean> makeBeanList() {
        List<MyBean> holder = new ArrayList<>();
        for (int i = 0; i < LIST_SIZE; i++) {
            holder.add(makeBean());
        }
        return holder;
    }

    public static SQLContext getSqlContext() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("BeanTest");
        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) // use local over nothing
            sparkConf.setMaster("local[*]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        return new SQLContext(ctx);
    }

    public static void main(String[] args) {
        SQLContext sqlContext = getSqlContext();
        Encoder<MyBean> evidence = Encoders.bean(MyBean.class);
        Encoder<MyUnBean> evidence2 = Encoders.javaSerialization(MyUnBean.class);
        List<MyBean> holder = makeBeanList();
        // fails at this line with
        // Exception in thread "main" java.lang.UnsupportedOperationException:
        //   no encoder found for com.lordjoe.testing.MyUnBean
        Dataset<MyBean> beanSet = sqlContext.createDataset(holder, evidence);
        long count = beanSet.count();
        if (count != LIST_SIZE)
            throw new IllegalStateException("bad count");
    }
}
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
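The root of the failure above can be seen without Spark at all: the bean encoder relies on JavaBeans introspection, and a class like MyUnBean, whose state lives in public final fields with no getters or setters, exposes no bean properties for an encoder to map to columns. The following stdlib-only sketch demonstrates this with java.beans.Introspector; the class name IntrospectionDemo and the helper beanPropertyCount are illustrative, not part of the original report.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;

public class IntrospectionDemo {
    // Mirrors MyUnBean from the report: public final fields, no getters/setters.
    static class MyUnBean implements java.io.Serializable {
        public final int count;
        public final String name;

        MyUnBean(int count, String name) {
            this.count = count;
            this.name = name;
        }
    }

    // Counts the JavaBeans properties visible on a class, excluding those
    // inherited from Object (such as the "class" property from getClass()).
    static int beanPropertyCount(Class<?> c) {
        try {
            return Introspector.getBeanInfo(c, Object.class)
                               .getPropertyDescriptors().length;
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Prints 0: introspection sees no properties on MyUnBean, which is
        // why a bean-based encoder has nothing to turn into columns.
        System.out.println("bean properties on MyUnBean: "
                + beanPropertyCount(MyUnBean.class));
    }
}
```

This is why suggestion 1 in the report (falling back to Java serialization for Serializable fields) would help: serialization works on fields, not on bean properties, so it does not care that introspection comes up empty.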
[jira] [Commented] (SPARK-3637) NPE in ShuffleMapTask
[ https://issues.apache.org/jira/browse/SPARK-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14162600#comment-14162600 ]

Steven Lewis commented on SPARK-3637:
-

I see the same thing running a Java map-reduce program locally, and it is a blocking issue for my development, especially since I have no clue as to how to address it.

NPE in ShuffleMapTask
-
Key: SPARK-3637
URL: https://issues.apache.org/jira/browse/SPARK-3637
Project: Spark
Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Przemyslaw Pastuszka

When trying to execute spark.jobserver.WordCountExample using spark-jobserver (https://github.com/ooyala/spark-jobserver), we observed that it often fails with a NullPointerException in ShuffleMapTask.scala. Here are the full details:

{code}
Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, hadoop-simple-768-worker-with-zookeeper-0): java.lang.NullPointerException:
java.nio.ByteBuffer.wrap(ByteBuffer.java:392)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:, errorClass: org.apache.spark.SparkException, stack:
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1153)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1142)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1141)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1141)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:682)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:682)
scala.Option.foreach(Option.scala:236)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:682)
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1359)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
akka.actor.ActorCell.invoke(ActorCell.scala:456)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
akka.dispatch.Mailbox.run(Mailbox.scala:219)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

I am aware that this failure may be due to the job being ill-defined by spark-jobserver (I don't know if that's the case), but if so, it should be handled more gracefully on the Spark side. It is also important that this issue doesn't always happen, which may indicate some kind of race condition in the code.