Re: CVE-2021-44228 - Log4j2 vulnerability

2022-02-03 Thread Martijn Visser
Hi Surendra,

You can follow the discussion on this topic in the Dev mailing list [1]. I
would expect it in the next couple of weeks.

Best regards,

Martijn

[1] https://lists.apache.org/thread/n417406j125n080vopljgfflc45yygh4

On Fri, 4 Feb 2022 at 08:49, Surendra Lalwani 
wrote:

> Hi Team,
>
> Any ETA on the Flink 1.13.6 release?
>
> Thanks and Regards,
> Surendra Lalwani
>
>
> On Sun, Jan 9, 2022 at 3:50 PM David Morávek  wrote:
>
>> Flink community officially only supports current and previous minor
>> versions [1] (1.13, 1.14) with bug fixes. Personally I wouldn’t expect
>> there will be another patch release for 1.12.
>>
>> If you really need an extra release for the unsupported version, the most
>> straightforward approach would be manually building the Flink distribution
>> from sources [2] with the patches you need.
>>
>> [1]
>> https://flink.apache.org/downloads.html#update-policy-for-old-releases
>> [2]
>>
>> https://github.com/apache/flink/tree/release-1.12#building-apache-flink-from-source
>>
>> D.
>>
>> On Sun 9. 1. 2022 at 10:10, V N, Suchithra (Nokia - IN/Bangalore) <
>> suchithra@nokia.com> wrote:
>>
>>> Hi David,
>>>
>>>
>>>
>>> As per the below comments, Flink 1.14.3 is in preparation and this
>>> hasn't started yet for Flink 1.13.6. Will the Flink 1.12.8 release be
>>> planned after this? If there is no current plan, could you please let us
>>> know the regular release timing for the 1.12.8 version.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Suchithra
>>>
>>>
>>>
>>> *From:* David Morávek 
>>> *Sent:* Sunday, January 9, 2022 12:11 AM
>>> *To:* V N, Suchithra (Nokia - IN/Bangalore) 
>>> *Cc:* Chesnay Schepler ; Martijn Visser <
>>> mart...@ververica.com>; Michael Guterl ; Parag
>>> Somani ; patrick.eif...@sony.com; Richard
>>> Deurwaarder ; User ;
>>> subharaj.ma...@gmail.com; swamy.haj...@gmail.com
>>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>>
>>>
>>>
>>> Hi Suchithra,
>>>
>>>
>>>
>>> there is currently no plan on doing another 1.12 release
>>>
>>>
>>>
>>> D.
>>>
>>>
>>>
>>> On Sat 8. 1. 2022 at 18:02, V N, Suchithra (Nokia - IN/Bangalore) <
>>> suchithra@nokia.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> When can we expect the flink 1.12 releases with log4j 2.17.1?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Suchithra
>>>
>>>
>>>
>>> *From:* Martijn Visser 
>>> *Sent:* Thursday, January 6, 2022 7:45 PM
>>> *To:* patrick.eif...@sony.com
>>> *Cc:* David Morávek ; swamy.haj...@gmail.com;
>>> subharaj.ma...@gmail.com; V N, Suchithra (Nokia - IN/Bangalore) <
>>> suchithra@nokia.com>; Chesnay Schepler ; User <
>>> user@flink.apache.org>; Michael Guterl ; Richard
>>> Deurwaarder ; Parag Somani 
>>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>>
>>>
>>>
>>> Hi all,
>>>
>>>
>>>
>>> The ticket for upgrading Log4J to 2.17.0 is
>>> https://issues.apache.org/jira/browse/FLINK-25375. There's also the
>>> update to Log4j 2.17.1 which is tracked under
>>> https://issues.apache.org/jira/browse/FLINK-25472
>>>
>>>
>>>
>>> As you can see, both have a fix version set to 1.14.3 and 1.13.6. These
>>> versions haven't been released yet. Flink 1.14.3 is in preparation; this
>>> hasn't started yet for Flink 1.13.6.
>>>
>>>
>>>
>>> Best regards,
>>>
>>>
>>>
>>> Martijn
>>>
>>>
>>>
>>> On Thu, 6 Jan 2022 at 15:05,  wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> just to be sure: Which Flink Releases for 1.14 and 1.13 have the
>>> upgraded log4j version 2.17.0?
>>>
>>> Are those already deployed to docker?
>>>
>>>
>>>
>>> Many Thanks in Advance.
>>>
>>>
>>>
>>> Kind Regards,
>>>
>>>
>>>
>>> Patrick
>>>
>>> --
>>>
>>> Patrick Eifler
>>>
>>>
>>>
>>> Senior Software Engineer (BI)
>>>
>>> Cloud Gaming Engineering & Infrastructure
>>> Sony Interactive Entertainment LLC
>>>
>>> Wilhelmstraße 118, 10963 Berlin
>>>
>>>
>>> Germany
>>>
>>> E: patrick.eif...@sony.com
>>>
>>>
>>>
>>> *From: *David Morávek 
>>> *Date: *Wednesday, 29. December 2021 at 09:35
>>> *To: *narasimha 
>>> *Cc: *Debraj Manna , Martijn Visser <
>>> mart...@ververica.com>, V N, Suchithra (Nokia - IN/Bangalore) <
>>> suchithra@nokia.com>, Chesnay Schepler , user <
>>> user@flink.apache.org>, Michael Guterl , Richard
>>> Deurwaarder , Parag Somani 
>>> *Subject: *Re: CVE-2021-44228 - Log4j2 vulnerability
>>>
>>> Please follow the above-mentioned ML thread for more details. Please
>>> note that this is a REGULAR release that is not motivated by the log4j CVE,
>>> so the stability of the release is a more important factor than having it
>>> out as soon as possible.
>>>
>>>
>>>
>>> D.
>>>
>>>
>>>
>>> On Mon, Dec 27, 2021 at 6:33 AM narasimha 
>>> wrote:
>>>
>>> Hi folks,
>>>
>>>
>>>
>>> When can we expect the release to be made available to the community?
>>>
>>>
>>>
>>> On Wed, Dec 22, 2021 at 3:07 PM David Morávek  wrote:
>>>
>>> Hi Debraj,
>>>
>>>
>>>
>>> we're currently not planning another emergency release as this CVE is
>>> not as critical for Flink users as the previous one. However, this patch
>>> will be included in all upcoming pat

Re: CVE-2021-44228 - Log4j2 vulnerability

2022-02-03 Thread Surendra Lalwani
Hi Team,

Any ETA on the Flink 1.13.6 release?

Thanks and Regards,
Surendra Lalwani


On Sun, Jan 9, 2022 at 3:50 PM David Morávek  wrote:

> Flink community officially only supports current and previous minor
> versions [1] (1.13, 1.14) with bug fixes. Personally I wouldn’t expect
> there will be another patch release for 1.12.
>
> If you really need an extra release for the unsupported version, the most
> straightforward approach would be manually building the Flink distribution
> from sources [2] with the patches you need.
>
> [1] https://flink.apache.org/downloads.html#update-policy-for-old-releases
> [2]
>
> https://github.com/apache/flink/tree/release-1.12#building-apache-flink-from-source
>
> D.
>
> On Sun 9. 1. 2022 at 10:10, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
>> Hi David,
>>
>>
>>
>> As per the below comments, Flink 1.14.3 is in preparation and this
>> hasn't started yet for Flink 1.13.6. Will the Flink 1.12.8 release be
>> planned after this? If there is no current plan, could you please let us
>> know the regular release timing for the 1.12.8 version.
>>
>>
>>
>> Regards,
>>
>> Suchithra
>>
>>
>>
>> *From:* David Morávek 
>> *Sent:* Sunday, January 9, 2022 12:11 AM
>> *To:* V N, Suchithra (Nokia - IN/Bangalore) 
>> *Cc:* Chesnay Schepler ; Martijn Visser <
>> mart...@ververica.com>; Michael Guterl ; Parag Somani
>> ; patrick.eif...@sony.com; Richard Deurwaarder <
>> rich...@xeli.eu>; User ; subharaj.ma...@gmail.com;
>> swamy.haj...@gmail.com
>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>>
>>
>> Hi Suchithra,
>>
>>
>>
>> there is currently no plan on doing another 1.12 release
>>
>>
>>
>> D.
>>
>>
>>
>> On Sat 8. 1. 2022 at 18:02, V N, Suchithra (Nokia - IN/Bangalore) <
>> suchithra@nokia.com> wrote:
>>
>> Hi,
>>
>>
>>
>> When can we expect the flink 1.12 releases with log4j 2.17.1?
>>
>>
>>
>> Thanks,
>>
>> Suchithra
>>
>>
>>
>> *From:* Martijn Visser 
>> *Sent:* Thursday, January 6, 2022 7:45 PM
>> *To:* patrick.eif...@sony.com
>> *Cc:* David Morávek ; swamy.haj...@gmail.com;
>> subharaj.ma...@gmail.com; V N, Suchithra (Nokia - IN/Bangalore) <
>> suchithra@nokia.com>; Chesnay Schepler ; User <
>> user@flink.apache.org>; Michael Guterl ; Richard
>> Deurwaarder ; Parag Somani 
>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>>
>>
>> Hi all,
>>
>>
>>
>> The ticket for upgrading Log4J to 2.17.0 is
>> https://issues.apache.org/jira/browse/FLINK-25375. There's also the
>> update to Log4j 2.17.1 which is tracked under
>> https://issues.apache.org/jira/browse/FLINK-25472
>>
>>
>>
>> As you can see, both have a fix version set to 1.14.3 and 1.13.6. These
>> versions haven't been released yet. Flink 1.14.3 is in preparation; this
>> hasn't started yet for Flink 1.13.6.
>>
>>
>>
>> Best regards,
>>
>>
>>
>> Martijn
>>
>>
>>
>> On Thu, 6 Jan 2022 at 15:05,  wrote:
>>
>> Hi,
>>
>>
>>
>> just to be sure: Which Flink Releases for 1.14 and 1.13 have the upgraded
>> log4j version 2.17.0?
>>
>> Are those already deployed to docker?
>>
>>
>>
>> Many Thanks in Advance.
>>
>>
>>
>> Kind Regards,
>>
>>
>>
>> Patrick
>>
>> --
>>
>> Patrick Eifler
>>
>>
>>
>> Senior Software Engineer (BI)
>>
>> Cloud Gaming Engineering & Infrastructure
>> Sony Interactive Entertainment LLC
>>
>> Wilhelmstraße 118, 10963 Berlin
>>
>>
>> Germany
>>
>> E: patrick.eif...@sony.com
>>
>>
>>
>> *From: *David Morávek 
>> *Date: *Wednesday, 29. December 2021 at 09:35
>> *To: *narasimha 
>> *Cc: *Debraj Manna , Martijn Visser <
>> mart...@ververica.com>, V N, Suchithra (Nokia - IN/Bangalore) <
>> suchithra@nokia.com>, Chesnay Schepler , user <
>> user@flink.apache.org>, Michael Guterl , Richard
>> Deurwaarder , Parag Somani 
>> *Subject: *Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>> Please follow the above-mentioned ML thread for more details. Please note
>> that this is a REGULAR release that is not motivated by the log4j CVE, so
>> the stability of the release is a more important factor than having it
>> out as soon as possible.
>>
>>
>>
>> D.
>>
>>
>>
>> On Mon, Dec 27, 2021 at 6:33 AM narasimha  wrote:
>>
>> Hi folks,
>>
>>
>>
>> When can we expect the release to be made available to the community?
>>
>>
>>
>> On Wed, Dec 22, 2021 at 3:07 PM David Morávek  wrote:
>>
>> Hi Debraj,
>>
>>
>>
>> we're currently not planning another emergency release as this CVE is not
>> as critical for Flink users as the previous one. However, this patch will
>> be included in all upcoming patch & minor releases. The patch release for
>> the 1.14.x branch is already in progress [1] (it may be a bit delayed due to
>> the holiday season).
>>
>>
>>
>> [1] https://lists.apache.org/thread/24v8bh3jww7c5bvfgov9cp5mb0wtj7tk
>> 
>>
>>
>>
>> Best,
>>
>> D.
>>
>>
>>
>> On Wed, Dec 22, 2021 at 7:02 AM D

Re: Question around creating partitioned tables when running Flink from Ververica

2022-02-03 Thread Natu Lauchande
Hey Ingo,

Thanks for the quick response. I will bother you a bit more : ).

We have never used external catalogs. Do you perhaps have a link that we can
look at?

The only reference that I see online is for custom catalogs. Is this the
same as external catalogs?
https://docs.ververica.com/user_guide/sql_development/catalogs.html#custom-catalogs


Best,
Natu
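
A generic sketch of registering an external (Hive) catalog and creating a partitioned table through Flink SQL (the catalog name, hive-conf path, table, and connector options are made up; the Hive connector dependencies and a Metastore are assumed, and how a catalog is added on vvp may differ):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionedTableOnExternalCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register an external (Hive) catalog instead of the platform's built-in one.
        tEnv.executeSql(
                "CREATE CATALOG hive_cat WITH ("
                        + " 'type' = 'hive',"
                        + " 'hive-conf-dir' = '/opt/hive-conf'"
                        + ")");
        tEnv.executeSql("USE CATALOG hive_cat");

        // Partitioned DDL works against catalogs that support partitioning.
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + " user_id STRING,"
                        + " amount DOUBLE,"
                        + " dt STRING"
                        + ") PARTITIONED BY (dt) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 's3://my-bucket/events',"
                        + " 'format' = 'parquet'"
                        + ")");
    }
}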

On Fri, Feb 4, 2022 at 8:59 AM Ingo Bürk  wrote:

> Hi Natu,
>
> the functionality hasn't been actively blocked, it just hasn't yet been
> implemented in the Ververica Platform Built-In Catalog. Using any
> external catalog which supports partitioning will work fine.
>
> I'll make a note internally for your request on this, though I cannot
> make any statements about timelines.
>
>
> Best
> Ingo
>
> On 04.02.22 07:25, Natu Lauchande wrote:
> > Good day,
> >
> > Although flink sql allows us to create partitioned tables, we are unable
> > to do so on vvp at the moment because of the below error:
> > Cause: Partitioned tables are not supported yet.
> > Can we understand why the functionality was blocked or when
> > partitioned tables will be supported on vvp?
> >
> > Thanks,
> > Natu
>


Re: Question around creating partitioned tables when running Flink from Ververica

2022-02-03 Thread Ingo Bürk

Hi Natu,

the functionality hasn't been actively blocked, it just hasn't yet been 
implemented in the Ververica Platform Built-In Catalog. Using any 
external catalog which supports partitioning will work fine.


I'll make a note internally for your request on this, though I cannot 
make any statements about timelines.



Best
Ingo

On 04.02.22 07:25, Natu Lauchande wrote:

Good day,

Although flink sql allows us to create partitioned tables, we are unable 
to do so on vvp at the moment because of the below error:

Cause: Partitioned tables are not supported yet.
Can we understand why the functionality was blocked or when
partitioned tables will be supported on vvp?


Thanks,
Natu


Question around creating partitioned tables when running Flink from Ververica

2022-02-03 Thread Natu Lauchande
Good day,

Although flink sql allows us to create partitioned tables, we are unable to
do so on vvp at the moment because of the below error:
Cause: Partitioned tables are not supported yet.
Can we understand why the functionality was blocked or when
partitioned tables will be supported on vvp?

Thanks,
Natu


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it's not my data either. I think it may be a volume issue. I have
managed to consistently reproduce the error. I'll upload a reproducer ASAP.
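
For reference, the key group in that error is derived from the key's hashCode(); a minimal sketch of how Flink maps a key to a key group (assuming flink-runtime on the classpath and the MyEventCountKey class quoted below; the sample values are made up):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        MyEventCountKey key =
                new MyEventCountKey("2022-02-03T14:00:00", "example.com", "click");
        int maxParallelism = 128; // Flink's default max parallelism
        // The key group is computed from hashCode(). If hashCode() is not stable
        // across JVMs / invocations, the record can land in a subtask whose
        // KeyGroupRange does not contain that group, which is exactly the
        // IllegalArgumentException quoted below.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        System.out.println("key group = " + keyGroup);
    }
}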



On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:

> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
> actual job once in a while throws that error. So I'm wondering if maybe one
> of the records that comes in is not valid, though I do validate prior to
> getting to the key and window operators.
>
> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>
>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>> then it threw the error.
>>
>> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>>
>>> Ok it may be the ElasticSearch connector causing the issue?
>>>
>>> If I use PrintSinkFunction then I get no error and my stats print as
>>> expected.
>>>
>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
 Hi,
 your hash code and equals seem correct. Can you post a minimal stream
 pipeline reproducer using this class?

 FG

 On Tue, Feb 1, 2022 at 8:39 PM John Smith 
 wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
> using low level state access APIs, this is most likely caused by
> non-deterministic shuffle key (hashCode and equals implementation).
>
> This is my class, is my hashCode deterministic?
>
> public final class MyEventCountKey {
> private final String countDateTime;
> private final String domain;
> private final String event;
>
> public MyEventCountKey(final String countDateTime, final String 
> domain, final String event) {
> this.countDateTime = countDateTime;
> this.domain = domain;
> this.event = event;
> }
>
> public String getCountDateTime() {
> return countDateTime;
> }
>
> public String getDomain() {
> return domain;
> }
>
> public String getEven() {
> return event;
> }
>
> @Override
> public String toString() {
> return countDateTime + "|" + domain + "|" + event;
> }
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> MyEventCountKey that = (MyEventCountKey) o;
> return countDateTime.equals(that.countDateTime) &&
> domain.equals(that.domain) &&
> event.equals(that.event);
> }
>
> @Override
> public int hashCode() {
> final int prime = 31;
> int result = 1;
> result = prime * result + countDateTime.hashCode();
> result = prime * result + domain.hashCode();
> result = prime * result +  event.hashCode();
> return result;
> }
> }
>
>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok so I tried to create a reproducer but I couldn't reproduce it. But the
actual job once in a while throws that error. So I'm wondering if maybe one
of the records that comes in is not valid, though I do validate prior to
getting to the key and window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:

> Actually maybe not because with PrintSinkFunction it ran for a bit and
> then it threw the error.
>
> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>
>> Ok it may be the ElasticSearch connector causing the issue?
>>
>> If I use PrintSinkFunction then I get no error and my stats print as
>> expected.
>>
>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
>> wrote:
>>
>>> Hi,
>>> your hash code and equals seem correct. Can you post a minimal stream
>>> pipeline reproducer using this class?
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>> wrote:
>>>
 Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
 KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
 using low level state access APIs, this is most likely caused by
 non-deterministic shuffle key (hashCode and equals implementation).

 This is my class, is my hashCode deterministic?

 public final class MyEventCountKey {
 private final String countDateTime;
 private final String domain;
 private final String event;

 public MyEventCountKey(final String countDateTime, final String 
 domain, final String event) {
 this.countDateTime = countDateTime;
 this.domain = domain;
 this.event = event;
 }

 public String getCountDateTime() {
 return countDateTime;
 }

 public String getDomain() {
 return domain;
 }

 public String getEven() {
 return event;
 }

 @Override
 public String toString() {
 return countDateTime + "|" + domain + "|" + event;
 }

 @Override
 public boolean equals(Object o) {
 if (this == o) return true;
 if (o == null || getClass() != o.getClass()) return false;
 MyEventCountKey that = (MyEventCountKey) o;
 return countDateTime.equals(that.countDateTime) &&
 domain.equals(that.domain) &&
 event.equals(that.event);
 }

 @Override
 public int hashCode() {
 final int prime = 31;
 int result = 1;
 result = prime * result + countDateTime.hashCode();
 result = prime * result + domain.hashCode();
 result = prime * result +  event.hashCode();
 return result;
 }
 }




Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Actually maybe not because with PrintSinkFunction it ran for a bit and then
it threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:

> Ok it may be the ElasticSearch connector causing the issue?
>
> If I use PrintSinkFunction then I get no error and my stats print as
> expected.
>
> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
> wrote:
>
>> Hi,
>> your hash code and equals seem correct. Can you post a minimal stream
>> pipeline reproducer using this class?
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:
>>
>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>> using low level state access APIs, this is most likely caused by
>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>
>>> This is my class, is my hashCode deterministic?
>>>
>>> public final class MyEventCountKey {
>>> private final String countDateTime;
>>> private final String domain;
>>> private final String event;
>>>
>>> public MyEventCountKey(final String countDateTime, final String domain, 
>>> final String event) {
>>> this.countDateTime = countDateTime;
>>> this.domain = domain;
>>> this.event = event;
>>> }
>>>
>>> public String getCountDateTime() {
>>> return countDateTime;
>>> }
>>>
>>> public String getDomain() {
>>> return domain;
>>> }
>>>
>>> public String getEven() {
>>> return event;
>>> }
>>>
>>> @Override
>>> public String toString() {
>>> return countDateTime + "|" + domain + "|" + event;
>>> }
>>>
>>> @Override
>>> public boolean equals(Object o) {
>>> if (this == o) return true;
>>> if (o == null || getClass() != o.getClass()) return false;
>>> MyEventCountKey that = (MyEventCountKey) o;
>>> return countDateTime.equals(that.countDateTime) &&
>>> domain.equals(that.domain) &&
>>> event.equals(that.event);
>>> }
>>>
>>> @Override
>>> public int hashCode() {
>>> final int prime = 31;
>>> int result = 1;
>>> result = prime * result + countDateTime.hashCode();
>>> result = prime * result + domain.hashCode();
>>> result = prime * result +  event.hashCode();
>>> return result;
>>> }
>>> }
>>>
>>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it may be the ElasticSearch connector causing the issue?

If I use PrintSinkFunction then I get no error and my stats print as
expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
wrote:

> Hi,
> your hash code and equals seem correct. Can you post a minimal stream
> pipeline reproducer using this class?
>
> FG
>
> On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:
>
>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>> using low level state access APIs, this is most likely caused by
>> non-deterministic shuffle key (hashCode and equals implementation).
>>
>> This is my class, is my hashCode deterministic?
>>
>> public final class MyEventCountKey {
>> private final String countDateTime;
>> private final String domain;
>> private final String event;
>>
>> public MyEventCountKey(final String countDateTime, final String domain, 
>> final String event) {
>> this.countDateTime = countDateTime;
>> this.domain = domain;
>> this.event = event;
>> }
>>
>> public String getCountDateTime() {
>> return countDateTime;
>> }
>>
>> public String getDomain() {
>> return domain;
>> }
>>
>> public String getEven() {
>> return event;
>> }
>>
>> @Override
>> public String toString() {
>> return countDateTime + "|" + domain + "|" + event;
>> }
>>
>> @Override
>> public boolean equals(Object o) {
>> if (this == o) return true;
>> if (o == null || getClass() != o.getClass()) return false;
>> MyEventCountKey that = (MyEventCountKey) o;
>> return countDateTime.equals(that.countDateTime) &&
>> domain.equals(that.domain) &&
>> event.equals(that.event);
>> }
>>
>> @Override
>> public int hashCode() {
>> final int prime = 31;
>> int result = 1;
>> result = prime * result + countDateTime.hashCode();
>> result = prime * result + domain.hashCode();
>> result = prime * result +  event.hashCode();
>> return result;
>> }
>> }
>>
>>


Re: flink savepoints not (completely) relocatable ?

2022-02-03 Thread Frank Dekervel

Hello,

Thanks for the research! Good to know the cause.

Greetings,
Frank

On 03.02.22 17:18, Dawid Wysakowicz wrote:
I looked into the code again and unfortunately I have bad news :( 
Indeed we treat S3 as if it always injects entropy. Even if the 
entropy key is not specified, which effectively means it is disabled. 
I created a JIRA ticket[1] to fix it.


Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-25952

On 03/02/2022 17:02, Frank Dekervel wrote:

Hello,

I didn't know about entropy injection. I have checked, and there is 
no entropy injection configured in my flink-conf.yaml. This is the 
relevant section:


s3.access-key: ???
s3.endpoint: http://minio/
s3.path.style.access: true
s3.secret-key: ???

I see that there are still S3 paths defined in the _metadata

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ cat _metadata 
 | strings | grep s3
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/a6d59334-2769-4a6e-b582-d38d58352021 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f627c959-d69d-41a1-9732-748795efb9ad 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f9a03af4-2868-4797-a950-10257282ed1e 


...

not all paths are existing

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ l
b81e4e28-eabd-499e-9561-b98137084a9c  _metadata

Thanks!

Greetings,
Frank

On 03.02.22 16:38, Dawid Wysakowicz wrote:


Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are 
not relocatable in combination with entropy injection as described 
here[1].


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints


On 03/02/2022 14:44, Frank Dekervel wrote:

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ 
(on a minio server), i downloaded it to my laptop for inspection. I 
have two KeyedProcessFunctions (state in the same savepoint) and 
strangely enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;

import org.apache.flink.api.scala._

// first one

val ti= createTypeInformation[AlarmMessageKey]
val tia= createTypeInformation[AlmState]
val ds= savepoint.readKeyedState("alm-1", newAlmStateReader(), ti, 
tia)

val valss= ds.collect().asScala;

// now the second one:
val savepoint2= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;
val ds_sup= savepoint.readKeyedState("ags-1", newSupStateReader()); 
// here we ser/deser in kryo not scala case class serializer. No 
idea why, but that's how its in the savepoint

val vals_sup= ds_sup.collect().asScala;

The second one seems to fail because it wants to access the 
savepoint on the original path on S3 (which my laptop doesn't have 
access to). I tought savepoints were supposed to be relocatable. 
Weirdly enough, the first one works just fine.


This is the exception i get:

[error] Caused by: 
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied 
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; 
Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; 
Proxy: null), S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error]   

Re: flink savepoints not (completely) relocatable ?

2022-02-03 Thread Dawid Wysakowicz
I looked into the code again and unfortunately I have bad news :( Indeed,
we treat S3 as if it always injects entropy, even if the entropy key is
not specified, which effectively means it is disabled. I created a JIRA
ticket [1] to fix it.


Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-25952

On 03/02/2022 17:02, Frank Dekervel wrote:

Hello,

I didn't know about entropy injection. I have checked, and there is no 
entropy injection configured in my flink-conf.yaml. This is the 
relevant section:


s3.access-key: ???
s3.endpoint: http://minio/
s3.path.style.access: true
s3.secret-key: ???

I see that there are still S3 paths defined in the _metadata

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ cat _metadata 
 | strings | grep s3
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/a6d59334-2769-4a6e-b582-d38d58352021 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f627c959-d69d-41a1-9732-748795efb9ad 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f9a03af4-2868-4797-a950-10257282ed1e 


...

not all paths are existing

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ l
b81e4e28-eabd-499e-9561-b98137084a9c  _metadata

Thanks!

Greetings,
Frank

On 03.02.22 16:38, Dawid Wysakowicz wrote:


Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are 
not relocatable in combination with entropy injection as described 
here[1].


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints


On 03/02/2022 14:44, Frank Dekervel wrote:

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on 
a minio server), i downloaded it to my laptop for inspection. I have 
two KeyedProcessFunctions (state in the same savepoint) and 
strangely enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;

import org.apache.flink.api.scala._

// first one

val ti= createTypeInformation[AlarmMessageKey]
val tia= createTypeInformation[AlmState]
val ds= savepoint.readKeyedState("alm-1", newAlmStateReader(), ti, tia)
val valss= ds.collect().asScala;

// now the second one:
val savepoint2= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;
val ds_sup= savepoint.readKeyedState("ags-1", newSupStateReader()); 
// here we ser/deser in kryo not scala case class serializer. No 
idea why, but that's how its in the savepoint

val vals_sup= ds_sup.collect().asScala;

The second one seems to fail because it wants to access the 
savepoint on the original path on S3 (which my laptop doesn't have 
access to). I tought savepoints were supposed to be relocatable. 
Weirdly enough, the first one works just fine.


This is the exception i get:

[error] Caused by: 
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied 
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; 
Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; 
Proxy: null), S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.Prest

Re: flink savepoints not (completely) relocatable ?

2022-02-03 Thread Frank Dekervel

Hello,

I didn't know about entropy injection. I have checked, and there is no 
entropy injection configured in my flink-conf.yaml. This is the relevant 
section:


s3.access-key: ???
s3.endpoint: http://minio/
s3.path.style.access: true
s3.secret-key: ???

I see that there are still S3 paths defined in the _metadata

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ cat _metadata  | 
strings | grep s3
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/a6d59334-2769-4a6e-b582-d38d58352021 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f627c959-d69d-41a1-9732-748795efb9ad 


Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f9a03af4-2868-4797-a950-10257282ed1e
...

not all paths are existing

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ l
b81e4e28-eabd-499e-9561-b98137084a9c  _metadata

Thanks!

Greetings,
Frank

On 03.02.22 16:38, Dawid Wysakowicz wrote:


Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are not 
relocatable in combination with entropy injection as described here[1].


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints


On 03/02/2022 14:44, Frank Dekervel wrote:

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on 
a minio server), i downloaded it to my laptop for inspection. I have 
two KeyedProcessFunctions (state in the same savepoint) and strangely 
enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;

import org.apache.flink.api.scala._

// first one

val ti= createTypeInformation[AlarmMessageKey]
val tia= createTypeInformation[AlmState]
val ds= savepoint.readKeyedState("alm-1", newAlmStateReader(), ti, tia)
val valss= ds.collect().asScala;

// now the second one:
val savepoint2= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;
val ds_sup= savepoint.readKeyedState("ags-1", newSupStateReader()); 
// here we ser/deser in kryo not scala case class serializer. No idea 
why, but that's how its in the savepoint

val vals_sup= ds_sup.collect().asScala;

The second one seems to fail because it wants to access the savepoint 
on the original path on S3 (which my laptop doesn't have access to). 
I tought savepoints were supposed to be relocatable. Weirdly enough, 
the first one works just fine.


This is the exception i get:

[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: 
Access Denied (Service: Amazon S3; Status Code: 403; Error Code: 
AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; 
Proxy: null), S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1093)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1078)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1071)
[error] at 
com.face

Reading from Kafka kafkarecorddeserializationschema

2022-02-03 Thread HG
Hello

Unfortunately, most available examples still use the FlinkKafkaConsumer.
I need to consume events from Kafka.
The format is Long,Timestamp,String,String.

Do I need to create a custom deserializer?

What also confuses me is

KafkaSource<...> source = KafkaSource

How does it relate to the deserializer?
Is there a kind of <...> type or is <...> fine even if the message is a
composite of Long, String...?

Regards Hans
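
A rough sketch of a custom KafkaRecordDeserializationSchema for a Long,Timestamp,String,String record, plus how it would plug into the KafkaSource builder (the comma-separated wire format, topic, server, and class names below are assumptions, not taken from this thread):

import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Turns each Kafka record value into a (Long, Timestamp, String, String) tuple. */
public class EventDeserializer
        implements KafkaRecordDeserializationSchema<Tuple4<Long, Timestamp, String, String>> {

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> record,
            Collector<Tuple4<Long, Timestamp, String, String>> out) {
        // Assumes the value is a comma-separated line, e.g. "42,2022-02-03 12:00:00,foo,bar".
        String[] parts = new String(record.value(), StandardCharsets.UTF_8).split(",", 4);
        out.collect(Tuple4.of(
                Long.parseLong(parts[0]),
                Timestamp.valueOf(parts[1]),
                parts[2],
                parts[3]));
    }

    @Override
    public TypeInformation<Tuple4<Long, Timestamp, String, String>> getProducedType() {
        return Types.TUPLE(Types.LONG, Types.SQL_TIMESTAMP, Types.STRING, Types.STRING);
    }
}

// Wiring it into a KafkaSource (builder values are placeholders):
// KafkaSource<Tuple4<Long, Timestamp, String, String>> source = KafkaSource
//         .<Tuple4<Long, Timestamp, String, String>>builder()
//         .setBootstrapServers("kafka:9092")
//         .setTopics("events")
//         .setGroupId("my-group")
//         .setDeserializer(new EventDeserializer())
//         .build();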


Re: flink savepoints not (completely) relocatable ?

2022-02-03 Thread Dawid Wysakowicz

Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are not 
relocatable in combination with entropy injection as described here[1].


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints
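
For context, entropy injection is opt-in and is configured through the s3.* options; a rough sketch of what an enabled setup would look like in flink-conf.yaml (the key name and length are the commonly documented examples, not values from this setup):

s3.entropy.key: _entropy_
s3.entropy.length: 4
# The '_entropy_' marker in paths such as the one below is resolved to random
# characters for the state files, which is what makes such savepoints
# non-relocatable (see the savepoints doc in [1] above).
state.savepoints.dir: s3://flink/savepoints/_entropy_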


On 03/02/2022 14:44, Frank Dekervel wrote:

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on a 
minio server), i downloaded it to my laptop for inspection. I have two 
KeyedProcessFunctions (state in the same savepoint) and strangely 
enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;

import org.apache.flink.api.scala._

// first one

val ti= createTypeInformation[AlarmMessageKey]
val tia= createTypeInformation[AlmState]
val ds= savepoint.readKeyedState("alm-1", newAlmStateReader(), ti, tia)
val valss= ds.collect().asScala;

// now the second one:
val savepoint2= Savepoint.load(bEnv.getJavaEnv, path, 
newHashMapStateBackend()) ;
val ds_sup= savepoint.readKeyedState("ags-1", newSupStateReader()); // 
here we ser/deser in kryo not scala case class serializer. No idea 
why, but that's how its in the savepoint

val vals_sup= ds_sup.collect().asScala;

The second one seems to fail because it wants to access the savepoint 
on the original path on S3 (which my laptop doesn't have access to). I 
tought savepoints were supposed to be relocatable. Weirdly enough, the 
first one works just fine.


This is the exception i get:

[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: 
Access Denied (Service: Amazon S3; Status Code: 403; Error Code: 
AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; 
Proxy: null), S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1093)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1078)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1071)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$1(PrestoS3FileSystem.java:1015)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:1014)
[error] at 
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
[error] at 
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
[error] at 
java.io.FilterInputStream.read(FilterInputStream.java:83)
[error] at 
org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
[error] at 
org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
[error] at 
java.io.DataInputStream.readInt(DataInputStream.java:387)
[error] at 
org.apache.flink.core.io.V

Re: Json deserialisation with .jsonValue vs format=json in Table API

2022-02-03 Thread Francesco Guardiani
Hi,

I think the more stable option would be the first one, as it also gives you
more flexibility. Reading the row as a string and then parsing it in a query
definitely costs more, and makes it less straightforward to use the other
schema features of the Table API, such as watermark definitions, primary keys, etc.

I guess you can implement it straightforwardly by subclassing the existing
JSON format provided by Flink, in particular
JsonRowDataDeserializationSchema.

A third solution would be to create a SplitFunction, like the one you
created, which directly performs the parsing, outputting rows rather than
strings. This removes the double parsing issue, but still creates problems
when interacting with other schema features.

Hope it helps,
FG
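
A rough sketch of that third option, i.e. a table function that both splits and parses, emitting typed rows (the function name and the Jackson-based parsing are assumptions; the field names follow the example in the quoted message below):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Splits a multi-line payload and emits one typed row per JSON line. */
@FunctionHint(output = @DataTypeHint("ROW<ts STRING, name STRING>"))
public class ParseEventsFunction extends TableFunction<Row> {

    private transient ObjectMapper mapper;

    @Override
    public void open(FunctionContext context) {
        mapper = new ObjectMapper();
    }

    public void eval(String payload) {
        for (String line : payload.split("\n")) {
            try {
                // Each line is parsed exactly once, unlike repeated jsonValue calls.
                JsonNode node = mapper.readTree(line);
                collect(Row.of(
                        node.get("timestamp").asText(),
                        node.at("/person/name").asText()));
            } catch (Exception e) {
                // Malformed lines are skipped here; routing them to a dead-letter
                // table would also be an option.
            }
        }
    }
}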

On Thu, Feb 3, 2022 at 3:56 PM Илья Соин  wrote:

> Hi,
>
> I’m using the Table / SQL API.
>
> I have a stream of strings, where each message contains several json
> strings separated by "\n”.
> For example:
> {“timestamp”: “2021-01-01T00:00:00”, person: {“name”: “Vasya”}}\n
> {“timestamp”: “2021-01-01T01:00:00”, person: {“name”: “Max” }}
>
> I would like to split each message by “\n”, parse each string as a json
> object and get some of the fields.
>
> AFAIK there are 2 ways to do it:
>
> 1) Write custom deserialiser and provide it in source table DDL, i.e.
> CREATE TABLE source (
> timestamp STRING,
> person: ROW(name STRING)
> )
> WITH(‘format’ = ‘multiline-json’, …);
>
> 2) Use ‘format’ = ‘raw’ and extract the needed fields using .jsonValue,
> i.e.
>
> CREATE TABLE source (
> row STRING
> );
>
> env.from("source")
> .joinLateral(
> call(SplitFunction.class, $("row"), "\n").as(“msg")
> )
> .select(
>  $("msg").jsonValue("$.timestamp", DataTypes.STRING()),
>  $("msg").jsonValue(“$.person.name",
> DataTypes.STRING()).as(“name”)
>);
>
> In 2), will each call of .jsonValue parse the string all over again or
> will it reuse the same JsonNode object internally? Which option better fits
> my problem?
>
> __
> Best, Ilya


Json deserialisation with .jsonValue vs format=json in Table API

2022-02-03 Thread Илья Соин
Hi, 

I’m using the Table / SQL API. 

I have a stream of strings, where each message contains several json strings 
separated by "\n”. 
For example:
{“timestamp”: “2021-01-01T00:00:00”, person: {“name”: “Vasya”}}\n 
{“timestamp”: “2021-01-01T01:00:00”, person: {“name”: “Max” }}

I would like to split each message by “\n”, parse each string as a json object 
and get some of the fields. 

AFAIK there are 2 ways to do it:

1) Write custom deserialiser and provide it in source table DDL, i.e. 
CREATE TABLE source (
timestamp STRING,
person: ROW(name STRING)
)
WITH(‘format’ = ‘multiline-json’, …);

2) Use ‘format’ = ‘raw’ and extract the needed fields using .jsonValue, i.e.

CREATE TABLE source (
row STRING
);

env.from("source")
.joinLateral(
call(SplitFunction.class, $("row"), "\n").as(“msg")
)
.select(
 $("msg").jsonValue("$.timestamp", DataTypes.STRING()),
 $("msg").jsonValue(“$.person.name", DataTypes.STRING()).as(“name”)
   );

In 2), will each call of .jsonValue parse the string all over again or will it 
reuse the same JsonNode object internally? Which option better fits my problem?

__
Best, Ilya

Flink High-Availability and Job-Manager recovery

2022-02-03 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Hi all,
We are currently deploying Flink on a 3-node k8s cluster, with 1 job-manager
and 3 task managers.
We are trying to understand the recommendation for deployment, more 
specifically for recovery from job-manager failure, and have some questions 
about that:


  1.  If we use the Flink HA solution (either Kubernetes HA or ZooKeeper), the
documentation states we should define the ‘high-availability.storageDir’ (see
the configuration sketch below).

In the examples we found, there is mostly hdfs or s3 storage.

We were wondering if we could use Kubernetes PersistentVolumes and
PersistentVolumeClaims. If we do use that, can each job-manager have its own
volume, or must it be shared?

  2.  Is there a solution for job-manager recovery without HA? With the way our
Flink is currently configured, if the job-manager pod is killed, all the jobs are
lost.

Is there a way to configure the job-manager so that if it goes down and k8s 
restarts it, it will continue from the same state (restart all the tasks, etc…)?

For this, can a Persistent Volume be used, without HDFS or external solutions?

  3.  Regarding the deployment mode: we are working with Beam + Flink, and
Flink is running in session mode; we have a few long-running streaming
pipelines deployed (less than 10).

Is ‘session’ mode the right deployment mode for our type of deployment? Or 
should we consider switching to something different? (Per-job/application)



Thanks
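
Regarding point 1, a rough sketch of what a Kubernetes-HA configuration could look like in flink-conf.yaml (the cluster id and the path are made-up examples, not values from this deployment):

kubernetes.cluster-id: my-session-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# Any Flink FileSystem URI is accepted here. With a single JobManager, or a
# ReadWriteMany PersistentVolumeClaim mounted at the same path in every
# JobManager pod, a plain file path can work, so HDFS/S3 is not strictly required:
high-availability.storageDir: file:///flink-ha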










flink savepoints not (completely) relocatable ?

2022-02-03 Thread Frank Dekervel

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on a
minio server); I downloaded it to my laptop for inspection. I have two
KeyedProcessFunctions (state in the same savepoint) and, strangely
enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend());

import org.apache.flink.api.scala._

// first one

val ti = createTypeInformation[AlarmMessageKey]
val tia = createTypeInformation[AlmState]
val ds = savepoint.readKeyedState("alm-1", new AlmStateReader(), ti, tia)
val valss = ds.collect().asScala;

// now the second one:
val savepoint2 = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend());
val ds_sup = savepoint.readKeyedState("ags-1", new SupStateReader());
// here we ser/deser with Kryo, not the Scala case class serializer.
// No idea why, but that's how it's in the savepoint

val vals_sup = ds_sup.collect().asScala;

The second one seems to fail because it wants to access the savepoint on
the original path on S3 (which my laptop doesn't have access to). I
thought savepoints were supposed to be relocatable. Weirdly enough, the
first one works just fine.


This is the exception i get:

[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: 
Access Denied (Service: Amazon S3; Status Code: 403; Error Code: 
AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; 
Proxy: null), S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1093)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1078)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1071)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$1(PrestoS3FileSystem.java:1015)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:1014)
[error] at 
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
[error] at 
java.io.BufferedInputStream.read(BufferedInputStream.java:265)

[error] at java.io.FilterInputStream.read(FilterInputStream.java:83)
[error] at 
org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
[error] at 
org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)

[error] at java.io.DataInputStream.readInt(DataInputStream.java:387)
[error] at 
org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:46)
[error] at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:139)
[error] at 
org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.readMetaData(FullSnapshotRestoreOperati

How to prevent check pointing of timers ?

2022-02-03 Thread Alex Drobinsky
Dear flink user,

In our project, restoring the timers' state creates numerous issues, so I
would like to know
if it is possible to avoid saving/restoring timers altogether.
If it isn't possible, how could I delete all registered timers during the
open function?

Best regards,
Alexander
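
A minimal sketch of the per-key timer deletion API that is available (timers are scoped to a key, so deletion normally happens in processElement/onTimer rather than in open(); the class, state name, and 60s delay below are made up):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SingleTimerFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> registeredTimer;

    @Override
    public void open(Configuration parameters) {
        registeredTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("registered-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long previous = registeredTimer.value();
        if (previous != null) {
            // Drop the previously registered timer for this key.
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long next = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(next);
        registeredTimer.update(next);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        registeredTimer.clear();
        out.collect("timer fired for key " + ctx.getCurrentKey());
    }
}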


Re: Creating Flink SQL Row with named fields

2022-02-03 Thread Francesco Guardiani
Hi,

Unfortunately at the moment, creating a row with named fields is not
possible from the ROW constructor.

One solution could be to wrap it in a cast, like: CAST((f0 + 12, 'Hello
world') AS ROW)
Or you could create a UDF and use the @DataTypeHint to define the row
return type, with named fields.

Feel free to open an issue about that

FG
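
A rough sketch of the two workarounds, using the field names from the question quoted below (the function name, registration call, and exact CAST syntax are illustrative assumptions):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

/** Builds a ROW value whose fields keep the names declared in the type hint. */
public class NamedRow extends ScalarFunction {

    public @DataTypeHint("ROW<nested_id INT, nested_some_name STRING>") Row eval(
            Integer id, String someName) {
        return Row.of(id, someName);
    }
}

// Registration and use:
// tableEnv.createTemporarySystemFunction("NAMED_ROW", NamedRow.class);
// tableEnv.sqlQuery("SELECT *, NAMED_ROW(id, some_name) AS row1 FROM table1");
//
// The CAST workaround mentioned above would look roughly like:
// SELECT *, CAST(ROW(id, some_name) AS ROW<nested_id INT, nested_some_name STRING>) AS row1
// FROM table1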

On Wed, Feb 2, 2022 at 5:18 PM Vladislav Keda <
vladislav.k...@glowbyteconsulting.com> wrote:

> Hi,
>
> I'm trying to create Row(..) using Flink SQL, but I can't assign names to
> its fields.
>
>
> *For example:*Input table1 structure:* (id INT, some_name STRING)*
> Query:  *select *, ROW(id, some_name) as row1 from table1*
> Output result structure:
> *(id  INT , some_name  STRING, row1 ROW (EXPR$0 INT, EXPR$1 STRING))*
>
> *Each nested field has a name like EXPR$ that does not satisfy me.*
>
> *If I write, for example:*Input table1 structure:* (id INT, some_name
> STRING)*
> Query:  *select *, ROW(id as nested_id, some_name as nested_some_name) as
> row1 from table1*
> Output result structure: *(id  INT , some_name  STRING, row1 ROW (EXPR$0
> INT, EXPR$1 STRING))*
>
>
> *I will get an exception like: *
>
>
>
>
>
>
>
>
> *Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "as" at line 1, column 20.
> Was expecting one of:
>     ")" ...
>     "," ...
> at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
> at ru.glowbyte.streaming.core.operators.internal.sql.SqlDrivenOperator.sqlQuery(SqlDrivenOperator.java:159)
> ... 59 more*
>
> How can I set the name for the field?
>
> Flink version - 1.13.3.
>
> ---
>
> Best Regards,
> Vladislav Keda
>


Re: Pojo State Migration - NPE with field deletion

2022-02-03 Thread bastien dine
Thanks for the JIRA ticket,


This is for sure pretty critical.
The "workaround" is to not remove the field, but I am not sure if this is
acceptable :)

I could work on that, but someone needs to point out to me where to start.

Do I work on the PojoSerializer, to make this case not throw an
exception?

Or do I try to find the root cause, namely why the field serializer of the
deleted field is still present?

Regards,
--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, 2 Feb 2022 at 16:37, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
>
>
> Happened to me too, here’s the JIRA ticket:
> https://issues.apache.org/jira/browse/FLINK-21752
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* bastien dine 
> *Sent:* Wednesday, 2 February 2022 16:01
> *To:* user 
> *Subject:* Pojo State Migration - NPE with field deletion
>
>
>
> Hello,
>
>
>
> I have some trouble restoring a state (pojo) after deleting a field
>
> According to documentation, it should not be a problem with POJO :
>
> *"**Fields can be removed. Once removed, the previous value for the
> removed field will be dropped in future checkpoints and savepoints."*
>
>
>
> Here is a short stack trace (full trace is below) :
>
>
>
> Caused by: java.lang.NullPointerException
>
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(
> PojoSerializer.java:119)
>
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .duplicate(PojoSerializer.java:184)
>
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .duplicate(PojoSerializer.java:56)
>
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>
>
>
>
>
> After some debug, it seems that the deleted POJO field still has a field
> serializer in the corresponding object PojoSerializer "fieldSerializers"
> array
>
> But it is not present in the "fields", where we have a gap of 1 index (for
> example 0-1-3-4)
>
> So when serializer reach index 2 we got this NPE,
>
>
>
> Why is the deleted field serializer still present ? this should have been
> dropped when resolving schema compatibility right ?
>
> I can not find anything on that matter, could someone help me with it ?
>
> Reproduced in flink 1.13 & 1.14, can not find any related JIRA too
>
>
>
> Best Regards,
>
> Bastien
>
>
> Full stack trace :
>
> 2022-02-02 15:44:20
>
> java.io.IOException: Could not perform checkpoint 2737490 for operator
> OperatorXXX
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .triggerCheckpointOnBarrier(StreamTask.java:1274)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:
> 147)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.triggerCheckpoint(
> SingleCheckpointBarrierHandler.java:287)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler
> .java:64)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(
> SingleCheckpointBarrierHandler.java:493)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(
> AbstractAlignedBarrierHandlerState.java:74)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> AbstractAlignedBarrierHandlerState.barrierReceived(
> AbstractAlignedBarrierHandlerState.java:66)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.lambda$processBarrier$2(
> SingleCheckpointBarrierHandler.java:234)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(
> SingleCheckpointBarrierHandler.java:262)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> SingleCheckpointBarrierHandler.processBarrier(
> SingleCheckpointBarrierHandler.java:231)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>
> at org.apache.flink.streaming.runtime.io.checkpointing.
> CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>
> at org.apache.flink.streaming.runtime.io.
> AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput
> .java:110)
>
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:65)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:496)
>
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java: