Ebru, Javier, Flavio:

I tried to reproduce the memory leak by submitting a job that generated 
classes with random names, and indeed I found one. Memory was accumulating 
in `char[]` instances that belonged to `java.lang.ClassLoader#parallelLockMap`. 
The OldGen memory pool kept growing until I got:

java.lang.OutOfMemoryError: Java heap space

This looks like an old, known “feature” of the JDK:
https://bugs.openjdk.java.net/browse/JDK-8037342 

Can any of you confirm that this is the issue you are experiencing? If 
not, I will need more help/information from you to track this down.
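
In case it helps with checking this outside of Flink, here is a minimal sketch 
of how the growth can be provoked in a plain JVM. It is not the job I 
submitted: the class name `RandomClassNameProbe`, the `probe` helper and the 
`leak.probe.` name prefix are made up for illustration, and it only asks a 
class loader for classes that do not exist instead of generating real ones. 
My assumption (from reading the JDK 8 sources) is that a parallel-capable 
loader such as `URLClassLoader` registers a lock object in `parallelLockMap` 
for every distinct name passed to `loadClass` and never removes it, so failed 
lookups should be enough to make the map grow.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.UUID;

// Hypothetical probe, not part of Flink or of the job discussed in this thread.
public class RandomClassNameProbe {

    // Asks the loader for `attempts` classes with random, non-existent names
    // and returns how many of those lookups failed.
    static int probe(ClassLoader loader, int attempts) {
        int misses = 0;
        for (int i = 0; i < attempts; i++) {
            // A fresh name on every iteration, so each lookup uses a new key.
            String name = "leak.probe.C" + UUID.randomUUID().toString().replace("-", "");
            try {
                loader.loadClass(name);
            } catch (ClassNotFoundException e) {
                misses++;
            }
        }
        return misses;
    }

    public static void main(String[] args) throws Exception {
        // An empty URLClassLoader delegates to its parent and then fails the lookup.
        try (URLClassLoader loader = new URLClassLoader(new URL[0])) {
            int misses = probe(loader, 10_000);
            System.out.println("failed lookups: " + misses);
        }
    }
}
```

Running it with a small heap and a much larger number of attempts, and then 
looking at a heap dump, should show whether `parallelLockMap` is what grows 
on your JVM version.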

Piotrek

> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>> files containing std output?
>> Piotrek
>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>> Hi,
>>>> Thanks for the logs; however, I do not see the previously mentioned
>>>> exceptions in them. The log ends with java.lang.InterruptedException.
>>>> Is it the correct log file? Also, could you attach the std output file
>>>> of the failing TaskManager?
>>>> Piotrek
>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>> Hi,
>>>>>> Could you attach full logs from those task managers? At first glance I
>>>>>> don’t see a connection between those exceptions and any memory issue
>>>>>> that you might have had. It looks like a dependency issue in one (some?
>>>>>> all?) of your jobs.
>>>>>> Did you build your jars with -Pbuild-jar profile as described here:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>> ?
>>>>>> If that doesn’t help, can you binary search which job is causing the
>>>>>> problem? There might be some Flink incompatibility between different
>>>>>> versions, and rebuilding a job’s jar with a version matching the
>>>>>> cluster version might help.
>>>>>> Piotrek
>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>>>> Btw, Ebru:
>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>>>>>> screenshots its memory consumption was reasonable and stable:
>>>>>>> 596MB -> 602MB -> 597MB.
>>>>>>> PoolThreadCache memory usage of ~120MB is also reasonable.
>>>>>>> Do you experience any problems, like Out Of Memory errors, crashes or
>>>>>>> long GC pauses? Or is the JVM process just using more memory over time?
>>>>>>> You are aware that the JVM doesn’t like to release memory back to the
>>>>>>> OS once it was used? So increasing memory usage until hitting some
>>>>>>> limit (for example the JVM max heap size) is expected behaviour.
>>>>>>> Piotrek
>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>> I don’t know if this is relevant to this issue, but I was
>>>>>>> constantly getting failures trying to reproduce this leak using your
>>>>>>> Job, because you were using a non-deterministic getKey function:
>>>>>>> @Override
>>>>>>> public Integer getKey(Integer event) {
>>>>>>> Random randomGen = new Random((new Date()).getTime());
>>>>>>> return randomGen.nextInt() % 8;
>>>>>>> }
>>>>>>> And quoting the Javadoc of KeySelector:
>>>>>>> “If invoked multiple times on the same object, the returned key must
>>>>>>> be the same.”
>>>>>>> I’m trying to reproduce this issue with the following job:
>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>>>>>> Where IntegerSource is just an infinite source and DiscardingSink
>>>>>>> simply discards incoming data. I’m cancelling the job every 5
>>>>>>> seconds and so far (after ~15 minutes) my memory consumption is
>>>>>>> stable, well below the maximum Java heap size.
>>>>>>> Piotrek
>>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de>
>>>>>>> wrote:
>>>>>>> Yes, I tested with just printing the stream. But it could take a
>>>>>>> lot of time to fail.
>>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>>>>>> <pi...@data-artisans.com> wrote:
>>>>>>> Thanks for quick answer.
>>>>>>> So it will also fail after some time with `fromElements` source
>>>>>>> instead of Kafka, right?
>>>>>>> Did you try it also without a Kafka producer?
>>>>>>> Piotrek
>>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de>
>>>>>>> wrote:
>>>>>>> Hi,
>>>>>>> You don't need data. With data it will die faster. I tested as
>>>>>>> well with a small data set, using the fromElements source, but it
>>>>>>> will take some time to die. It's better with some data.
>>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>>>>>> <pi...@data-artisans.com> wrote:
>>>>>>> Hi,
>>>>>>> Thanks for sharing this job.
>>>>>>> Do I need to feed some data to Kafka to reproduce this issue with
>>>>>>> your script?
>>>>>>>> Does this OOM issue also happen when you are not using the Kafka
>>>>>>>> source/sink?
>>>>>>>> Piotrek
>>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> This is the test Flink job we created to trigger this leak:
>>>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>>>>>> And this is the Python script we are using to execute the job
>>>>>>>> thousands of times to get the OOM problem:
>>>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>>>> The cluster we used for this has this configuration:
>>>>>>>> Instance type: t2.large
>>>>>>>> Number of workers: 2
>>>>>>>> HeapMemory: 5500
>>>>>>>> Number of task slots per node: 4
>>>>>>>> TaskMangMemFraction: 0.5
>>>>>>>> NumberOfNetworkBuffers: 2000
>>>>>>>> We have tried several things: increasing the heap, reducing the
>>>>>>>> heap, a larger memory fraction, changing this value in
>>>>>>>> taskmanager.sh: TM_MAX_OFFHEAP_SIZE="2G"; nothing seems to work.
>>>>>>>> Thanks for your help.
>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>> Hi Ebru and Javier,
>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>> Ebru: could you explain in a little more detail what your Job(s)
>>>>>>> look like? Could you post some code? If you are just using maps and
>>>>>>> filters there shouldn’t be any network transfers involved, aside
>>>>>>> from Source and Sink functions.
>>>>>>> Piotrek
>>>>>>> On 8 Nov 2017, at 12:54, ebru
>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> Hi Javier,
>>>>>>> It would be helpful if you share your test job with us.
>>>>>>> Which configurations did you try?
>>>>>>> -Ebru
>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>>>> <javier.lo...@zalando.de>
>>>>>>> wrote:
>>>>>>> Hi,
>>>>>>> We have been facing a similar problem. We have tried some different
>>>>>>> configurations, as proposed in another email thread by Flavio and
>>>>>>> Kien, but it didn't work. We have a workaround similar to the one
>>>>>>> that Flavio has: we restart the taskmanagers once they reach a
>>>>>>> memory threshold. We created a small test to remove all of our
>>>>>>> dependencies and leave only Flink native libraries. This test reads
>>>>>>> data from a Kafka topic and writes it back to another topic in
>>>>>>> Kafka. We cancel the job and start another every 5 seconds. After
>>>>>>> ~30 minutes of doing this, the cluster reaches the OS memory
>>>>>>> limit and dies.
>>>>>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>>>>>> per node. We have one job that uses 56 slots, and we cannot execute
>>>>>>> that job 5 times in a row because the whole cluster dies. If you
>>>>>>> want, we can publish our test job.
>>>>>>> Regards,
>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>>>> <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>> @Nico & @Piotr Could you please have a look at this? You both
>>>>>>> recently worked on the network stack and might be most familiar with
>>>>>>> this.
>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>>>> <pomperma...@okkam.it>
>>>>>>> wrote:
>>>>>>> We also have the same problem in production. At the moment the
>>>>>>> solution is to restart the entire Flink cluster after every job.
>>>>>>> We've tried to reproduce this problem with a test (see
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>>>>> know whether the error produced by the test and the leak are
>>>>>>> correlated.
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>>>>>> EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>>>> Do you use any windowing? If yes, could you please share that code?
>>>>>>> If there is no stateful operation at all, it's strange where the list
>>>>>>> state instances are coming from.
>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>>>>>> <b20926...@cs.hacettepe.edu.tr>
>>>>>>> wrote:
>>>>>>> Hi Ufuk,
>>>>>>> We don’t explicitly define any state descriptor. We only use map
>>>>>>> and filter operators. We thought that the GC handles clearing Flink’s
>>>>>>> internal states.
>>>>>>> So how can we manage the memory if it is always increasing?
>>>>>>> - Ebru
>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>>>>> running. This is expected (also in the case of multiple running
>>>>>>> jobs). The screenshots are not helpful in that regard. :-(
>>>>>>> What kind of stateful operations are you using? Depending on your
>>>>>>> use case, you have to manually call `clear()` on the state instance
>>>>>>> in order to release the managed state.
>>>>>>> Best,
>>>>>>> Ufuk
>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> Begin forwarded message:
>>>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>>>> Subject: Re: Flink memory leak
>>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>>> Hi Ufuk,
>>>>>>> There are three snapshots of htop output.
>>>>>>> 1. Snapshot of the initial state.
>>>>>>> 2. Snapshot after submitting one job.
>>>>>>> 3. Snapshot of the output of the one job with 15000 EPS. The memory
>>>>>>> usage is always increasing over time.
>>>>>>> <1.png><2.png><3.png>
>>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>> Hey Ebru,
>>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>>>> this.
>>>>>>> Since multiple jobs are running, it will be hard to understand
>>>>>>> which job the state descriptors from the heap snapshot belong to.
>>>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>>>> with only a single job?
>>>>>>> – Ufuk
>>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU
>>>>>> ÇETİNKAYA EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> Hi,
>>>>>>> We are using Flink 1.3.1 in production, with one job manager and 3
>>>>>>> task managers in standalone mode. Recently, we've noticed that we have
>>>>>>> memory-related problems. We use Docker to serve the Flink cluster. We
>>>>>>> have 300 slots, and 20 jobs are running with a parallelism of 10. The
>>>>>>> job count may also change over time. Taskmanager memory usage always
>>>>>>> increases, and after job cancellation it doesn't decrease. We've
>>>>>>> tried to investigate the problem and took a task manager JVM heap
>>>>>>> snapshot. According to the JVM heap analysis, the possible memory leak
>>>>>>> was the Flink list state descriptor, but we are not sure that this is
>>>>>>> the cause of our memory problem. How can we solve the problem?
>>>>>>> We have two types of Flink job. One has no stateful operator and
>>>>>>> contains only maps and filters; the other has a time window with a
>>>>>>> count trigger.
>>>>>>> * We've analysed the JVM heaps again under different conditions. First
>>>>>>> we analysed the snapshot when no Flink jobs were running on the
>>>>>>> cluster. (image 1)
>>>>>>> * Then we analysed the JVM heap snapshot while the Flink job that has
>>>>>>> no stateful operator was running. According to the results, the leak
>>>>>>> suspect was NetworkBufferPool. (image 2)
>>>>>>> * In the last analysis, both types of jobs were running and the leak
>>>>>>> suspect was again NetworkBufferPool. (image 3)
>>>>>>> In our system, jobs are regularly cancelled and resubmitted, so we
>>>>>>> noticed that when a job is submitted some amount of memory is
>>>>>>> allocated, and after cancellation this allocated memory is never
>>>>>>> freed. So over time memory usage keeps increasing and exceeds the
>>>>>>> limits.
>>>>>> Links:
>>>>>> ------
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>>>> Hi Piotr,
>>>>>> There are two types of jobs.
>>>>>> In the first, we use a Kafka source and a Kafka sink; there isn't any
>>>>>> window operator.
>>>>>> In the second job, we use a Kafka source, a filesystem sink, an
>>>>>> Elasticsearch sink, and a window operator for buffering.
>>>>>> Hi Piotrek,
>>>>>> Thanks for your reply.
>>>>>> We've tested our Flink cluster again. We have 360 slots, and our
>>>>>> cluster configuration is like this:
>>>>>> jobmanager.rpc.address: %JOBMANAGER%
>>>>>> jobmanager.rpc.port: 6123
>>>>>> jobmanager.heap.mb: 1536
>>>>>> taskmanager.heap.mb: 1536
>>>>>> taskmanager.numberOfTaskSlots: 120
>>>>>> taskmanager.memory.preallocate: false
>>>>>> parallelism.default: 1
>>>>>> jobmanager.web.port: 8081
>>>>>> state.backend: filesystem
>>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
>>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
>>>>>> taskmanager.network.numberOfBuffers: 5000
>>>>>> We are using a Docker-based Flink cluster.
>>>>>> We submitted 36 jobs with a parallelism of 10. After all slots
>>>>>> became full, memory usage kept increasing over time and the task
>>>>>> managers started to die one by one. The exception was like this:
>>>>>> Taskmanager1 log:
>>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
>>>>>> ActorSystem[flink]
>>>>>> java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
>>>>>>   at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
>>>>>>   at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
>>>>>>   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>>>>>>   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>>>>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>   ... 22 more
>>>>>> Taskmanager2 log:
>>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
>>>>>> ActorSystem[flink]
>>>>>> java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>>>>>   at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>>>>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>   ... 18 more
>>>>>> -Ebru
>>>>> Hi Piotrek,
>>>>> We attached the full log of the taskmanager1.
>>>>> This may not be a dependency issue, because until all of the task slots 
>>>>> were full we didn't get any NoClassDefFoundError; when there is 
>>>>> available memory, jobs can run without exceptions for days.
>>>>> Also, there is a Kafka Instance Already Exist exception in the full log, 
>>>>> but this is not relevant and doesn't affect jobs or task managers.
>>>>> -Ebru<taskmanager1.log.zip>
>>> Hi,
>>> Sorry, we attached the wrong log file. I've attached the logs of all task 
>>> managers and the job manager. All task managers and the job manager were 
>>> killed.<logs.zip>
> <logs2-2.zip>
