[ https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413530#comment-16413530 ]
Florencio commented on SPARK-23739:
-----------------------------------

Thanks Cody. The Kafka version is 0.10.0.1, and I checked that the class org.apache.kafka.common.requests.LeaveGroupResponse is inside the assembly. Additionally, we found that Kafka 0.8 was installed on the cluster (it was used by Druid). After uninstalling that Kafka version, the application no longer throws the class-not-found error, but now I get java.lang.OutOfMemoryError: Java heap space. I attach part of the error:

18/03/23 15:20:08 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:12 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
18/03/23 15:20:12 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:16 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:21 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:25 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:28 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
18/03/23 15:20:33 WARN TransportChannelHandler: Exception in connection from NODE/NODE_IP:33018
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
18/03/23 15:20:47 ERROR TransportResponseHandler: Still have 4 requests outstanding when connection from NODE/NODE_IP:33018 is closed
18/03/23 15:20:47 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
18/03/23 15:21:02 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
18/03/23 15:21:01 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:21:05 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:21:05 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5287967802569519759, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /172.20.13.58:33820; closing connection
io.netty.handler.codec.EncoderException: java.lang.OutOfMemoryError: Java heap space
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:743)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:735)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:728)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:743)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:806)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:818)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:799)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:835)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1017)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:256)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:194)
    at org.apache.spark.network.server.TransportRequestHandler.access$000(TransportRequestHandler.java:55)

Thanks.

> Spark structured streaming long running problem
> -----------------------------------------------
>
> Key: SPARK-23739
> URL: https://issues.apache.org/jira/browse/SPARK-23739
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Florencio
> Priority: Critical
> Labels: spark, streaming, structured
>
> I had a problem with long running spark structured streaming in spark 2.1.
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = Some({"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = {"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>     at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>     at java.io.FilterInputStream.read(FilterInputStream.java:133)
>     at java.io.FilterInputStream.read(FilterInputStream.java:107)
>     at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>     at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>     at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>     at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>     at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>     at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>     at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/LeaveGroupResponse
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>     at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:66)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1383)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1364)
>     at org.apache.spark.sql.kafka010.KafkaSource.stop(KafkaSource.scala:311)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:574)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:572)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.stopSources(StreamExecution.scala:572)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:325)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.LeaveGroupResponse
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 15 more
> 18/03/16 16:10:57 WARN StreamExecution: Failed to stop streaming source: KafkaSource[Subscribe[TPusciteStazMinuto]]. Resources may have leaked.
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
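The root cause identified in the comment above, two Kafka versions visible on the classpath, can be confirmed mechanically by scanning every jar for the class that failed to load: if more than one jar (or an incompatible one) contains it, the classloader may resolve it from the wrong artifact. A minimal sketch, not part of the ticket; the `/opt/spark/jars` directory and the `jars_containing` helper are illustrative assumptions, adjust the path to the cluster's actual classpath:

```python
import zipfile
from pathlib import Path

def jars_containing(class_entry, jar_paths):
    """Return the jar paths whose entry list includes class_entry.

    class_entry is a class file path such as
    'org/apache/kafka/common/requests/LeaveGroupResponse.class'.
    """
    hits = []
    for jar in jar_paths:
        try:
            # A jar is a zip archive; namelist() gives every entry inside it.
            with zipfile.ZipFile(jar) as zf:
                if class_entry in zf.namelist():
                    hits.append(str(jar))
        except (zipfile.BadZipFile, OSError):
            # Skip unreadable or non-jar files rather than failing the scan.
            continue
    return hits

if __name__ == "__main__":
    entry = "org/apache/kafka/common/requests/LeaveGroupResponse.class"
    jars_dir = Path("/opt/spark/jars")  # illustrative location, not from the ticket
    jars = sorted(jars_dir.glob("*.jar")) if jars_dir.is_dir() else []
    for hit in jars_containing(entry, jars):
        print(hit)
```

If the scan prints two jars (e.g. a 0.8 and a 0.10 kafka-clients), removing or shading the stale one resolves the NoClassDefFoundError path seen in the trace, which matches the fix the reporter describes (uninstalling the Kafka 0.8 install used by Druid).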