Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Fabian Wollert
*@Fabian do you register any types / serializers via
ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?*

Nope, not at all. Our Flink job code doesn't contain the word "Kryo" anywhere.
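
For reference, this is roughly what such a registration would look like if we did
it (just a sketch; the custom serializer class is hypothetical, we have nothing
like this in our code):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// plain registration of the type with Kryo
env.getConfig().registerKryoType(com.fasterxml.jackson.databind.node.ObjectNode.class);
// or registration together with a custom Kryo serializer (hypothetical class)
env.getConfig().registerTypeWithKryoSerializer(
    com.fasterxml.jackson.databind.node.ObjectNode.class,
    MyObjectNodeKryoSerializer.class);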

thx for looking into it ...

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fab...@zalando.de

Am Do., 4. Juli 2019 um 11:51 Uhr schrieb Tzu-Li (Gordon) Tai <
tzuli...@apache.org>:

> I quickly checked the implementation of duplicate() for both the
> KryoSerializer and StreamElementSerializer (which are the only serializers
> involved here).
> They seem to be correct; especially for the KryoSerializer, since
> FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
> duplicating it, and therefore Kryo instances should not be shared at all
> across duplicates.
> This seems to rule out any duplication issues with the serializers.
>
> As a possibly relevant question: @Fabian, do you register any types /
> serializers via ExecutionConfig.registerKryoType(...) /
> ExecutionConfig.registerTypeWithKryoSerializer(...)?
>
> Best,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert  wrote:
>
>> No, not yet. We still lack some understanding here. The only thing
>> we found out is that it most probably happens in the Elasticsearch sink,
>> because:
>> - some error messages have the sink in their stack trace.
>> - when bumping the ES nodes specs on AWS, the error happens less often
>> (we haven't bumped it to super large instances yet, nor got to a state
>> where they go away completely. also this would not be the ideal fix)
>>
>> So my current assumption is that some backpressuring is not happening
>> correctly. But this is super vague; any other hints or support on this are
>> highly appreciated.
>>
>> --
>>
>>
>> *Fabian Wollert*
>> *Zalando SE*
>>
>> E-Mail: fab...@zalando.de
>>
>>
>> Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> Any news on this? Have you found the cause of the error?
>>>
>>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Indeed, looking at StreamElementSerializer, the duplicate() method could
>>>> be bugged:
>>>>
>>>> @Override
>>>> public StreamElementSerializer<T> duplicate() {
>>>>   TypeSerializer<T> copy = typeSerializer.duplicate();
>>>>   return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
>>>> }
>>>>
>>>> Is it safe to return this when copy == typeSerializer ...?
>>>>
>>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi Fabian,
>>>>> we had similar errors with Flink 1.3 [1][2], and the error was caused
>>>>> by the fact that a serializer was sharing the same object with multiple
>>>>> threads.
>>>>> The error was not deterministic and was changing from time to time.
>>>>> So maybe it could be something similar (IMHO).
>>>>>
>>>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>>>> [2]
>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert 
>>>>> wrote:
>>>>>
>>>>>> Additionally, we are seeing these errors all the time as well:
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: 
>>>>>> java.lang.ArrayIndexOutOfBoundsException
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>  at 
>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>  at 
>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>  at 
>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.runtime.streamr

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Fabian Wollert
No, not yet. We still lack some understanding here. The only thing
we found out is that it most probably happens in the Elasticsearch sink,
because:
- some error messages have the sink in their stack trace.
- when bumping the ES nodes specs on AWS, the error happens less often (we
haven't bumped it to super large instances yet, nor got to a state where
they go away completely. also this would not be the ideal fix)

So my current assumption is that some backpressuring is not happening
correctly. But this is super vague; any other hints or support on this are
highly appreciated.
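
If it really is the sink not applying backpressure properly, the knobs I would look
at are the bulk-flush settings of the Elasticsearch connector, passed in via the
user-config map when the sink is created. Something like this (a sketch, values made
up, key names as I understand them from the connector docs):

Map<String, String> esConfig = new HashMap<>();
esConfig.put("cluster.name", "my-es-cluster");      // made-up value
esConfig.put("bulk.flush.max.actions", "500");      // flush after this many actions
esConfig.put("bulk.flush.interval.ms", "1000");     // ... or after this interval
esConfig.put("bulk.flush.backoff.enable", "true");  // back off and retry rejected bulk requests
// the map is then passed to the ElasticsearchSink constructor together with
// the transport addresses and our ElasticsearchSinkFunction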

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fab...@zalando.de


Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> Any news on this? Have you found the cause of the error?
>
> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier 
> wrote:
>
>> Indeed, looking at StreamElementSerializer, the duplicate() method could be
>> bugged:
>>
>> @Override
>> public StreamElementSerializer<T> duplicate() {
>>   TypeSerializer<T> copy = typeSerializer.duplicate();
>>   return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
>> }
>>
>> Is it safe to return this when copy == typeSerializer ...?
>>
>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier 
>> wrote:
>>
>>> Hi Fabian,
>>> we had similar errors with Flink 1.3 [1][2], and the error was caused by
>>> the fact that a serializer was sharing the same object with multiple
>>> threads.
>>> The error was not deterministic and was changing from time to time.
>>> So maybe it could be something similar (IMHO).
>>>
>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>> [2]
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert 
>>> wrote:
>>>
>>>> Additionally, we are seeing these errors all the time as well:
>>>>
>>>> com.esotericsoftware.kryo.KryoException: 
>>>> java.lang.ArrayIndexOutOfBoundsException
>>>> Serialization trace:
>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>at 
>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>at 
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>at 
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>at 
>>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>at 
>>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>at 
>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>at 
>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>at 
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>>
>>>> or
>>>>
>>>>
>>>> com.esotericsoftware.kryo.KryoException: 
>>>> java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>> Serialization trace:
>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>at 
>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>at 
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>at 
>

Re: Debug Kryo.Serialization Exception

2019-06-28 Thread Fabian Wollert
Additionally, we are seeing these errors all the time as well:

com.esotericsoftware.kryo.KryoException:
java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


or


com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 12 more
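
All of these traces point at Kryo's FieldSerializer reading the _children map of
Jackson's ObjectNode on the receiving side of the network shuffle. One workaround we
are considering (just a sketch of the idea, not something we run today) is to pass
the record between operators as a plain JSON String, so that Flink's String
serializer is used on the wire and the ObjectNode is only rebuilt inside the sink:

// 'events' is a hypothetical DataStream<ObjectNode>
DataStream<String> asJson = events.map(new MapFunction<ObjectNode, String>() {
    @Override
    public String map(ObjectNode node) {
        return node.toString();  // serialize to JSON text before the shuffle
    }
});
// the sink function would then parse the String back with an ObjectMapper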


--


*Fabian Wollert*
*Zalando SE*

E-Mail: fab...@zalando.de
Phone: +49 152 03479412



Am Do., 27. Juni 2019 um 18:29 Uhr schrieb Fabian Wollert :

> Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a custom
> source and ingesting into an Elasticsearch cluster (v5.6). Recently, we have
> been seeing more and more exceptions like this:
>
> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali

Debug Kryo.Serialization Exception

2019-06-27 Thread Fabian Wollert
Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a custom
source and ingesting into an Elasticsearch cluster (v5.6). Recently, we have
been seeing more and more exceptions like this:

com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com. ^
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)

... 13 more

or

com.esotericsoftware.kryo.KryoException: Unable to find class:
com.fasterxml.jackson.databind.node.DoubleNod
com.fasterxml.jackson.databind.node.ObjectNode
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
com.fasterxml.jackson.databind.node.DoubleNod
com.fasterxml.jackson.databind.node.ObjectNode
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 

Re: Heap Problem with Checkpoints

2018-06-20 Thread Fabian Wollert
To that last one: I'm accessing S3 from one EC2 instance which has an IAM
role attached ...

I'll get back to you when I have those stack traces printed ... I will have to
build the project and package the custom version first, which might take some
time, and some vacation is also up next ...
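
What I have in mind for the instrumentation is something very crude along these
lines inside org.apache.log4j.helpers.AppenderAttachableImpl#addAppender in the
custom build (a sketch, debug aid only):

public void addAppender(Appender newAppender) {
    // print who is registering yet another appender
    new Throwable("addAppender called with " + newAppender).printStackTrace();
    // ... original log4j 1.x logic stays unchanged below ...
}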

Cheers


--


*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Phone: +49 152 03479412
Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>



Am Mi., 20. Juni 2018 um 14:14 Uhr schrieb Piotr Nowojski <
pi...@data-artisans.com>:

> Btw, side questions. Could it be, that you are accessing two different
> Hadoop file systems (two different schemas) or even the same one from two
> different users (encoded in the file system URI) within the same Flink
> JobMaster?
>
> If so, the answer might be this possible resource leak in Flink:
> https://issues.apache.org/jira/browse/FLINK-9626
>
> Piotrek
>
> On 20 Jun 2018, at 13:50, Piotr Nowojski  wrote:
>
> Hi,
>
> I was looking into this more, and I have a couple of suspicions, but it’s
> still hard to tell which is correct. Could you, for example, place a
> breakpoint (or add code there to print a stack trace) in
> org.apache.log4j.helpers.AppenderAttachableImpl#addAppender
> and check who is calling it? It seems like this method is
> responsible for the growing number of ConsoleAppenders consuming memory.
>
> Piotrek
>
> On 20 Jun 2018, at 09:20, Fabian Wollert  wrote:
>
> Hi Piotr, thx for the hints. I checked the logs of the stack where the
> previous heap dump was from; there are no error messages from the
> BlobServer, it seems to work properly.
>
> But I found another issue in my setup: I had not set up logging
> properly, so everything was logged via the default console appender. I
> changed this now to:
>
> log4j.rootLogger=INFO, FILE
> log4j.logger.akka=INFO, FILE
> log4j.logger.org.apache.kafka=INFO, FILE
> log4j.logger.org.apache.hadoop=INFO, FILE
> log4j.logger.org.apache.zookeeper=INFO, FILE
>
> # Log all info in the given file
> log4j.appender.FILE=org.apache.log4j.RollingFileAppender
> log4j.appender.FILE.File=/opt/flink/log/flink.log
> log4j.appender.FILE.MaxFileSize=100MB
> log4j.appender.FILE.MaxBackupIndex=2
> log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
> log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %c:%L - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, FILE
>
> Though I have this setup now, I still see memory increases, and this time
> it again seems like my first suspicion is valid:
>
> 
>
>
> 
>
> What I'm mostly wondering now: why is a ConsoleAppender still being used
> although I defined a RollingFileAppender?
>
> Sorry for the back and forth between different parts of the code. But your
> help is highly appreciated!
>
> Cheers
>
> --
>
>
> *Fabian Wollert*
> *Zalando SE*
>
> E-Mail: fabian.woll...@zalando.de
>
>
> Am Di., 19. Juni 2018 um 11:55 Uhr schrieb Piotr Nowojski <
> pi...@data-artisans.com>:
>
>> Hi,
>>
>> Can you search the logs/std err/std output for log entries like:
>>
>> log.warn("Failed to locally delete blob “ …) ?
>>
>> I see in the code that if file deletion fails for whatever reason,
>> TransientBlobCleanupTask can loop indefinitely, trying to remove it over and
>> over again. That might be OK; however, it does so without any back-off
>> time, as fast as possible.
>>
>> To confirm this, could you take a couple of thread dumps and check whether
>> some thread is spinning
>> in org.apache.flink.runtime.blob.TransientBlobCleanupTask#run ?
>>
>> If that’s indeed the case, the question would be why file deletion fails.
>>
>> Piotrek
>>
>> On 18 Jun 2018, at 15:48, Fabian Wollert  wrote:
>>
>> Hi Piotrek, thx a lot for your answer and sorry for the late response. I
>> was running some more tests, but I still got the same problem. I had already
>> been analyzing a heap dump with VisualVM, and that's how I got the impression
>> that it was some S3 logging, but it seems I was wrong. On the
>> newer tests, the heap dump says the following (this time I used Eclipse
>> MemoryAnalyzer):
>>
>> 
>> 
>> 
>> Are you aware of problems with the BlobServer not cleaning up properly? I
>> also tried using a bigger instance, but it never stabilizes; it just
>> keeps increasing (I already gave it 10 GB+ of heap) ...
>>
>> Cheers
>>
>> --
>>
>>
>> *Fabian Wollert*
>> *Zalando SE*
>>
>> E-Mail: f

Heap Problem with Checkpoints

2018-06-08 Thread Fabian Wollert
Hi, in this email thread here, I tried to set up S3 as a filesystem backend for
checkpoints. Now everything is working (Flink v1.5.0), but the JobMaster is
accumulating heap space, eventually killing itself with a heap-space OOM after
several hours. If I don't enable checkpointing, then everything is fine. I'm
using the shaded Flink S3 libs (tried both the Hadoop and the Presto lib, no
difference in this regard) from the tutorial. My checkpoint settings are
these (job level):

env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Another clue why I suspect the S3 checkpointing is that the heap dump
contains a lot of char[] objects with some logs about S3 operations.
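
(For reference, one way to grab such a dump on demand, independent of how these
particular dumps were taken, is the HotSpot diagnostic MBean; a minimal sketch,
to be called from a method that may throw IOException:)

import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

HotSpotDiagnosticMXBean mbean = ManagementFactory.newPlatformMXBeanProxy(
    ManagementFactory.getPlatformMBeanServer(),
    "com.sun.management:type=HotSpotDiagnostic",
    HotSpotDiagnosticMXBean.class);
// second argument = true dumps only live (reachable) objects
mbean.dumpHeap("/tmp/jobmanager-heap.hprof", true);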

Does anyone have an idea where to look further on this?

Cheers

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de

Tamara-Danz-Straße 1
10243 Berlin
Fax: +49 (0)30 2759 46 93
E-mail: legalnot...@zalando.co.uk
Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
2000889349

Management Board:
Robert Gentz, David Schneider, Rubin Ritter

Chairman of the Supervisory Board:
Lothar Lanz

Person responsible for providing the contents of Zalando SE acc. to Art. 55
RStV [Interstate Broadcasting Agreement]: Rubin Ritter
Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
VAT registration number: DE 260543043


Re: Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-06-01 Thread Fabian Wollert
I solved it by myself, with the help of some debugging. I used
s3:///mybucket/ but it needs to be s3://mybucket/some_folder ... two slashes,
and a folder also needs to be specified ...
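
In other words, the working configuration looks roughly like this (the folder
name is just an example):

state.backend=filesystem
state.backend.fs.checkpointdir=s3://mybucket/flink-checkpoints
state.checkpoints.dir=s3://mybucket/flink-checkpoints
state.checkpoints.num-retained=3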
--


*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>



Am Do., 31. Mai 2018 um 09:31 Uhr schrieb Fabian Wollert :

> I'm running it in Docker on EC2, can't use EMR ... yes, I followed those
> instructions.
>
> Cheers
>
> --
>
>
> *Fabian Wollert*
> *Zalando SE*
>
> E-Mail: fabian.woll...@zalando.de
>
>
> Am Do., 31. Mai 2018 um 03:07 Uhr schrieb Bowen Li :
>
>> Did you run Flink on AWS EMR or somewhere else? Have you read and
>> followed instructions on
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#amazon-web-services-aws
>> ?
>>
>>
>>
>> On Wed, May 30, 2018 at 7:08 AM, Fabian Wollert 
>> wrote:
>>
>>> Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a
>>> filesystem backend. I configured the following:
>>>
>>> state.backend=filesystem
>>> state.backend.fs.checkpointdir=s3:///mybucket/
>>> state.checkpoints.dir=s3:///mybucket/
>>> state.checkpoints.num-retained=3
>>>
>>> I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.
>>>
>>> I get now though the following error message:
>>>
>>> Caused by: java.lang.NullPointerException: null uri host.
>>> at java.util.Objects.requireNonNull(Objects.java:228)
>>> at
>>> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
>>> at
>>> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
>>> at
>>> org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:133)
>>>
>>> I tried to dig deeper into the source code, but struggled to find
>>>
>>>- what is meant with this URI
>>>- where to configure it
>>>
>>> Can anybody give some advice how to set up the S3 Backend with the new
>>> shaded lib jar?
>>>
>>> Thanks in advance
>>> --
>>>
>>>
>>> *Fabian Wollert*
>>> *Zalando SE*
>>>
>>> E-Mail: fabian.woll...@zalando.de
>>>
>>> Tamara-Danz-Straße 1
>>> 10243 Berlin
>>> Fax: +49 (0)30 2759 46 93
>>> E-mail: legalnot...@zalando.co.uk
>>> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
>>> 2000889349
>>>
>>> Management Board:
>>> Robert Gentz, David Schneider, Rubin Ritter
>>>
>>> Chairman of the Supervisory Board:
>>> Lothar Lanz
>>>
>>> Person responsible for providing the contents of Zalando SE acc. to Art.
>>> 55 RStV [Interstate Broadcasting Agreement]: Rubin Ritter
>>> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
>>> VAT registration number: DE 260543043
>>>
>>
>>


Re: Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-05-31 Thread Fabian Wollert
I'm running it in Docker on EC2, can't use EMR ... yes, I followed those
instructions.

Cheers

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de


Am Do., 31. Mai 2018 um 03:07 Uhr schrieb Bowen Li :

> Did you run Flink on AWS EMR or somewhere else? Have you read and followed
> instructions on
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#amazon-web-services-aws
> ?
>
>
>
> On Wed, May 30, 2018 at 7:08 AM, Fabian Wollert  wrote:
>
>> Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a
>> filesystem backend. I configured the following:
>>
>> state.backend=filesystem
>> state.backend.fs.checkpointdir=s3:///mybucket/
>> state.checkpoints.dir=s3:///mybucket/
>> state.checkpoints.num-retained=3
>>
>> I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.
>>
>> I get now though the following error message:
>>
>> Caused by: java.lang.NullPointerException: null uri host.
>> at java.util.Objects.requireNonNull(Objects.java:228)
>> at
>> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
>> at
>> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
>> at
>> org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:133)
>>
>> I tried to dig deeper into the source code, but struggled to find
>>
>>- what is meant with this URI
>>- where to configure it
>>
>> Can anybody give some advice how to set up the S3 Backend with the new
>> shaded lib jar?
>>
>> Thanks in advance
>> --
>>
>>
>> *Fabian Wollert*
>> *Zalando SE*
>>
>> E-Mail: fabian.woll...@zalando.de
>>
>> Tamara-Danz-Straße 1
>> 10243 Berlin
>> Fax: +49 (0)30 2759 46 93
>> E-mail: legalnot...@zalando.co.uk
>> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
>> 2000889349
>>
>> Management Board:
>> Robert Gentz, David Schneider, Rubin Ritter
>>
>> Chairman of the Supervisory Board:
>> Lothar Lanz
>>
>> Person responsible for providing the contents of Zalando SE acc. to Art.
>> 55 RStV [Interstate Broadcasting Agreement]: Rubin Ritter
>> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
>> VAT registration number: DE 260543043
>>
>
>


Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-05-30 Thread Fabian Wollert
Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a filesystem
backend. I configured the following:

state.backend=filesystem
state.backend.fs.checkpointdir=s3:///mybucket/
state.checkpoints.dir=s3:///mybucket/
state.checkpoints.num-retained=3

I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.

However, I now get the following error message:

Caused by: java.lang.NullPointerException: null uri host.
at java.util.Objects.requireNonNull(Objects.java:228)
at
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
at
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
at
org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:133)

I tried to dig deeper into the source code, but struggled to find

   - what is meant by this URI
   - where to configure it

Can anybody give some advice on how to set up the S3 backend with the new
shaded lib jar?

Thanks in advance
--


*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de

Tamara-Danz-Straße 1
10243 Berlin
Fax: +49 (0)30 2759 46 93
E-mail: legalnot...@zalando.co.uk
Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
2000889349

Management Board:
Robert Gentz, David Schneider, Rubin Ritter

Chairman of the Supervisory Board:
Lothar Lanz

Person responsible for providing the contents of Zalando SE acc. to Art. 55
RStV [Interstate Broadcasting Agreement]: Rubin Ritter
Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
VAT registration number: DE 260543043


Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-17 Thread Fabian Wollert
TL;DR: remove all Lucene and Elasticsearch libs from your Flink env and just
use Maven to manage dependencies when working with the Flink Elasticsearch
connector.

So first I just deleted the libs in the folder to see if it would work, but it
did not. Then we thought that maybe Flink had already loaded the libs at
startup, so I packaged our Flink appliance again, without the old Lucene lib
that was still being loaded, redeployed, and et voilà, it worked!
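
Concretely, the connector now only comes in through the job jar's Maven
dependencies, roughly like this (coordinates from memory for the Flink 1.3 /
ES 2.x connector, so double-check them against the docs):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.3.0</version>
</dependency>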

thanks guys for the investigation help!

Cheers


--

*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>

2017-07-17 9:58 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:

> Hi,
>
> I would also recommend checking the `lib/` folder of your Flink
> installation to see if there are any dangling old-version jars that you
> added there.
> I did a quick dependency check on the Elasticsearch 2 connector, it is
> correctly pulling in Lucene 5.5.0 only, so this dependency should not pop
> up given that the user code is packaged properly.
> As of now, I would guess that it is some dependency conflict caused by
> either the reasons mentioned above, or some other dependency in the user
> jar is pulling in a conflicting Lucene version.
>
> Of course, if you doubt otherwise and that isn’t the case, let us know the
> result of your checks so we can investigate further! Thanks.
>
> Cheers,
> Gordon
>
> On 17 July 2017 at 3:38:17 PM, Fabian Wollert (fabian.woll...@zalando.de)
> wrote:
>
> 1.3.0, but I only need the ES 2.x connector working right now, since
> that's the Elasticsearch version we're using. Another option would be to
> upgrade to ES 5 (at least on dev) to see if it works as well, but that
> doesn't sound like fixing the problem to me :-D
>
> Cheers
> Fabian
>
>
> --
>
> *Fabian Wollert Zalando SE*
>
> E-Mail: fabian.woll...@zalando.de
> Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>
>
> 2017-07-16 15:47 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi,
>>
>> There was also a problem in releasing the ES 5 connector with Flink
>> 1.3.0. You only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?
>>
>> Best,
>> Aljoscha
>>
>> On 16. Jul 2017, at 13:42, Fabian Wollert <fabian.woll...@zalando.de>
>> wrote:
>>
>> Hi Aljoscha,
>>
>> we are running Flink in standalone mode, inside Docker in AWS. I will
>> check the dependencies tomorrow, although I'm wondering: I'm running Flink
>> 1.3 everywhere and the appropriate ES connector, which was only released with
>> 1.3, so it's weird where this dependency mix-up comes from ... let's see ...
>>
>> Cheers
>> Fabian
>>
>>
>> --
>>
>> *Fabian Wollert Zalando SE*
>>
>> E-Mail: fabian.woll...@zalando.de
>> Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>
>>
>> 2017-07-14 11:15 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>
>>> This kind of error almost always hints at a dependency clash, i.e. there
>>> is some version of this code in the class path that clashed with the
>>> version that the Flink program uses. That’s why it works in local mode,
>>> where there are probably not many other dependencies and not in cluster
>>> mode.
>>>
>>> How are you running it on the cluster? Standalone, YARN?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 13. Jul 2017, at 13:56, Fabian Wollert <fabian.woll...@zalando.de>
>>> wrote:
>>>
>>> Hi Timo, Hi Gordon,
>>>
>>> thx for the reply! I checked the connection from both clusters to each
>>> other, and i can telnet to the 9300 port of flink, so i think the
>>> connection is not an issue here.
>>>
>>> We are currently using in our live env a custom elasticsearch connector,
>>> which used some extra lib's deployed on the cluster. i found one lucene lib
>>> and deleted it (since all dependencies should be in the flink job jar), but
>>> that unfortunately did not help neither ...
>>>
>>> Cheers
>>> Fabian
>>>
>>>
>>> --
>>>
>>> *Fabian Wollert Data Engineering*
>>> *Technology*
>>>
>>> E-Mail: fabian.woll...@zalando.de
>>> Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>
>>>
>>> 2017-07-13 13:46 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>
>>>> Hi Fabian,
>>>>
>>>> I loop in Gordon. Mayb

Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-17 Thread Fabian Wollert
1.3.0, but I only need the ES 2.x connector working right now, since that's
the Elasticsearch version we're using. Another option would be to upgrade
to ES 5 (at least on dev) to see if it works as well, but that doesn't sound
like fixing the problem to me :-D

Cheers
Fabian


--

*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>

2017-07-16 15:47 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
>
> There was also a problem in releasing the ES 5 connector with Flink 1.3.0.
> You only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?
>
> Best,
> Aljoscha
>
> On 16. Jul 2017, at 13:42, Fabian Wollert <fabian.woll...@zalando.de>
> wrote:
>
> Hi Aljoscha,
>
> we are running Flink in standalone mode, inside Docker in AWS. I will
> check the dependencies tomorrow, although I'm wondering: I'm running Flink
> 1.3 everywhere and the appropriate ES connector, which was only released with
> 1.3, so it's weird where this dependency mix-up comes from ... let's see ...
>
> Cheers
> Fabian
>
>
> --
>
> *Fabian Wollert*
> *Zalando SE*
>
> E-Mail: fabian.woll...@zalando.de
> Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>
>
> 2017-07-14 11:15 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> This kind of error almost always hints at a dependency clash, i.e. there
>> is some version of this code on the class path that clashes with the
>> version that the Flink program uses. That’s why it works in local mode,
>> where there are probably not many other dependencies, but not in cluster
>> mode.
>>
>> How are you running it on the cluster? Standalone, YARN?
>>
>> Best,
>> Aljoscha
>>
>> On 13. Jul 2017, at 13:56, Fabian Wollert <fabian.woll...@zalando.de>
>> wrote:
>>
>> Hi Timo, Hi Gordon,
>>
>> thx for the reply! I checked the connection from both clusters to each
>> other, and i can telnet to the 9300 port of flink, so i think the
>> connection is not an issue here.
>>
>> We are currently using in our live env a custom elasticsearch connector,
>> which used some extra lib's deployed on the cluster. i found one lucene lib
>> and deleted it (since all dependencies should be in the flink job jar), but
>> that unfortunately did not help neither ...
>>
>> Cheers
>> Fabian
>>
>>
>> --
>>
>> *Fabian Wollert*
>> *Data Engineering*
>> *Technology*
>>
>> E-Mail: fabian.woll...@zalando.de
>> Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>
>>
>> 2017-07-13 13:46 GMT+02:00 Timo Walther <twal...@apache.org>:
>>
>>> Hi Fabian,
>>>
>>> I loop in Gordon. Maybe he knows whats happening here.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 13.07.17 um 13:26 schrieb Fabian Wollert:
>>>
>>> Hi everyone,
>>>
>>> I'm trying to make use of the new Elasticsearch Connector
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html>.
>>> I got a version running locally (with ssh tunnels to my Elasticsearch
>>> cluster in AWS) in my IDE, I see the data in Elasticsearch written
>>> perfectly, as I want it. As soon as I try to run this on our dev cluster
>>> (Flink 1.3.0, running in the same VPC like ) though, i get the following
>>> error message (in the sink):
>>>
>>> java.lang.NoSuchFieldError: LUCENE_5_5_0
>>> at org.elasticsearch.Version.<clinit>(Version.java:295)
>>> at org.elasticsearch.client.transport.TransportClient$Builder.b
>>> uild(TransportClient.java:129)
>>> at org.apache.flink.streaming.connectors.elasticsearch2.Elastic
>>> search2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
>>> at org.apache.flink.streaming.connectors.elasticsearch.Elastics
>>> earchSinkBase.open(ElasticsearchSinkBase.java:272)
>>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>>> nFunction(FunctionUtils.java:36)
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>>> erator.open(AbstractUdfStreamOperator.java:111)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>>> perators(StreamTask.java:375)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
>>> treamTask.java:252)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>

Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-16 Thread Fabian Wollert
Hi Aljoscha,

we are running Flink in standalone mode, inside Docker in AWS. I will
check the dependencies tomorrow, although I'm wondering: I'm running Flink
1.3 everywhere and the appropriate ES connector, which was only released with
1.3, so it's weird where this dependency mix-up comes from ... let's see ...

Cheers
Fabian


--

*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>

2017-07-14 11:15 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

> This kind of error almost always hints at a dependency clash, i.e. there
> is some version of this code on the class path that clashes with the
> version that the Flink program uses. That’s why it works in local mode,
> where there are probably not many other dependencies, but not in cluster
> mode.
>
> How are you running it on the cluster? Standalone, YARN?
>
> Best,
> Aljoscha
>
> On 13. Jul 2017, at 13:56, Fabian Wollert <fabian.woll...@zalando.de>
> wrote:
>
> Hi Timo, Hi Gordon,
>
> thx for the reply! I checked the connection from both clusters to each
> other, and I can telnet to the 9300 port of Flink, so I think the
> connection is not an issue here.
>
> We are currently using a custom Elasticsearch connector in our live env,
> which used some extra libs deployed on the cluster. I found one Lucene lib
> and deleted it (since all dependencies should be in the Flink job jar), but
> that unfortunately did not help either ...
>
> Cheers
> Fabian
>
>
> --
>
> *Fabian Wollert*
> *Data Engineering*
> *Technology*
>
> E-Mail: fabian.woll...@zalando.de
> Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>
>
> 2017-07-13 13:46 GMT+02:00 Timo Walther <twal...@apache.org>:
>
>> Hi Fabian,
>>
>> I loop in Gordon. Maybe he knows whats happening here.
>>
>> Regards,
>> Timo
>>
>>
>> Am 13.07.17 um 13:26 schrieb Fabian Wollert:
>>
>> Hi everyone,
>>
>> I'm trying to make use of the new Elasticsearch Connector
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html>.
>> I got a version running locally (with ssh tunnels to my Elasticsearch
>> cluster in AWS) in my IDE, I see the data in Elasticsearch written
>> perfectly, as I want it. As soon as I try to run this on our dev cluster
>> (Flink 1.3.0, running in the same VPC like ) though, i get the following
>> error message (in the sink):
>>
>> java.lang.NoSuchFieldError: LUCENE_5_5_0
>> at org.elasticsearch.Version.<clinit>(Version.java:295)
>> at org.elasticsearch.client.transport.TransportClient$Builder.
>> build(TransportClient.java:129)
>> at org.apache.flink.streaming.connectors.elasticsearch2.Elastic
>> search2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
>> at org.apache.flink.streaming.connectors.elasticsearch.Elastics
>> earchSinkBase.open(ElasticsearchSinkBase.java:272)
>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:111)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:375)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:252)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> I first thought that this has something to do with mismatched versions,
>> but it happens to me with Elasticsearch 2.2.2 (bundled with Lucene 5.4.1)
>> and Elasticsearch 2.3 (bundled with Lucene 5.5.0).
>>
>> Can someone point to what exact version conflict is happening here (or
>> where to investigate further)? Currently my set up looks like everything is
>> actually running with Lucene 5.5.0, so I'm wondering where that error
>> message is exactly coming from. And also why it is running locally, but not
>> in the cluster. I'm still investigating if this is a general connection
>> issue from the Flink cluster to the ES cluster, but that would be
>> surprising, and also that error message would be then misleading 
>>
>> Cheers
>> Fabian
>>
>> --
>> *Fabian Wollert*
>> *Senior Data Engineer*
>>
>> *POSTAL ADDRESS*
>> *Zalando SE*
>> *11501 Berlin*
>>
>> *OFFICE*
>> *Zalando SE*
>> *Charlottenstraße 4*
>> *10969 Berlin*
>> *Germany*
>>
>> *Email: fabian.woll...@zalando.de <fabian.woll...@zalando.de>*
>> *Web: corporate.zalando.com <http://corporate.zalando.com/>*
>> *Jobs: jobs.zalando.de <http://jobs.zalando.de/>*
>>
>> *Zalando SE, Tamara-Danz-Straße 1, 10243 Berlin*
>> *Company registration: Amtsgericht Charlottenburg, HRB 158855 B*
>> *VAT registration number: DE 260543043*
>> *Management Board: Robert Gentz, David Schneider, Rubin Ritter*
>> *Chairperson of the Supervisory Board: Lothar Lanz*
>> *Registered office: Berlin*
>>
>>
>>
>
>


Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-13 Thread Fabian Wollert
Hi Timo, Hi Gordon,

thx for the reply! I checked the connection from both clusters to each
other, and I can telnet to the 9300 port of Flink, so I think the
connection is not an issue here.

We are currently using a custom Elasticsearch connector in our live env,
which used some extra libs deployed on the cluster. I found one Lucene lib
and deleted it (since all dependencies should be in the Flink job jar), but
that unfortunately did not help either ...
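
A small check I still want to run on the cluster to find the offending jar (a
sketch; it could be dropped into any job's main method or a tiny test program):

// print which jar/location Lucene's Version class is actually loaded from at runtime
java.security.CodeSource src =
    org.apache.lucene.util.Version.class.getProtectionDomain().getCodeSource();
System.out.println(src == null
    ? "unknown (bootstrap classpath)"
    : src.getLocation().toString());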

Cheers
Fabian


--

*Fabian Wollert*
*Data Engineering*
*Technology*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP <http://zmap.zalando.net/?q=fabian.woll...@zalando.de>

2017-07-13 13:46 GMT+02:00 Timo Walther <twal...@apache.org>:

> Hi Fabian,
>
> I'm looping in Gordon. Maybe he knows what's happening here.
>
> Regards,
> Timo
>
>
> Am 13.07.17 um 13:26 schrieb Fabian Wollert:
>
> Hi everyone,
>
> I'm trying to make use of the new Elasticsearch Connector
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html>.
> I got a version running locally (with ssh tunnels to my Elasticsearch
> cluster in AWS) in my IDE, I see the data in Elasticsearch written
> perfectly, as I want it. As soon as I try to run this on our dev cluster
> (Flink 1.3.0, running in the same VPC like ) though, i get the following
> error message (in the sink):
>
> java.lang.NoSuchFieldError: LUCENE_5_5_0
> at org.elasticsearch.Version.<clinit>(Version.java:295)
> at org.elasticsearch.client.transport.TransportClient$
> Builder.build(TransportClient.java:129)
> at org.apache.flink.streaming.connectors.elasticsearch2.
> Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.
> java:65)
> at org.apache.flink.streaming.connectors.elasticsearch.
> ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:111)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:375)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
> I first thought that this has something to do with mismatched versions,
> but it happens to me with Elasticsearch 2.2.2 (bundled with Lucene 5.4.1)
> and Elasticsearch 2.3 (bundled with Lucene 5.5.0).
>
> Can someone point to what exact version conflict is happening here (or
> where to investigate further)? Currently my set up looks like everything is
> actually running with Lucene 5.5.0, so I'm wondering where that error
> message is exactly coming from. And also why it is running locally, but not
> in the cluster. I'm still investigating if this is a general connection
> issue from the Flink cluster to the ES cluster, but that would be
> surprising, and also that error message would be then misleading 
>
> Cheers
> Fabian
>
> --
> *Fabian Wollert*
> *Senior Data Engineer*
>
> *POSTAL ADDRESS*
> *Zalando SE*
> *11501 Berlin*
>
> *OFFICE*
> *Zalando SE*
> *Charlottenstraße 4*
> *10969 Berlin*
> *Germany*
>
> *Email: fabian.woll...@zalando.de <fabian.woll...@zalando.de>*
> *Web: corporate.zalando.com <http://corporate.zalando.com>*
> *Jobs: jobs.zalando.de <http://jobs.zalando.de>*
>
> *Zalando SE, Tamara-Danz-Straße 1, 10243 Berlin*
> *Company registration: Amtsgericht Charlottenburg, HRB 158855 B*
> *VAT registration number: DE 260543043*
> *Management Board: Robert Gentz, David Schneider, Rubin Ritter*
> *Chairperson of the Supervisory Board: Lothar Lanz*
> *Registered office: Berlin*
>
>
>