Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-17 Thread Tzu-Li (Gordon) Tai
With task chaining as you're saying, could you help clarify how it works 
please?
Operators can be chained to be executed by a single task thread. See [1] for 
more details on that.

Basically, when two operators are chained together, the output of the first 
operator is immediately passed to the processElement of the next operator; 
it’s therefore just a consecutive invocation of processElement on the chained 
operators. There will be no thread-to-thread handover or buffering.

For example, a 
byte[] record can return from our parser a List of 10 SuccessEvents and 1 
ErrorEvent; we want to publish each Event immediately.
In that case, I would suggest using flatMap here, followed by chained splits 
and then sinks.

Using flatMap, you can collect elements as you iterate through the list 
(i.e. calling `collector.collect(...)` for each element). If the sinks are properly chained (which 
should be the case if there is no keyBy before the sink and you haven’t 
explicitly configured otherwise [2]), then for each .collect(...) the sink 
write will be invoked as part of the chain.

Effectively, this would then be writing to Kafka / Cassandra for every element 
as you iterate through that list (happening in the same thread since everything 
is chained), and matches what you have in mind.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
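
To make this concrete, here is a rough, self-contained sketch of that flatMap -> split -> sinks shape. The Event classes, the parse() method and the print() sinks below are only placeholders standing in for your own parser and Kafka / Cassandra sinks:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapSplitSketch {

    // Hypothetical event model standing in for the user's own classes.
    public static class Event implements java.io.Serializable {
        public String payload = "";
        public Event() {}
        public Event(String payload) { this.payload = payload; }
        @Override
        public String toString() { return getClass().getSimpleName() + "(" + payload + ")"; }
    }
    public static class SuccessEvent extends Event {
        public SuccessEvent() {}
        public SuccessEvent(String payload) { super(payload); }
    }
    public static class ErrorEvent extends Event {
        public ErrorEvent() {}
        public ErrorEvent(String payload) { super(payload); }
    }

    // Stand-in for the external parser: one byte[] record yields several events.
    public static List<Event> parse(byte[] record) {
        return Arrays.asList(new SuccessEvent(new String(record)), new ErrorEvent("bad field"));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In the real job this would be a Kafka consumer emitting byte[] records.
        DataStream<byte[]> rawRecords = env.fromElements("a".getBytes(), "b".getBytes());

        // flatMap emits each parsed event as soon as it is produced, so the chained
        // downstream operators (split + sinks) see every element immediately.
        DataStream<Event> events = rawRecords.flatMap(new FlatMapFunction<byte[], Event>() {
            @Override
            public void flatMap(byte[] record, Collector<Event> out) {
                for (Event event : parse(record)) {
                    out.collect(event);
                }
            }
        });

        // Route success and error events to different (chained) sinks.
        SplitStream<Event> split = events.split(new OutputSelector<Event>() {
            @Override
            public Iterable<String> select(Event event) {
                return Collections.singletonList(event instanceof ErrorEvent ? "error" : "success");
            }
        });

        split.select("success").print();   // e.g. a Kafka producer sink in the real job
        split.select("error").print();     // e.g. a Cassandra sink in the real job

        env.execute("flatmap-split-sketch");
    }
}

Since there is no keyBy between the flatMap, the split and the sinks, these operators should end up in one chain, so each collect(...) drives the sink writes directly.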

On 17 July 2017 at 2:06:52 PM, earellano (eric.arell...@ge.com) wrote:

Hi,  

Tzu-Li (Gordon) Tai wrote  
> These seem odd. Are your events intended to be a list? If not, this  
> should be a `DataStream<Event>`.  
>  
> From the code snippet you’ve attached in the first post, it seems like  
> you’ve initialized your source incorrectly.  
>  
> `env.fromElements(List<...>)` will take the whole list as a single event,  
> thus your source is only emitting a single list as a record.  

Ah sorry for the confusion. So the original code snippet isn't our actual  
code - it's a simplified and generified version so that it would be easy to  
reproduce the Null Pointer Exception without having to show our whole code  
base.  

To clarify, our input actually uses a Kafka Consumer that reads a byte[],  
which is then passed to our external library parser, which takes a byte[] and  
converts it into a List<Event>. This is why we have to use  
DataStream<List<Event>>, rather than just DataStream<Event>. It's a  
requirement from the parser we have to use, because each byte[] record  
can create SuccessEvent(s) and/or ErrorEvent(s).  

Our motivation for using the above map & for-loop with conditional output  
logic was that we have to work with this whole List<Event> and not just  
individual Events, but we don't want to wait for the whole list to be processed  
before the event at the beginning of the list is output. For example, a  
byte[] record can return from our parser a List of 10 SuccessEvents and 1  
ErrorEvent; we want to publish each Event immediately. Low latency is  
extremely important to us.  

--  

With task chaining as you're saying, could you help clarify how it works  
please? With each record of type List<Event> and calling the Split Operator  
followed by the sink operators, does that whole record/list have to be split  
before it can then go on to the sink? Or does task chaining mean it  
immediately gets output to the sink?  


Thanks so much for all this help by the way!  






Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-17 Thread Fabian Wollert
1.3.0, but I only need the ES 2.x connector working right now, since that's
the Elasticsearch version we're using. Another option would be to upgrade
to ES 5 (at least on dev) to see if it's working as well, but that doesn't
sound like fixing the problem to me :-D

Cheers
Fabian


--

*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP 

2017-07-16 15:47 GMT+02:00 Aljoscha Krettek :

> Hi,
>
> There was also a problem in releasing the ES 5 connector with Flink 1.3.0.
> You only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?
>
> Best,
> Aljoscha
>
> On 16. Jul 2017, at 13:42, Fabian Wollert 
> wrote:
>
> Hi Aljoscha,
>
> we are running Flink in standalone mode, inside Docker in AWS. I will
> check the dependencies tomorrow, although I'm wondering: I'm running Flink
> 1.3 everywhere and the appropriate ES connector which was only released with
> 1.3, so it's weird where this dependency mix-up comes from ... let's see ...
>
> Cheers
> Fabian
>
>
> --
>
> *Fabian Wollert*
> *Zalando SE*
>
> E-Mail: fabian.woll...@zalando.de
> Location: ZMAP 
>
> 2017-07-14 11:15 GMT+02:00 Aljoscha Krettek :
>
>> This kind of error almost always hints at a dependency clash, i.e. there
>> is some version of this code in the class path that clashed with the
>> version that the Flink program uses. That’s why it works in local mode,
>> where there are probably not many other dependencies and not in cluster
>> mode.
>>
>> How are you running it on the cluster? Standalone, YARN?
>>
>> Best,
>> Aljoscha
>>
>> On 13. Jul 2017, at 13:56, Fabian Wollert 
>> wrote:
>>
>> Hi Timo, Hi Gordon,
>>
>> thx for the reply! I checked the connection from both clusters to each
>> other, and I can telnet to the 9300 port of flink, so I think the
>> connection is not an issue here.
>>
>> We are currently using in our live env a custom elasticsearch connector,
>> which used some extra libs deployed on the cluster. I found one lucene lib
>> and deleted it (since all dependencies should be in the flink job jar), but
>> that unfortunately did not help either ...
>>
>> Cheers
>> Fabian
>>
>>
>> --
>>
>> *Fabian Wollert*
>> *Data Engineering*
>> *Technology*
>>
>> E-Mail: fabian.woll...@zalando.de
>> Location: ZMAP 
>>
>> 2017-07-13 13:46 GMT+02:00 Timo Walther :
>>
>>> Hi Fabian,
>>>
>>> I loop in Gordon. Maybe he knows what's happening here.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 13.07.17 um 13:26 schrieb Fabian Wollert:
>>>
>>> Hi everyone,
>>>
>>> I'm trying to make use of the new Elasticsearch Connector.
>>> I got a version running locally (with ssh tunnels to my Elasticsearch
>>> cluster in AWS) in my IDE, and I see the data in Elasticsearch written
>>> perfectly, as I want it. As soon as I try to run this on our dev cluster
>>> (Flink 1.3.0, running in the same VPC like ) though, I get the following
>>> error message (in the sink):
>>>
>>> java.lang.NoSuchFieldError: LUCENE_5_5_0
>>> at org.elasticsearch.Version.(Version.java:295)
>>> at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
>>> at org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
>>> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> I first thought that this has something to do with mismatched versions,
>>> but it happens to me with Elasticsearch 2.2.2 (bundled with Lucene 5.4.1)
>>> and Elasticsearch 2.3 (bundled with Lucene 5.5.0).
>>>
>>> Can someone point to what exact version conflict is happening here (or
>>> where to investigate further)? Currently my setup looks like everything is
>>> actually running with Lucene 5.5.0, so I'm wondering where exactly that error
>>> message is coming from. And also why it is running locally, but not
>>> in the cluster. I'm still investigating whether this is a general connection
>>> issue from the Flink cluster to the ES cluster, but that would be
>>> surprising, and that error message would then be misleading.
>>>
>>> Cheers
>>> Fabian
>>>
>>> --
>>> *Fabian Wollert*
>>> *Senior Data Engineer*
>>>
>>> *POSTAL ADDRESS*
>>> *Zalando SE*
>

Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-17 Thread Tzu-Li (Gordon) Tai
Hi,

I would also recommend checking the `lib/` folder of your Flink installation to 
see if there are any dangling old-version jars that you added there.
I did a quick dependency check on the Elasticsearch 2 connector; it is 
correctly pulling in Lucene 5.5.0 only, so this dependency should not pop up 
given that the user code is packaged properly.
As of now, I would guess that it is a dependency conflict caused either by 
the reasons mentioned above or by some other dependency in the user jar 
pulling in a conflicting Lucene version.

Of course, if you doubt otherwise and that isn’t the case, let us know the 
result of your checks so we can investigate further! Thanks.

Cheers,
Gordon

On 17 July 2017 at 3:38:17 PM, Fabian Wollert (fabian.woll...@zalando.de) wrote:

1.3.0, but i only need the ES 2.X connector working right now, since that's the 
elasticsearch version we're using. another option would be to upgrade to ES 5 
(at elast on dev) to see if its working as well, but that sounds not like 
fixing the problem for me :-D

Cheers
Fabian


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-16 15:47 GMT+02:00 Aljoscha Krettek :
Hi,

There was also a problem in releasing the ES 5 connector with Flink 1.3.0. You 
only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?

Best,
Aljoscha

On 16. Jul 2017, at 13:42, Fabian Wollert  wrote:

Hi Aljoscha,

we are running Flink in Stand alone mode, inside Docker in AWS. I will check 
tomorrow the dependencies, although i'm wondering: I'm running Flink 1.3 
averywhere and the appropiate ES connector which was only released with 1.3, so 
it's weird where this dependency mix up comes from ... let's see ...

Cheers
Fabian


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-14 11:15 GMT+02:00 Aljoscha Krettek :
This kind of error almost always hints at a dependency clash, i.e. there is 
some version of this code in the class path that clashed with the version that 
the Flink program uses. That’s why it works in local mode, where there are 
probably not many other dependencies and not in cluster mode.

How are you running it on the cluster? Standalone, YARN?

Best,
Aljoscha

On 13. Jul 2017, at 13:56, Fabian Wollert  wrote:

Hi Timo, Hi Gordon,

thx for the reply! I checked the connection from both clusters to each other, 
and i can telnet to the 9300 port of flink, so i think the connection is not an 
issue here. 

We are currently using in our live env a custom elasticsearch connector, which 
used some extra lib's deployed on the cluster. i found one lucene lib and 
deleted it (since all dependencies should be in the flink job jar), but that 
unfortunately did not help neither ...

Cheers
Fabian


--
Fabian Wollert
Data Engineering
Technology

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-13 13:46 GMT+02:00 Timo Walther :
Hi Fabian,

I loop in Gordon. Maybe he knows whats happening here.

Regards,
Timo


Am 13.07.17 um 13:26 schrieb Fabian Wollert:
Hi everyone,

I'm trying to make use of the new Elasticsearch Connector. I got a version 
running locally (with ssh tunnels to my Elasticsearch cluster in AWS) in my 
IDE, I see the data in Elasticsearch written perfectly, as I want it. As soon 
as I try to run this on our dev cluster (Flink 1.3.0, running in the same VPC 
like ) though, i get the following error message (in the sink):

java.lang.NoSuchFieldError: LUCENE_5_5_0
at org.elasticsearch.Version.(Version.java:295)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
at 
org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I first thought that this has something to do with mismatched versions, but it 
happens to me with Elasticsearch 2.2.2 (bundled with Lucene 5.4.1) and 
Elasticsearch 2.3 (bundled with Lucene 5.5.0). 

Can someone point to what exact version conflict is happening here (or where to 
investigate further)? Currently my set up looks like everything is actually 
running with Lucene 5.5.0, so I'm wondering where that error message is exactly 
coming from. And also why it is running locally, but not in the cluster. I'm 
still investigating if this is a general connection issue from the Flink 
cluster to the ES cluster, but that would be surpri

Re: Reading static data

2017-07-17 Thread Fabian Hueske
You could either use a broadcast variable [1] or the distributed cache [2].

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#distributed-cache
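
For the batch (DataSet) case, a minimal self-contained sketch of the broadcast-variable approach [1] might look like the following; the data sets and the "lookup" name are only illustrative. The distributed cache [2] is registered similarly via env.registerCachedFile(...) and read back through getRuntimeContext().getDistributedCache().

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastLookupSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Small lookup data that should be available on every worker.
        DataSet<String> lookup = env.fromElements("A", "B", "C");

        DataSet<String> mainInput = env.fromElements("1", "2", "3");

        mainInput.map(new RichMapFunction<String, String>() {
                private List<String> lookupData;

                @Override
                public void open(Configuration parameters) {
                    // The broadcast set is retrieved by name in open().
                    lookupData = getRuntimeContext().getBroadcastVariable("lookup");
                }

                @Override
                public String map(String value) {
                    // Use the lookup data for filtering / augmentation etc.
                    return value + " (lookup size: " + lookupData.size() + ")";
                }
            })
            .withBroadcastSet(lookup, "lookup") // register the lookup DataSet under a name
            .print();
    }
}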

2017-07-14 20:18 GMT+02:00 Mohit Anchlia :

> Is there a way to accomplish this for the batch operations?
>
> On Thu, Jul 13, 2017 at 4:59 AM, Timo Walther  wrote:
>
>> Hi Mohit,
>>
>> do you plan to implement a batch or streaming job? If it is a streaming
>> job: You can use a connected stream (see [1], Slide 34). The static data is
>> one side of the stream that could be updated from time to time and will
>> always be propagated (using a broadcast()) to all workers that do filtering,
>> augmentation etc.
>>
>> [1] http://training.data-artisans.com/dataStream/1-intro.html
>>
>> I hope this helps.
>>
>> Timo
>>
>>
>> Am 13.07.17 um 02:16 schrieb Mohit Anchlia:
>>
>> What is the best way to read a map of lookup data? This lookup data is
>>> like a small, short-lived data set that is available in transformations to do
>>> things like filtering, additional augmentation of data, etc.
>>>
>>
>>
>>
>


Latency Measurement

2017-07-17 Thread Paolo Cristofanelli
Hi,

I would like to understand how to measure the latency of a record.
I have set up a simple project with a Kafka consumer that reads from a
topic and performs a simple map (with a thread sleep inside).

In order to measure the latency of this mapper I have added
env.getConfig().setLatencyTrackingInterval(10);

After that, I was planning to access the latency through the webUI
interface but the related graph does not show any values.
I do not understand why. I was thinking that in the graph I should
observe at least the sleep duration.

I also have another question:

I am using a count window, aggregating every 100 input records and then I
perform a map. I want to see the latency as the difference between the time
at which the output record is emitted and the arrival time of the earliest
input record.

For example, the first value arrives at time x. By x+5 all 100 values
have arrived and the system can aggregate them. Now I perform the map operation
and emit the output record at time x+15.
I would like to obtain 15 as the latency.
Do you have any suggestion on how to proceed?

Thanks for your time,
Paolo Cristofanelli


Re: Latency Measurement

2017-07-17 Thread Chesnay Schepler

Hello,

As for 1), my suspicion is that this is caused by chaining. If the map 
function is chained to the Kafka source then the latency markers are 
always immediately forwarded, regardless of what your map function is doing.
If the map function is indeed chained to the source, could you try again 
after disabling the chain by calling `X.map(...).disableChaining()` and 
report back?
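
For reference, a minimal sketch of such a setup, assuming the Kafka 0.10 consumer; the topic, bootstrap servers and group id below are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LatencyTrackingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setLatencyTrackingInterval(10); // emit latency markers every 10 ms

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "latency-test");            // placeholder

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        input
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    Thread.sleep(100); // simulate slow processing
                    return value;
                }
            })
            .disableChaining() // keep the map out of the source's chain so the hop is measured
            .print();

        env.execute("latency-tracking-sketch");
    }
}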


As for 2), I don't think this is possible right now.

Regards,
Chesnay

On 17.07.2017 12:42, Paolo Cristofanelli wrote:

Hi,

I would like to understand how to measure the latency of a record.
I have set up a simple project with a Kafka consumer that reads from a 
topic and performs a simple map (with a thread sleep inside).


In order to measure the latency of this mapper I have added 
env.getConfig().setLatencyTrackingInterval(10);


After that, I was planning to access the latency through the webUI 
interface but the related graph does not show any values.
I do not understand why. I was thinking that in the graph I should 
observe at least the sleep duration.


I also have another question:

I am using a count window, aggregating every 100 input records and 
then I perform a map. I want to see the latency as the difference 
between the time at which the output record is emitted and the arrival 
time of the earliest input record.


For example, the first value arrives at time x. By x+5 all 100 
values have arrived and the system can aggregate them. Now I perform the 
map operation and emit the output record at time x+15.

I would like to obtain 15 as latency.
Do you have any suggestion on how to proceed?

Thanks for your time,
Paolo Cristofanelli





FileNotFoundException when restoring checkpoint

2017-07-17 Thread Shai Kaplan
Hi.
I'm running Flink 1.3.1 with checkpoints stored in Azure blobs. Incremental 
checkpoints feature is on.
The job is trying to restore a checkpoint and consistently gets:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: 
wasbs://***/5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3
at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem.open(NativeAzureFileSystem.java:1905)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1276)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1458)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1319)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1493)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:965)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)

The name of the missing file is sometimes different, but it's always a missing 
file in checkpoint 37. The last successful checkpoint number was 41, so I'm 
guessing that's the checkpoint it's trying to restore, but because of the 
incremental checkpointing it also needs files from previous checkpoints, which 
are apparently missing. Could this be a problem in the interface with Azure? If 
some files failed to write, why didn't the checkpoint fail?

When I realized nothing is going to change I canceled the job, and started it 
from a savepoint, which was checkpoint number 40. I actually expected it to 
fail, and that I would have to restore it from a savepoint prior to the 
apparently corrupted checkpoint number 37, but it didn't fail. Should I infer 
that savepoints are self-contained and are not incremental?


Re: FileNotFoundException when restoring checkpoint

2017-07-17 Thread Chesnay Schepler

Hello,

If I recall correctly, savepoints are always self-contained even if 
incremental checkpointing is enabled.

However, this doesn't appear to be documented anywhere.

As for the missing file, I'm looping in Stefan, who is more knowledgeable 
about incremental checkpointing (and potential known issues).


Regards,
Chesnay

On 17.07.2017 13:12, Shai Kaplan wrote:


Hi.

I'm running Flink 1.3.1 with checkpoints stored in Azure blobs. 
Incremental checkpoints feature is on.


The job is trying to restore a checkpoint and consistently gets:

java.lang.IllegalStateException: Could not initialize keyed state backend.

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)


at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.FileNotFoundException: 
wasbs://***/5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3


at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem.open(NativeAzureFileSystem.java:1905)


at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)

at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)


at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)


at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)


at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)


at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1276)


at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1458)


at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1319)


at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1493)


at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:965)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)


The name of the missing file is sometimes different, but it's always a 
missing file in checkpoint 37. The last successful checkpoint number 
was 41, so I'm guessing that's the checkpoint it's trying to restore, 
but because of the incremental checkpointing it also needs files from 
previous checkpoints, which are apparently missing. Could this be a 
problem in the interface with Azure? If some files failed to write, 
why didn't the checkpoint fail?


When I realized nothing is going to change I canceled the job, and 
started it from a savepoint, which was checkpoint number 40. I 
actually expected it to fail, and that I would have to restore it from 
a savepoint prior to the apparently corrupted checkpoint number 37, 
but it didn't fail. Should I infer that savepoints are self-contained 
and are not incremental?






Re: FileNotFoundException when restoring checkpoint

2017-07-17 Thread Stefan Richter
Hi,

You assumed correctly that savepoints are always self-contained. Are you using 
externalized checkpoints? There is a known problem there that was fixed in the 
latest master and will go into 1.3.2, but this might be a different problem.

You are also correct that incremental checkpoints can reference files from 
previous checkpoints. Do you ever manually delete any checkpoint directories? 
Because they might still be referenced in other checkpoints.

I would assume that the missing file was written completely, because otherwise 
the checkpoint would already fail. However I am unsure about the exact 
guarantees (e.g. about visibility) in Azure blobs. Can you check if this file 
was ever created and when it starts to disappear? Does the directory of 
checkpoint 37 still exist in the file system?

Best,
Stefan

> Am 17.07.2017 um 13:12 schrieb Shai Kaplan :
> 
> Hi.
> I'm running Flink 1.3.1 with checkpoints stored in Azure blobs. Incremental 
> checkpoints feature is on.
> The job is trying to restore a checkpoint and consistently gets:
>  
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> wasbs://***/5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3
>  
> 
> at 
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.open(NativeAzureFileSystem.java:1905)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
> at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1276)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1458)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1319)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1493)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:965)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>  
> The name of the missing file is sometimes different, but it's always a 
> missing file in checkpoint 37. The last successful checkpoint number was 41, 
> so I'm guessing that's the checkpoint it's trying to restore, but because of 
> the incremental checkpointing it also needs files from previous checkpoints, 
> which are apparently missing. Could this be a problem in the interface with 
> Azure? If some files failed to write, why didn't the checkpoint fail?
>  
> When I realized nothing is going to change I canceled the job, and started it 
> from a savepoint, which was checkpoint number 40. I actually expected it to 
> fail, and that I would have to restore it from a savepoint prior to the 
> apparently corrupted checkpoint number 37, but it didn't fail. Should I infer 
> that savepoints are self-contained and are not incremental?



Re: FileNotFoundException when restoring checkpoint

2017-07-17 Thread Stefan Richter
After giving it a second thought, this problem could be a side effect of the issue 
fixed in https://issues.apache.org/jira/browse/FLINK-6964. If you want, you can check whether 
your problem is fixed in the latest master. This fix will also go into the 
1.3.2 release branch today.

> Am 17.07.2017 um 14:37 schrieb Stefan Richter :
> 
> Hi,
> 
> You assumed correctly that savepoints are always self-contained. Are you 
> using externalized checkpoints? There is a known problem there that was fixed in 
> the latest master and will go into 1.3.2, but this might be a different 
> problem.
> 
> You are also correct that incremental checkpoints can reference files from 
> previous checkpoints. Do you ever manually delete any checkpoint directories? 
> Because they might still be referenced in other checkpoints.
> 
> I would assume that the missing file was written completely, because 
> otherwise the checkpoint would already fail. However I am unsure about the 
> exact guarantees (e.g. about visibility) in Azure blobs. Can you check if 
> this file was ever created and when it starts to disappear? Does the 
> directory of checkpoint 37 still exist in the file system?
> 
> Best,
> Stefan
> 
>> Am 17.07.2017 um 13:12 schrieb Shai Kaplan > >:
>> 
>> Hi.
>> I'm running Flink 1.3.1 with checkpoints stored in Azure blobs. Incremental 
>> checkpoints feature is on.
>> The job is trying to restore a checkpoint and consistently gets:
>>  
>> java.lang.IllegalStateException: Could not initialize keyed state backend.
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: 
>> wasbs://***/5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3
>>  
>> 
>> at 
>> org.apache.hadoop.fs.azure.NativeAzureFileSystem.open(NativeAzureFileSystem.java:1905)
>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>> at 
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
>> at 
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
>> at 
>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>> at 
>> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1276)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1458)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1319)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1493)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:965)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>>  
>> The name of the missing file is sometimes different, but it's always a 
>> missing file in checkpoint 37. The last successful checkpoint number was 41, 
>> so I'm guessing that's the checkpoint it's trying to restore, but because of 
>> the incremental checkpointing it also needs files from previous checkpoints, 
>> which are apparently missing. Could this be a problem in the interface with 
>> Azure? If some files failed to write, why didn't the checkpoint fail?
>>  
>> When I realized nothing is going to change I canceled the job, and started 
>> it from a savepoint, which was checkpoint number 40. I actually expected it 
>> to fail, and that I would have to restore it from a savepoint prior to the 
>> apparently corrupted checkpoint number 37, but it didn't fail. Should I 
>> infer that savepoints are self-contained and are not incremental?
> 



Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-17 Thread Fabian Wollert
TL;DR: remove all Lucene and Elasticsearch libs from your Flink env and just
use Maven to manage dependencies when working with the Flink Elasticsearch
connector.

So at first I deleted the libs in the folder to see if it's
working, but it did not help. Then we thought that maybe Flink already loads the
libs at startup, so I packaged our Flink appliance again, without the old
lucene lib which was still being loaded, and then redeployed, and et voilà, it
worked!

thanks guys for the investigation help!

Cheers


--

*Fabian Wollert*
*Zalando SE*

E-Mail: fabian.woll...@zalando.de
Location: ZMAP 

2017-07-17 9:58 GMT+02:00 Tzu-Li (Gordon) Tai :

> Hi,
>
> I would also recommend checking the `lib/` folder of your Flink
> installation to see if there is any dangling old version jars that you
> added there.
> I did a quick dependency check on the Elasticsearch 2 connector, it is
> correctly pulling in Lucene 5.5.0 only, so this dependency should not pop
> up given that the user code is packaged properly.
> As of now, I would guess that it is some dependency conflict caused by
> either the reasons mentioned above, or some other dependency in the user
> jar is pulling in a conflicting Lucene version.
>
> Of course, if you doubt otherwise and that isn’t the case, let us know the
> result of your checks so we can investigate further! Thanks.
>
> Cheers,
> Gordon
>
> On 17 July 2017 at 3:38:17 PM, Fabian Wollert (fabian.woll...@zalando.de)
> wrote:
>
> 1.3.0, but i only need the ES 2.X connector working right now, since
> that's the elasticsearch version we're using. another option would be to
> upgrade to ES 5 (at elast on dev) to see if its working as well, but that
> sounds not like fixing the problem for me :-D
>
> Cheers
> Fabian
>
>
> --
>
> *Fabian Wollert Zalando SE*
>
> E-Mail: fabian.woll...@zalando.de
> Location: ZMAP 
>
> 2017-07-16 15:47 GMT+02:00 Aljoscha Krettek :
>
>> Hi,
>>
>> There was also a problem in releasing the ES 5 connector with Flink
>> 1.3.0. You only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?
>>
>> Best,
>> Aljoscha
>>
>> On 16. Jul 2017, at 13:42, Fabian Wollert 
>> wrote:
>>
>> Hi Aljoscha,
>>
>> we are running Flink in Stand alone mode, inside Docker in AWS. I will
>> check tomorrow the dependencies, although i'm wondering: I'm running Flink
>> 1.3 averywhere and the appropiate ES connector which was only released with
>> 1.3, so it's weird where this dependency mix up comes from ... let's see ...
>>
>> Cheers
>> Fabian
>>
>>
>> --
>>
>> *Fabian Wollert Zalando SE*
>>
>> E-Mail: fabian.woll...@zalando.de
>> Location: ZMAP 
>>
>> 2017-07-14 11:15 GMT+02:00 Aljoscha Krettek :
>>
>>> This kind of error almost always hints at a dependency clash, i.e. there
>>> is some version of this code in the class path that clashed with the
>>> version that the Flink program uses. That’s why it works in local mode,
>>> where there are probably not many other dependencies and not in cluster
>>> mode.
>>>
>>> How are you running it on the cluster? Standalone, YARN?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 13. Jul 2017, at 13:56, Fabian Wollert 
>>> wrote:
>>>
>>> Hi Timo, Hi Gordon,
>>>
>>> thx for the reply! I checked the connection from both clusters to each
>>> other, and i can telnet to the 9300 port of flink, so i think the
>>> connection is not an issue here.
>>>
>>> We are currently using in our live env a custom elasticsearch connector,
>>> which used some extra lib's deployed on the cluster. i found one lucene lib
>>> and deleted it (since all dependencies should be in the flink job jar), but
>>> that unfortunately did not help neither ...
>>>
>>> Cheers
>>> Fabian
>>>
>>>
>>> --
>>>
>>> *Fabian Wollert Data Engineering*
>>> *Technology*
>>>
>>> E-Mail: fabian.woll...@zalando.de
>>> Location: ZMAP 
>>>
>>> 2017-07-13 13:46 GMT+02:00 Timo Walther :
>>>
 Hi Fabian,

 I loop in Gordon. Maybe he knows whats happening here.

 Regards,
 Timo


 Am 13.07.17 um 13:26 schrieb Fabian Wollert:

 Hi everyone,

 I'm trying to make use of the new Elasticsearch Connector
 .
 I got a version running locally (with ssh tunnels to my Elasticsearch
 cluster in AWS) in my IDE, I see the data in Elasticsearch written
 perfectly, as I want it. As soon as I try to run this on our dev cluster
 (Flink 1.3.0, running in the same VPC like ) though, i get the following
 error message (in the sink):

 java.lang.NoSuchFieldError: LUCENE_5_5_0
 at org.elasticsearch.Version.(Version.java:295)
 at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
 at org.apache.flink.stre

Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-17 Thread earellano
Tzu-Li (Gordon) Tai wrote
> Basically, when two operators are chained together, the output of the
> first operator is immediately chained to the processElement of the next
> operator; it’s therefore just a consecutive invocation of processElements
> on the chained operators. There will be no thread-to-thread handover or
> buffering.

Okay great, chaining tasks does sound like what we want then.



Tzu-Li (Gordon) Tai wrote
> In that case, I would suggest using flatMap here, followed by chained
> splits and then sinks.

We changed our code to roughly follow this suggestion, but I'm not sure
we're doing this correctly. Is there a better way you recommend chaining the
tasks? As written below, are individual Events within the List being sent to
their respective sinks right away, or does the whole list have to be split
first?

 

We also had issues getting flatMap to work, and map seemed more appropriate.
Our parser.parse() function has a one-to-one mapping from an input byte[]
to a List<Event>, and that never changes, so a map seems to make
sense to us. Were you suggesting a flatMap instead of map at this stage of
calling our parser, or did you mean to use a flatMap() after the parser and
before the split()?


