[ https://issues.apache.org/jira/browse/SPARK-19711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luis Felipe Sant Ana updated SPARK-19711: ----------------------------------------- Attachment: mv_demand_20170221.csv > Bug in gapply function > ---------------------- > > Key: SPARK-19711 > URL: https://issues.apache.org/jira/browse/SPARK-19711 > Project: Spark > Issue Type: Bug > Components: SparkR > Affects Versions: 2.1.0 > Environment: Using Databricks platform. > Reporter: Luis Felipe Sant Ana > Attachments: mv_demand_20170221.csv, resume.R > > > I have a dataframe in SparkR like > CNPJ PID DATA N > 1 10140281000131 10000000000021 2015-04-23 1 > 2 10140281000131 10000000000021 2015-04-27 1 > 3 10140281000131 10000000000021 2015-04-02 1 > 4 10140281000131 10000000000021 2015-11-10 1 > 5 10140281000131 10000000000021 2016-11-14 1 > 6 10140281000131 10000000000021 2015-04-03 1 > And, I want to group by columns CNPJ and PID using gapply() function, filling > in the column DATA with date. Then I fill in the missing dates with zeros. > The code: > schema <- structType(structField("CNPJ", "string"), > structField("PID", "string"), > structField("DATA", "date"), > structField("N", "double")) > result <- gapply( > ds_filtered, > c("CNPJ", "PID"), > function(key, x) { > dts <- data.frame(key, DATA = seq(min(as.Date(x$DATA)), as.Date(e_date), > "days")) > colnames(dts)[c(1, 2)] <- c("CNPJ", "PID") > > y <- data.frame(key, DATA = as.Date(x$DATA), N = x$N) > colnames(y)[c(1, 2)] <- c("CNPJ", "PID") > > y <- dplyr::left_join(dts, > y, > by = c("CNPJ", "PID", "DATA")) > > y[is.na(y$N), 4] <- 0 > > data.frame(CNPJ = as.character(y$CNPJ), > PID = as.character(y$PID), > DATA = y$DATA, > N = y$N) > }, > schema) > Error: > Error in handleErrors(returnStatus, conn) : > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 92.0 failed 4 times, most recent failure: Lost task 0.3 in stage > 92.0 (TID 7032, 10.93.243.111, executor 0): org.apache.spark.SparkException: > R computation failed with > Error in writeType(con, serdeType) : > Unsupported type 
for serialization factor > Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType > Execution halted > at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) > at > org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404) > at > org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) > at > 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) > at > org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) > at > org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2784) > at > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354) > at > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354) > at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:2768) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2767) > at org.apache.spark.sql.Dataset.collect(Dataset.scala:2354) > at 
org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:208) > at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167) > at > org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108) > at > org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) > at > 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.SparkException: R computation failed with > Error in writeType(con, serdeType) : > Unsupported type for serialization factor > Calls: outputResult ... 
serializeRow -> writeList -> writeObject -> writeType > Execution halted > at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) > at > org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404) > at > org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386) > at org.apache.spark.rdd.RDD$$an > With the gapplyCollect() function this works. > Thank you! > -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org