[ https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413530#comment-16413530 ]

Florencio commented on SPARK-23739:
-----------------------------------

Thanks, Cody. The Kafka version is 0.10.0.1, and I checked that the class
org.apache.kafka.common.requests.LeaveGroupResponse is inside the assembly.
Additionally, we found that Kafka 0.8 was also installed on the cluster (it
was used by Druid). After we uninstalled that Kafka version, the application
no longer hits the class-not-found error, but now I get
java.lang.OutOfMemoryError: Java heap space.
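
In case it is useful to others hitting the same thing, here is a minimal
sketch of one way to confirm at runtime which jar a class is actually loaded
from (run in spark-shell on the driver; the class name below is the one from
the stack trace, everything else is just illustration):

    // Minimal sketch: report which jar a class is resolved from at runtime.
    // The class name is the one reported in this ticket's stack trace.
    val cls = Class.forName("org.apache.kafka.common.requests.LeaveGroupResponse")
    // getCodeSource can be null for JDK/bootstrap classes, so guard with Option.
    val location = Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("<bootstrap classpath>")
    println(s"${cls.getName} loaded from: $location")

With both the Kafka 0.8 and 0.10 jars on the classpath, the printed location
shows which one actually wins.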

Here is part of the error log:

18/03/23 15:20:08 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:12 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
18/03/23 15:20:12 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:16 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:21 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:25 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:20:28 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
 at java.util.concurrent.FutureTask.get(FutureTask.java:205)
 at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
18/03/23 15:20:33 WARN TransportChannelHandler: Exception in connection from NODE/NODE_IP:33018
java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
 at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
 at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
 at java.lang.Thread.run(Thread.java:745)
18/03/23 15:20:47 ERROR TransportResponseHandler: Still have 4 requests outstanding when connection from NODE/NODE_IP:33018 is closed
18/03/23 15:20:47 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)

18/03/23 15:21:02 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
18/03/23 15:21:01 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:21:05 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
18/03/23 15:21:05 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5287967802569519759, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13]}} to /172.20.13.58:33820; closing connection
io.netty.handler.codec.EncoderException: java.lang.OutOfMemoryError: Java heap space
 at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
 at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:743)
 at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:735)
 at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
 at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:728)
 at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
 at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:743)
 at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:806)
 at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:818)
 at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:799)
 at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:835)
 at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1017)
 at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:256)
 at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:194)
 at org.apache.spark.network.server.TransportRequestHandler.access$000(TransportRequestHandler.java:55)
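
The executors repeatedly fail to allocate 8388608-byte (8 MB) pages before
the heap OOM, which points at executor memory pressure. For reference, a
minimal sketch of the settings that govern this (the app name and values are
illustrative assumptions, not a confirmed fix):

    // Hypothetical sketch: raise executor heap in case the repeated 8 MB
    // page-allocation failures are plain memory pressure rather than a leak.
    // The app name and values below are placeholders, not a confirmed fix.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("kafka-structured-streaming")   // placeholder name
      .config("spark.executor.memory", "8g")   // executor JVM heap (default 1g)
      .config("spark.memory.fraction", "0.6")  // heap share for execution + storage (default)
      .getOrCreate()

If the OOM persists with a larger heap, running the executors with
-XX:+HeapDumpOnOutOfMemoryError would show what is accumulating.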

 

Thanks.

> Spark structured streaming long running problem
> -----------------------------------------------
>
>                 Key: SPARK-23739
>                 URL: https://issues.apache.org/jira/browse/SPARK-23739
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Florencio
>            Priority: Critical
>              Labels: spark, streaming, structured
>
> I had a problem with long running spark structured streaming in spark 2.1. 
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. 
> Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = 
> Some({"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = 
> {"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = 
> a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 
> 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>  at java.io.FilterInputStream.read(FilterInputStream.java:133)
>  at java.io.FilterInputStream.read(FilterInputStream.java:107)
>  at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>  at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>  at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>  at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>  at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>  at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at 
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/requests/LeaveGroupResponse
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>  at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1383)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1364)
>  at org.apache.spark.sql.kafka010.KafkaSource.stop(KafkaSource.scala:311)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:574)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:572)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.stopSources(StreamExecution.scala:572)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:325)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  ... 15 more
> 18/03/16 16:10:57 WARN StreamExecution: Failed to stop streaming source: 
> KafkaSource[Subscribe[TPusciteStazMinuto]]. Resources may have leaked.
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer


