Re: Metrics for FileSource

2022-06-13 Thread Martijn Visser
Hi,

I believe this is a case where, for the FileSystem connector (both Source and
Sink), the metrics that are defined as part of FLIP-33 [1] have not been
implemented yet. I've created a ticket for that [2].

Best regards,

Martijn

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
[2] https://issues.apache.org/jira/browse/FLINK-28021

On Mon, Jun 13, 2022 at 07:24, Meghajit Mazumdar <meghajit.mazum...@gojek.com> wrote:

> Hi folks,
>
> Thanks for the reply.
> We have implemented our own SplitAssigner, FileReaderFormat and
> FileReaderFormat.Reader implementations. Hence, we plan to add custom
> metrics such as these:
> 1. No. of splits SplitAssigner is initialized with, number of splits
> re-added back to the SplitAssigner
> 2. Readers created per unit time
> 3. Time taken to create a reader
> 4. Time taken for the Reader to produce a single Row
> 5. Readers closed per unit time
> ... and some more
>
> However, since we haven't implemented our own FileSource or
> SplitEnumerator, we don't have visibility into the metrics of these
> components. We would ideally like to measure these:
> 1. Number of rows emitted by the source per unit time
> 2. Time taken by the enumerator to discover the splits
> 3. Total splits discovered
>
>
> Regards,
> Meghajit
>
>
> On Fri, Jun 10, 2022 at 10:04 PM Jing Ge  wrote:
>
>> Hi Meghajit,
>>
>> I think it makes sense to extend the current metrics. Could you list all
>> metrics you need? Thanks!
>>
>> Best regards,
>> Jing
>>
>> On Fri, Jun 10, 2022 at 5:06 PM Lijie Wang 
>> wrote:
>>
>>> Hi Meghajit,
>>>
>>> As far as I know, currently, the FileSource does not have the metrics
>>> you need.  You can implement your own source, and register custom metrics
>>> via `SplitEnumeratorContext#metricGroup` and
>>> `SourceReaderContext#metricGroup`.
>>>
>>> Best,
>>> Lijie
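
For reference, a minimal sketch of the registration Lijie describes, assuming a custom split enumerator and source reader; the helper class and metric names below are illustrative only, not part of the FileSource:

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.metrics.Counter;

/** Illustrative helper showing where custom source metrics can be registered. */
public final class SourceMetricsSketch {

    /** Call from the SplitEnumerator constructor, e.g. to count discovered splits. */
    public static Counter splitsDiscovered(SplitEnumeratorContext<?> ctx) {
        return ctx.metricGroup().counter("splitsDiscovered");
    }

    /** Call from the SourceReader / reader format, e.g. to count readers created. */
    public static Counter readersCreated(SourceReaderContext ctx) {
        return ctx.metricGroup().counter("readersCreated");
    }

    private SourceMetricsSketch() {}
}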
>>>
 Meghajit Mazumdar  wrote on Fri, Jun 10, 2022 at 16:36:
>>>
 Hello,

 We are working on a Flink project which uses FileSource to discover and
 read Parquet Files from GCS. ( using Flink 1.14)

 As part of this, we wanted to implement some health metrics around the
 code.
 I wanted to know whether Flink gathers some metrics by itself around
 FileSource, e.g. the number of files discovered by the SplitEnumerator, the
 number of files added back to the SplitAssigner, the time taken to process
 each split, etc.?

 I checked in the official documentation,
 but there don't appear to be any. Is the solution then to implement
 custom metrics like this?


 *Regards,*
 *Meghajit*

>>>
>
> --
> *Regards,*
> *Meghajit*
>


Re: Unsubscribe

2022-06-13 Thread Martijn Visser
Hi,

In order to unsubscribe, please send an email to
user-unsubscr...@flink.apache.org

Best regards,

Martijn

On Sat, Jun 11, 2022 at 19:46, tarun joshi <1985.ta...@gmail.com> wrote:

> Unsubscribe
>


Re: Unsubscribe

2022-06-13 Thread Martijn Visser
Hi,

In order to unsubscribe, please send an email to
user-unsubscr...@flink.apache.org

Best regards,

Martijn

On Fri, Jun 10, 2022 at 17:23,  wrote:

> Unsubscribe
>


Fwd: Apache flink doesn't work with avro kafka topic with multiple event types

2022-06-13 Thread Qingsheng Ren
Hi Sucheth,

If you are referring to Table / SQL API, I'm afraid it doesn't support
schema evolution or different types from one Kafka table. An
alternative way is to consume the topic with raw format [1] and do
deserialization with a UDTF. If you are using the DataStream API, you
can implement the KafkaRecordDeserializationSchema interface to
deserialize the message consumed from Kafka.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/raw/
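
For the DataStream route, a rough sketch of such a schema follows; the class name, the config handling, and the use of Confluent's KafkaAvroDeserializer are assumptions for illustration, not something prescribed by Flink:

import java.io.IOException;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

/** Sketch: deserialize a topic that mixes several Avro record types into GenericRecord. */
public class MultiTypeAvroDeserializationSchema
        implements KafkaRecordDeserializationSchema<GenericRecord> {

    // Pass a serializable map, e.g. a HashMap with "schema.registry.url" set.
    private final Map<String, String> registryConfig;
    private transient KafkaAvroDeserializer inner;

    public MultiTypeAvroDeserializationSchema(Map<String, String> registryConfig) {
        this.registryConfig = registryConfig;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        inner = new KafkaAvroDeserializer();
        inner.configure(registryConfig, false); // false = value (not key) deserializer
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<GenericRecord> out)
            throws IOException {
        // The writer schema is looked up per record, so mixed record types on one topic are fine.
        out.collect((GenericRecord) inner.deserialize(record.topic(), record.value()));
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeInformation.of(GenericRecord.class);
    }
}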

Best regards,

Qingsheng

On Sun, Jun 12, 2022 at 12:53 PM Sucheth S  wrote:
>
> Hi,
>
> Apache Flink doesn't work with an Avro-serialized Kafka topic with multiple
> event types (TopicRecordNameStrategy for the subject).
>
> Is there a way to read a generic record from an Avro-serialized Kafka topic
> which can have messages with different schemas, i.e.
> TopicRecordNameStrategy for the subject in the schema registry?
>
>
> Regards,
> Sucheth Shivakumar
> website : https://sucheths.com
> mobile : +1(650)-576-8050
> San Mateo, United States


Re: Flink SQL JDBC connector for Postgres can't handle UUID datatype

2022-06-13 Thread Martijn Visser
Hi Aaron,

There's indeed currently no support in Flink for inserting a UUID data type
into Postgres. The Jira ticket you've included [1] covers the same issue;
it's just that the solution is most likely not to map it as a RAW type, but
to use a STRING type instead. Is this something you might want to help out
with via a contribution?

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-19869
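
As a stopgap while keeping the column as STRING/VARCHAR on the Flink side, a workaround that is sometimes used at the driver level is the PostgreSQL JDBC option stringtype=unspecified, which lets the server infer the uuid type for string parameters. This is untested here and is driver behaviour rather than anything Flink documents; the sketch below only adapts the DDL from the original mail:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UuidWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Same table definition as in the thread, with the driver option appended to the URL.
        tableEnv.executeSql(
            "CREATE TABLE users ("
          + "  email VARCHAR,"
          + "  tenant_uid VARCHAR"
          + ") WITH ("
          + "  'connector' = 'jdbc',"
          + "  'url' = 'jdbc:postgresql://ica-postgresql:5432/ica?stringtype=unspecified',"
          + "  'username' = '',"
          + "  'password' = '',"
          + "  'table-name' = 'users'"
          + ")");
    }
}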

On Fri, Jun 10, 2022 at 08:37, Xuyang  wrote:

> Unfortunately, it seems that Flink currently doesn't support the type UUID
> for the JDBC connector. I think you can create a new issue on Jira and
> restart the discussion about this.
>
>
> --
> Best!
> Xuyang
>
>
> At 2022-06-09 22:40:45, "Aaron Weiss"  wrote:
>
> Hey there,
>
> We have just started to use the Flink SQL JDBC connector to do some writes
> to Postgres.  Our Postgres table has a UUID column we need to write to
> through Flink SQL.  However, it doesn't appear that the connector supports
> that type in any way.
>
> UUID isn't listed in the Postgres datatypes in the Data Type Mapping
> section here.
> We attempted to use VARCHAR and STRING types in Flink SQL for this, but get
> errors doing SELECT or INSERT on the UUID column.
>
> We saw this open issue that
> we think is similar to what we're experiencing.
>
> Any help would be appreciated!
>
> *Error Message when Inserting:*
> flink-jobmanager Caused by: java.sql.BatchUpdateException: Batch entry 0
> INSERT INTO users(email, tenant_uid)
> VALUES ('aaron.we...@broadcom.com',
> '76170855-6678-05ed-f913-f581fdc1aa87')
> was aborted:
> ERROR: column "tenant_uid" is of type uuid but expression is of type
> character varying
>
> *Flink SQL table DDL:*
> CREATE TABLE ica.postgresql.users (
>   email VARCHAR,
>   tenant_uid VARCHAR
> ) WITH (
>'connector' = 'jdbc',
>'url' = 'jdbc:postgresql://ica-postgresql:5432/ica',
>'username' = '',
>'password' = '',
>'table-name' = 'users'
> );
>
> *Postgres table DDL:*
> create table users
> (
> user_uid uuid default gen_random_uuid() not null,
> email text not null,
> created_date timestamp_ica default get_now_ica(),
> modified_date timestamp_ica default get_now_ica(),
> last_detected_date timestamp_ica default get_now_ica(),
> tenant_uid uuid not null,
> hits bigint default 0 not null,
> first_name text,
> last_name text,
> account_name text,
> domain text,
> title text,
> threat_score integer,
> org_unit text,
> ldap_dn text,
> ldap_memberof text[],
> ldap_hierarchical_memberof text[],
> source_user_id text,
> constraint pk_users
> primary key (tenant_uid, user_uid),
> constraint uk_users
> unique (tenant_uid, email)
> )
>
> --
>
> *Aaron Weiss*
> R&D Software Engineer   | Information Security Group   | Symantec
> Enterprise Division
> Broadcom
>
> mobile: 404-550-4299
>
> Atlanta, GA (USA)
> aaron.we...@broadcom.com   | broadcom.com
>
> This electronic communication and the information and any files
> transmitted with it, or attached to it, are confidential and are intended
> solely for the use of the individual or entity to whom it is addressed and
> may contain information that is confidential, legally privileged, protected
> by privacy laws, or otherwise restricted from disclosure to anyone else. If
> you are not the intended recipient or the person responsible for delivering
> the e-mail to the intended recipient, you are hereby notified that any use,
> copying, distributing, dissemination, forwarding, printing, or copying of
> this e-mail is strictly prohibited. If you received this e-mail in error,
> please return the e-mail to the sender, delete it from your computer, and
> destroy any printed copy of it.
>
>


Re: Kafka Consumer commit error

2022-06-13 Thread Martijn Visser
Hi Christian,

I would expect that after the broker comes back up and recovers completely,
these error messages would disappear automagically. It should not require a
restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism
for fault tolerance.

Best regards,

Martijn

On Wed, Jun 8, 2022 at 15:49, Christian Lorenz <christian.lor...@mapp.com> wrote:

> Hi,
>
>
>
> we have some issues with a job using the flink-sql-connector-kafka (Flink
> 1.15.0/standalone cluster). If one broker is restarted, e.g. for
> maintenance (replication-factor=2), the taskmanagers executing the job are
> constantly logging errors on each checkpoint creation:
>
>
>
> Failed to commit consumer offsets for checkpoint 50659
>
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
> Offset commit failed with a retriable exception. You should retry
> committing the latest consumed offsets.
>
> Caused by:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
> The coordinator is not available.
>
>
>
> AFAICT the error itself is produced by the underlying kafka consumer.
> Unfortunately this error cannot be reproduced on our test system.
>
> From my understanding this error might occur once, but follow up
> checkpoints / kafka commits should be fine again.
>
> Currently my only way of “fixing” the issue is to restart the taskmanagers.
>
>
>
> Is there maybe some kafka consumer setting which would help to circumvent
> this?
>
>
>
> Kind regards,
>
> Christian
>
> Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63,
> 80335 München.
> Registered with the District Court München HRB 226181
> Managing Directors: Frasier, Christopher & Warren, Steve
>
> This e-mail is from Mapp Digital and its international legal entities and
> may contain information that is confidential or proprietary.
> If you are not the intended recipient, do not read, copy or distribute the
> e-mail or any attachments. Instead, please notify the sender and delete the
> e-mail and any attachments.
> Please consider the environment before printing. Thank you.
>


Re: Flink Shaded dependencies and extending Flink APIs

2022-06-13 Thread Qingsheng Ren
Hi Andrew,

This is indeed a tricky case since Flink doesn't provide a non-shaded
JAR for flink-json. One hacky solution in my mind is:
1. Create a module let's say "wikimedia-event-utilities-shaded" that
relocates Jackson in the same way and uses the same Jackson version as
flink-shaded-jackson
2. Deploy the module to a local or remote Maven repository
3. Let your custom format depend on the
"wikimedia-event-utilities-shaded" module, then all Jackson
dependencies are relocated in the same way.

Another solution is to serialize and then deserialize the
"different" ObjectNode to do the conversion, but this sacrifices
performance.
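
A minimal sketch of that second option, assuming the shaded package name that appears in Andrew's mail below; the bridge class itself is made up:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/** Converts between Flink's relocated Jackson nodes and the regular ones via (de)serialization. */
public final class ShadedJacksonBridge {

    private static final ObjectMapper UNSHADED = new ObjectMapper();
    private static final org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
            SHADED = new org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper();

    /** Shaded -> unshaded: write with one mapper, re-parse with the other. */
    public static ObjectNode toUnshaded(
            org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode node)
            throws IOException {
        return (ObjectNode) UNSHADED.readTree(SHADED.writeValueAsBytes(node));
    }

    /** Unshaded -> shaded, for handing the augmented node back to the Flink-side format. */
    public static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
            toShaded(ObjectNode node) throws IOException {
        return (org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode)
                SHADED.readTree(UNSHADED.writeValueAsBytes(node));
    }

    private ShadedJacksonBridge() {}
}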

Hope this could be helpful!

Best regards,

Qingsheng

On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:
>
> Hi all,
>
> I'm working on an integration project trying to write some library code that 
> will allow us at the Wikimedia Foundation to use Flink with our 'Event 
> Platform'.  Specifically, I'm trying to write a reusable step near the end of 
> a pipeline that will ensure our JSON events satisfy some criteria before 
> producing them to Kafka.  Details here.
>
> I'm experimenting with writing my own custom format to do this.  But all I 
> really need to do is override JsonRowDataSerializationSchema's serialize 
> method and augment and validate the ObjectNode before it is serialized to 
> byte[].
>
> I'm running into an issue where the ObjectNode that is used by Flink here is 
> the shaded one: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
>  whereas the WMF code I want to use to augment the ObjectNode is using a 
> regular non shaded one.  I can't pass the shaded ObjectNode instance to a 
> function that takes a non shaded one, and I can't cast the shaded ObjectNode 
> to non shaded either.
>
> My Q is: is there a way to extend Flink APIs that use shaded dependencies?  I 
> suppose I could copy/paste the whole of the "json" format code that I need 
> into my project and just make it my own, but this feels quite obnoxious.
>
> Thank you!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Re: Kafka Consumer commit error

2022-06-13 Thread Alexander Fedulov
Hi Christian,

you should check if the exceptions that you see after the broker is back
from maintenance are the same as the ones you posted here. If you are using
EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging
transactions that Flink attempts to commit [1].

Best,
Alexander Fedulov

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance

On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser 
wrote:

> Hi Christian,
>
> I would expect that after the broker comes back up and recovers
> completely, these error messages would disappear automagically. It should
> not require a restart (only time). Flink doesn't rely on Kafka's
> checkpointing mechanism for fault tolerance.
>
> Best regards,
>
> Martijn
>
> On Wed, Jun 8, 2022 at 15:49, Christian Lorenz <
> christian.lor...@mapp.com> wrote:
>
>> Hi,
>>
>>
>>
>> we have some issues with a job using the flink-sql-connector-kafka (flink
>> 1.15.0/standalone cluster). If one broker e.g. is restarted for
>> maintainance (replication-factor=2), the taskmanagers executing the job are
>> constantly logging errors on each checkpoint creation:
>>
>>
>>
>> Failed to commit consumer offsets for checkpoint 50659
>>
>> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
>> Offset commit failed with a retriable exception. You should retry
>> committing the latest consumed offsets.
>>
>> Caused by:
>> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
>> The coordinator is not available.
>>
>>
>>
>> AFAICT the error itself is produced by the underlying kafka consumer.
>> Unfortunately this error cannot be reproduced on our test system.
>>
>> From my understanding this error might occur once, but follow up
>> checkpoints / kafka commits should be fine again.
>>
>> Currently my only way of “fixing” the issue is to restart the
>> taskmanagers.
>>
>>
>>
>> Is there maybe some kafka consumer setting which would help to circumvent
>> this?
>>
>>
>>
>> Kind regards,
>>
>> Christian
>>
>> Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63,
>> 80335 München.
>> Registered with the District Court München HRB 226181
>> Managing Directors: Frasier, Christopher & Warren, Steve
>>
>> This e-mail is from Mapp Digital and its international legal entities and
>> may contain information that is confidential or proprietary.
>> If you are not the intended recipient, do not read, copy or distribute
>> the e-mail or any attachments. Instead, please notify the sender and delete
>> the e-mail and any attachments.
>> Please consider the environment before printing. Thank you.
>>
>


Re: Flink Shaded dependencies and extending Flink APIs

2022-06-13 Thread Chesnay Schepler

Can we find a more robust way to support this?

flink-shaded, any relocation pattern, and
JsonRowDataSerializationSchema are all Flink internals that users shouldn't
use/rely on.


On 13/06/2022 12:26, Qingsheng Ren wrote:

Hi Andrew,

This is indeed a tricky case since Flink doesn't provide non-shaded
JAR for flink-json. One hacky solution in my mind is like:

1. Create a module let's say "wikimedia-event-utilities-shaded" that
relocates Jackson in the same way and uses the same Jackson version as
flink-shaded-jackson
2. Deploy the module to a local or remote Maven repository
3. Let your custom format depend on the
"wikimedia-event-utilities-shaded" module, then all Jackson
dependencies are relocated in the same way.

Another solution is that you can serialize then deserialize the
"different" ObjectNode to do the conversion but this sacrifices the
performance.

Hope this could be helpful!

Best regards,

Qingsheng

On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:

Hi all,

I'm working on an integration project trying to write some library code that 
will allow us at the Wikimedia Foundation to use Flink with our 'Event 
Platform'.  Specifically, I'm trying to write a reusable step near the end of a 
pipeline that will ensure our JSON events satisfy some criteria before 
producing them to Kafka.  Details here.

I'm experimenting with writing my own custom format to do this.  But all I 
really need to do is override JsonRowDataSerializationSchema's serialize method 
and augment and validate the ObjectNode before it is serialized to byte[].

I'm running into an issue where the ObjectNode that is used by Flink here is 
the shaded one: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
 whereas the WMF code I want to use to augment the ObjectNode is using a 
regular non shaded one.  I can't pass the shaded ObjectNode instance to a 
function that takes a non shaded one, and I can't cast the shaded ObjectNode to 
non shaded either.

My Q is: is there a way to extend Flink APIs that use shaded dependencies?  I suppose I 
could copy/paste the whole of the "json" format code that I need into my 
project and just make it my own, but this feels quite obnoxious.

Thank you!
-Andrew Otto
  Wikimedia Foundation






How to implement unnest() as udtf using Python?

2022-06-13 Thread John Tipper
Hi all,

Flink doesn’t support the unnest() function, which takes an array and creates a
row for each element in the array. I have a column containing an array of
map<string, string> and I’d like to implement my own unnest.

I try this as an initial do-nothing implementation:

@udtf(result_types=DataTypes.MAP(
    key_type=DataTypes.STRING(), value_type=DataTypes.STRING()))
def my_unnest(arr):
    return []

I get an error when Flink starts:

No match found for function signature my_unnest(<(VARCHAR(2147483647), VARCHAR(2147483647)) MAP ARRAY>)

Is there something that I’m missing in my definition please?

Many thanks,

John


exception while activating calculateThroughp

2022-06-13 Thread Sigalit Eliazov
Hi all


We are using the latest version of the Flink k8s operator with Flink 1.14 in
order to deploy our pipelines in application mode (one job per cluster, one job
manager + one task manager).

Every few minutes I receive the following error in the job manager and
all the tasks are restarted.

I understand that the throughput is calculated using a data size and a time
interval.

There are cases where this time interval is negative, and I am not able to
understand what can cause it or whether I can affect it through any
configuration.



java.lang.IllegalArgumentException: Time should be non negative

at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:792)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]


Any ideas/comments are welcome.

Thanks

Sigalit


Re: How to implement unnest() as udtf using Python?

2022-06-13 Thread Martijn Visser
Hi John,

You're mentioning that Flink doesn't support UNNEST, but it does [1]. Would
this work for you?

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#array-expansion
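
For reference, a minimal sketch of the array-expansion syntax from [1], written here against the Java TableEnvironment (the SQL itself is the same from PyFlink); the table and column names are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArrayExpansionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Assumes an "events" table with a column `tags` of type ARRAY<MAP<STRING, STRING>>.
        tEnv.executeSql(
            "SELECT e.id, t.tag "
          + "FROM events AS e "
          + "CROSS JOIN UNNEST(e.tags) AS t (tag)").print();
    }
}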


On Mon, Jun 13, 2022 at 13:55, John Tipper  wrote:

> Hi all,
>
> Flink doesn’t support the unnest() function, which takes an array and
> creates a row for each element in the array. I have column containing an
> array of map and I’d like to implement my own unnest.
>
> I try this as an initial do-nothing implementation:
>
> @udtf(result_types=DataTypes.MAP(
>     key_type=DataTypes.STRING(), value_type=DataTypes.STRING()))
> def my_unnest(arr):
>     return []
>
> I get an error when Flink starts:
>
> No match found for function signature my_unnest(<(VARCHAR(2147483647), VARCHAR(2147483647)) MAP ARRAY>)
>
> Is there something that I’m missing in my definition please?
>
> Many thanks,
>
> John
>


Re: exception while activating calculateThroughp

2022-06-13 Thread Lijie Wang
Hi Sigalit,
It's a known bug that has been fixed in Flink 1.15.0. See [1] and [2] for
details.

[1] https://issues.apache.org/jira/browse/FLINK-27712
[2] https://issues.apache.org/jira/browse/FLINK-25454

Best,
Lijie

Sigalit Eliazov  wrote on Mon, Jun 13, 2022 at 20:17:

> Hi all
>
>
> We are using the flink k8s operator latest version with flink 1.14 in
> order to deploy our pipelines in application mode (one job per cluster, one
> job manager + one task manager)
>
> Once in a few minutes I receive the following error in the job manager and
> all the tasks are being restarted.
>
> I understand that the throughput is calculator using data size and time
> interval
>
> there are cases where this time interval is negative and i am not able to
> understand what can cause it or can i affect on it in any configuration
>
>
>
> java.lang.IllegalArgumentException: Time should be non negative
>
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:792)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>
>
> and idea/comments are welcome
>
> Thanks
>
> Sigalit
>


Re: How to implement unnest() as udtf using Python?

2022-06-13 Thread John Tipper
You’re a legend, thank you so much. I was looking at the internal functions
docs page, not that one!

John

Sent from my iPhone

On 13 Jun 2022, at 13:21, Martijn Visser  wrote:


Hi John,

You're mentioning that Flink doesn't support UNNEST, but it does [1]. Would 
this work for you?

Best regards,

Martijn

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#array-expansion

On Mon, Jun 13, 2022 at 13:55, John Tipper <john_tip...@hotmail.com> wrote:
Hi all,

Flink doesn’t support the unnest() function, which takes an array and creates a 
row for each element in the array. I have column containing an array of 
map and I’d like to implement my own unnest.

I try this as an initial do-nothing implementation:

@udtf(result_types=DataTypes.MAP(
    key_type=DataTypes.STRING(), value_type=DataTypes.STRING()))
def my_unnest(arr):
    return []

I get an error when Flink starts:

No match found for function signature my_unnest(<(VARCHAR(2147483647), VARCHAR(2147483647)) MAP ARRAY>)

Is there something that I’m missing in my definition please?

Many thanks,

John


Re:unsubscribe

2022-06-13 Thread Xuyang
Hi, you can send an email with any content to user-unsubscr...@flink.apache.org to
unsubscribe.



On 2022-06-12 11:41:27, "chenshu...@foxmail.com"  wrote:

unsubscribe
Unsubscribe


chenshu...@foxmail.com

NegativeArraySizeException trying to take a savepoint

2022-06-13 Thread Mike Barborak
When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?

Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDb in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.

We are using Flink version 1.13.5.

Thanks,
Mike

[1] https://issues.apache.org/jira/browse/FLINK-9268
[2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html

Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for 
operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink 
to ec.platform.braid.responses-rtw (9/15)#0.
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
... 4 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException: -785722504
at java.base/java.util.concurrent.FutureTask.report(Unknown 
Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)
... 3 more
Caused by: java.lang.NegativeArraySizeException: -785722504
at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)
at org.rocksdb.RocksIterator.value0(RocksIterator.java)
at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
at 
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)
at 
org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)
at 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)
at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)
at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)


Re: Kafka Consumer commit error

2022-06-13 Thread Christian Lorenz
Hi Martijn,

thanks for replying. I would also expect the behavior you describe below. 
AFAICT it was also like this with Flink 1.14. I am aware that Flink is using 
checkpointing for fault tolerance, but for example the Kafka offsets are part 
of our monitoring and this will lead to alerts. Other applications which use 
the Kafka client directly also do not show repeated commit failures once all 
Kafka brokers are online again.
I think this occurs in Flink jobs using Flinks Kafka Connector directly 
(KafkaSource) or via a Kafka SQL Connector based application.

Will try to write a small job to verify this behavior, as we also use 
flink-avro-confluent-registry which makes it harder to understand the root of 
the issue.

Best regards,
Christian

From: Martijn Visser 
Date: Monday, June 13, 2022 at 12:05
To: Christian Lorenz 
Cc: "user@flink.apache.org" 
Subject: Re: Kafka Consumer commit error

This email has reached Mapp via an external source

Hi Christian,

I would expect that after the broker comes back up and recovers completely, 
these error messages would disappear automagically. It should not require a 
restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism for 
fault tolerance.

Best regards,

Martijn

On Wed, Jun 8, 2022 at 15:49, Christian Lorenz <christian.lor...@mapp.com> wrote:
Hi,

we have some issues with a job using the flink-sql-connector-kafka (flink 
1.15.0/standalone cluster). If one broker e.g. is restarted for maintainance 
(replication-factor=2), the taskmanagers executing the job are constantly 
logging errors on each checkpoint creation:

Failed to commit consumer offsets for checkpoint 50659
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
 Offset commit failed with a retriable exception. You should retry committing 
the latest consumed offsets.
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
 The coordinator is not available.

AFAICT the error itself is produced by the underlying kafka consumer. 
Unfortunately this error cannot be reproduced on our test system.
From my understanding this error might occur once, but follow up checkpoints / 
kafka commits should be fine again.
Currently my only way of “fixing” the issue is to restart the taskmanagers.

Is there maybe some kafka consumer setting which would help to circumvent this?

Kind regards,
Christian
Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63, 80335 
München.
Registered with the District Court München HRB 226181
Managing Directors: Frasier, Christopher & Warren, Steve
This e-mail is from Mapp Digital and its international legal entities and may 
contain information that is confidential or proprietary.
If you are not the intended recipient, do not read, copy or distribute the 
e-mail or any attachments. Instead, please notify the sender and delete the 
e-mail and any attachments.
Please consider the environment before printing. Thank you.
Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63, 80335 
München.
Registered with the District Court München HRB 226181
Managing Directors: Frasier, Christopher & Warren, Steve
This e-mail is from Mapp Digital and its international legal entities and may 
contain information that is confidential or proprietary.
If you are not the intended recipient, do not read, copy or distribute the 
e-mail or any attachments. Instead, please notify the sender and delete the 
e-mail and any attachments.
Please consider the environment before printing. Thank you.


Re: NegativeArraySizeException trying to take a savepoint

2022-06-13 Thread Martijn Visser
Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since
Flink bumped to a newer version of RocksDB in that version. Is that a
possibility for you?

Best regards,

Martijn

On Mon, Jun 13, 2022 at 15:21, Mike Barborak  wrote:

> When trying to savepoint our job, we are getting the stack trace below. Is
> there a way to know more about this failure? Like which function in the job
> graph is associated with the problematic state and which key (assuming it
> is keyed state)?
>
>
>
> Or is there a fix for this exception? The only mention of this exception
> that I can find is in [1] and [2]. [1] has a message at the bottom saying
> that the issue was fixed in RocksDb in 2018. And while we do have a part of
> the job graph that matches the pattern discussed in these two links, our
> attempts to reproduce the problem by pumping messages through at a rate
> millions of times higher than normal have not worked.
>
>
>
> We are using Flink version 1.13.5.
>
>
>
> Thanks,
>
> Mike
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-9268
>
> [2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html
>
>
>
> Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for
> operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka
> sink to ec.platform.braid.responses-rtw (9/15)#0.
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
>
> ... 4 more
>
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NegativeArraySizeException: -785722504
>
> at
> java.base/java.util.concurrent.FutureTask.report(Unknown Source)
>
> at java.base/java.util.concurrent.FutureTask.get(Unknown
> Source)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
>
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
>
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)
>
> ... 3 more
>
> Caused by: java.lang.NegativeArraySizeException: -785722504
>
> at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)
>
> at org.rocksdb.RocksIterator.value0(RocksIterator.java)
>
> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>
> at
> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)
>
> at
> org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)
>
> at
> org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)
>
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)
>
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
>
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
>
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
>
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
>
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
>
> at java.base/java.util.concurrent.FutureTask.run(Unknown
> Source)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
>


Re: Kafka Consumer commit error

2022-06-13 Thread Christian Lorenz
Hi Alexander,

thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
application. Do you think this might still be related?

Best regards,
Christian


From: Alexander Fedulov 
Date: Monday, June 13, 2022 at 13:06
To: "user@flink.apache.org" 
Cc: Christian Lorenz 
Subject: Re: Kafka Consumer commit error

This email has reached Mapp via an external source

Hi Christian,

you should check if the exceptions that you see after the broker is back from 
maintenance are the same as the ones you posted here. If you are using 
EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging 
transactions that Flink attempts to commit [1].

Best,
Alexander Fedulov

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance

On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser <martijnvis...@apache.org> wrote:
Hi Christian,

I would expect that after the broker comes back up and recovers completely, 
these error messages would disappear automagically. It should not require a 
restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism for 
fault tolerance.

Best regards,

Martijn

On Wed, Jun 8, 2022 at 15:49, Christian Lorenz <christian.lor...@mapp.com> wrote:
Hi,

we have some issues with a job using the flink-sql-connector-kafka (flink 
1.15.0/standalone cluster). If one broker e.g. is restarted for maintainance 
(replication-factor=2), the taskmanagers executing the job are constantly 
logging errors on each checkpoint creation:

Failed to commit consumer offsets for checkpoint 50659
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
 Offset commit failed with a retriable exception. You should retry committing 
the latest consumed offsets.
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
 The coordinator is not available.

AFAICT the error itself is produced by the underlying kafka consumer. 
Unfortunately this error cannot be reproduced on our test system.
From my understanding this error might occur once, but follow up checkpoints / 
kafka commits should be fine again.
Currently my only way of “fixing” the issue is to restart the taskmanagers.

Is there maybe some kafka consumer setting which would help to circumvent this?

Kind regards,
Christian
Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63, 80335 
München.
Registered with the District Court München HRB 226181
Managing Directors: Frasier, Christopher & Warren, Steve
This e-mail is from Mapp Digital and its international legal entities and may 
contain information that is confidential or proprietary.
If you are not the intended recipient, do not read, copy or distribute the 
e-mail or any attachments. Instead, please notify the sender and delete the 
e-mail and any attachments.
Please consider the environment before printing. Thank you.
Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63, 80335 
München.
Registered with the District Court München HRB 226181
Managing Directors: Frasier, Christopher & Warren, Steve
This e-mail is from Mapp Digital and its international legal entities and may 
contain information that is confidential or proprietary.
If you are not the intended recipient, do not read, copy or distribute the 
e-mail or any attachments. Instead, please notify the sender and delete the 
e-mail and any attachments.
Please consider the environment before printing. Thank you.


Re: Apache flink doesn't work with avro kafka topic with multiple event types

2022-06-13 Thread Sucheth S
Thanks, I'll check it out.

On Mon, Jun 13, 2022 at 2:40 AM Qingsheng Ren  wrote:

> Hi Sucheth,
>
> If you are referring to Table / SQL API, I'm afraid it doesn't support
> schema evolution or different types from one Kafka table. An
> alternative way is to consume the topic with raw format [1] and do
> deserialization with a UDTF. If you are using the DataStream API, you
> can implement the KafkaRecordDeserializationSchema interface to
> deserialize the message consumed from Kafka.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/raw/
>
> Best regards,
>
> Qingsheng
>
> On Sun, Jun 12, 2022 at 12:53 PM Sucheth S  wrote:
> >
> > Hi,
> >
> > Apache Flink doesn't work with Avro serialized kafka topic with multiple
> event types (
> > TopicRecordNameStrategy for subject )
> >
> > Is there a way to read a generic record from avro serialized kafka topic
> which can have messages with different schemas, basically
> TopicRecordNameStrategy for the subject in the schema registry. ?
> >
> >
> > Regards,
> > Sucheth Shivakumar
> > website : https://sucheths.com
> > mobile : +1(650)-576-8050
> > San Mateo, United States
>
-- 
Regards,
Sucheth Shivakumar
website: https://sucheths.com
Mobile : +1(650)-576-8050
San Mateo, United States


Flink running same task on different Task Manager

2022-06-13 Thread Great Info
Hi,
I have one Flink job which has two tasks:
Task 1 - sources some static data over HTTPS and keeps it in memory,
refreshing it every hour.
Task 2 - processes real-time events from Kafka, uses the static data to
validate and transform them, and then forwards them to another Kafka topic.

So far everything was running on the same task manager (same node), but due
to some recent scaling requirements we need to enable partitioning on Task 2,
and that will make some partitions run on other task managers. But the other
task managers don't have the static data.

Is there a way to run Task 1 on all the task managers? I don't want to
enable broadcasting since the data is fairly large, and I also cannot persist
the data in a DB due to data regulations.


Flink - aggregated output with status progression

2022-06-13 Thread Dheeraj Taneja
Hello,

I have a stream of events that are coming over Kafka. Each event progresses
through a series of statuses. I want to display an aggregated output of how
many events are in a particular status. If an event has progressed
from status A to status C, then that event should only be counted for the
last status that it was in. Below are example data and the expected output.
What is the most effective way of doing this in Flink?

Sample Data

Event1: Event(Id1, statusA, 2022-06-09T16:15:08Z)
Event2: Event(Id2, statusA, 2022-06-09T16:20:08Z)
Event3: Event(Id1, statusB, 2022-06-09T16:25:08Z)
Event4: Event(id1, statusC, 2022-06-09T16:26:08Z)
Event4: Event(id3, statusC, 2022-06-09T16:30:08Z)

Outcome

Status A - 1 (Id2)
Status B - 0 (none)
Status C - 2 (Id1 & Id3)


Re: Kafka Consumer commit error

2022-06-13 Thread Alexander Fedulov
Hi Christian,

thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this
> application. Do you think this might still be related?


No, in that case, Kafka transactions are not used, so it should not be
relevant.

Best,
Alexander Fedulov

On Mon, Jun 13, 2022 at 3:48 PM Christian Lorenz 
wrote:

> Hi Alexander,
>
>
>
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this
> application. Do you think this might still be related?
>
>
>
> Best regards,
>
> Christian
>
>
>
>
>
> *From: *Alexander Fedulov 
> *Date: *Monday, June 13, 2022 at 13:06
> *To: *"user@flink.apache.org" 
> *Cc: *Christian Lorenz 
> *Subject: *Re: Kafka Consumer commit error
>
>
>
> This email has reached Mapp via an external source
>
>
>
> Hi Christian,
>
>
>
> you should check if the exceptions that you see after the broker is back
> from maintenance are the same as the ones you posted here. If you are using
> EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging
> transactions that Flink attempts to commit [1].
>
>
>
> Best,
>
> Alexander Fedulov
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
>
>
>
> On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser 
> wrote:
>
> Hi Christian,
>
>
>
> I would expect that after the broker comes back up and recovers
> completely, these error messages would disappear automagically. It should
> not require a restart (only time). Flink doesn't rely on Kafka's
> checkpointing mechanism for fault tolerance.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Wed, Jun 8, 2022 at 15:49, Christian Lorenz <
> christian.lor...@mapp.com> wrote:
>
> Hi,
>
>
>
> we have some issues with a job using the flink-sql-connector-kafka (flink
> 1.15.0/standalone cluster). If one broker e.g. is restarted for
> maintainance (replication-factor=2), the taskmanagers executing the job are
> constantly logging errors on each checkpoint creation:
>
>
>
> Failed to commit consumer offsets for checkpoint 50659
>
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
> Offset commit failed with a retriable exception. You should retry
> committing the latest consumed offsets.
>
> Caused by:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
> The coordinator is not available.
>
>
>
> AFAICT the error itself is produced by the underlying kafka consumer.
> Unfortunately this error cannot be reproduced on our test system.
>
> From my understanding this error might occur once, but follow up
> checkpoints / kafka commits should be fine again.
>
> Currently my only way of “fixing” the issue is to restart the taskmanagers.
>
>
>
> Is there maybe some kafka consumer setting which would help to circumvent
> this?
>
>
>
> Kind regards,
>
> Christian
>
> Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63,
> 80335 München.
> Registered with the District Court München HRB 226181
> Managing Directors: Frasier, Christopher & Warren, Steve
>
> This e-mail is from Mapp Digital and its international legal entities and
> may contain information that is confidential or proprietary.
> If you are not the intended recipient, do not read, copy or distribute the
> e-mail or any attachments. Instead, please notify the sender and delete the
> e-mail and any attachments.
> Please consider the environment before printing. Thank you.
>
> Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63,
> 80335 München.
> Registered with the District Court München HRB 226181
> Managing Directors: Frasier, Christopher & Warren, Steve
>
> This e-mail is from Mapp Digital and its international legal entities and
> may contain information that is confidential or proprietary.
> If you are not the intended recipient, do not read, copy or distribute the
> e-mail or any attachments. Instead, please notify the sender and delete the
> e-mail and any attachments.
> Please consider the environment before printing. Thank you.
>


Re: custom table source, how to support json?

2022-06-13 Thread Dian Fu
Hi Ivan,

Is your question how to parse the JSON string in PyFlink? If so, maybe you
could take a look at this [1].

Regards,
Dian

[1]
https://stackoverflow.com/questions/71820015/how-to-reference-nested-json-within-pyflink-sql-when-json-schema-varies

On Fri, Jun 10, 2022 at 7:40 PM ivan.ros...@agilent.com <
ivan.ros...@agilent.com> wrote:

> Hello,
>
>
>
> I have a flink table source working using
>
>
>
> """
>
> create table source (
>
> ts TIMESTAMP(3),
>
> log_line STRING,
>
> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>
> ) with (
>
> 'connector'='lokitail', 'query'='blah', 'url'='blah'
>
> )
>
> """)
>
>
>
> It uses a simple custom table source, which collects rows like this:
>
>
>
> ctx.collect(GenericRowData.of(
>
>
> TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
>
> StringData.fromString("field0_counter_" + count++))
>
> )
>
>
>
> I would like, instead, to just send a single JSON string, like:
>
>
>
> ctx.collect(GenericRowData.of(
>
> StringData.fromString("{\"value\" : \"field0_counter_" +
> count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}"))
>
> );
>
>
>
> And handle parsing in python flink.  Can this be done simply at the point
> of collecting the row data?
>
>
>
> Thank you,
>
>
> Ivan
>


Re: How to implement unnest() as udtf using Python?

2022-06-13 Thread Dian Fu
I second Martijn, UNNEST should be supported.

Besides, regarding the above exception, could you share an example which
could reproduce this issue?

Regards,
Dian

On Mon, Jun 13, 2022 at 8:21 PM Martijn Visser 
wrote:

> Hi John,
>
> You're mentioning that Flink doesn't support UNNEST, but it does [1].
> Would this work for you?
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#array-expansion
>
>
> On Mon, Jun 13, 2022 at 13:55, John Tipper  wrote:
>
>> Hi all,
>>
>> Flink doesn’t support the unnest() function, which takes an array and
>> creates a row for each element in the array. I have column containing an
>> array of map and I’d like to implement my own unnest.
>>
>> I try this as an initial do-nothing implementation:
>>
>> @udtf(result_types=DataTypes.MAP(
>>     key_type=DataTypes.STRING(), value_type=DataTypes.STRING()))
>> def my_unnest(arr):
>>     return []
>>
>> I get an error when Flink starts:
>>
>> No match found for function signature my_unnest(<(VARCHAR(2147483647), VARCHAR(2147483647)) MAP ARRAY>)
>>
>> Is there something that I’m missing in my definition please?
>>
>> Many thanks,
>>
>> John
>>
>


Re: Exception: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime | pyflink 1.15.0

2022-06-13 Thread Dian Fu
Hi Mark,

Could you share an example which could reproduce this issue?

Regards,
Dian

On Thu, Jun 9, 2022 at 9:22 PM Márk Bartos  wrote:

> Hi,
>
> I'd like to ask for help regarding the java exception:
> Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot
> be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module
> java.sql of loader 'platform'; java.time.LocalDateTime is in module
> java.base of loader 'bootstrap')
>
> Full backtrace:
>
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/opt/venv/lib/python3.8/site-packages/pyflink/lib/flink-dist-1.15.0.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
>
> +------+------------------------+-------+-----+--------+--------------------+
> | name |                   type |  null | key | extras |          watermark |
> +------+------------------------+-------+-----+--------+--------------------+
> |   f0 |               CHAR(36) | FALSE |     |        |                    |
> |   f1 |                 BIGINT | FALSE |     |        |                    |
> |   f2 |                  FLOAT |  TRUE |     |        |                    |
> |   f3 |                  FLOAT |  TRUE |     |        |                    |
> |   f4 |                  FLOAT |  TRUE |     |        |                    |
> |   f5 |            VARCHAR(64) |  TRUE |     |        |                    |
> |   f6 | TIMESTAMP(3) *ROWTIME* | FALSE |     |        | SOURCE_WATERMARK() |
> +------+------------------------+-------+-----+--------+--------------------+
> 7 rows in set
>
> +------------+--------------------+-------+-----+--------+-----------+
> |       name |               type |  null | key | extras | watermark |
> +------------+--------------------+-------+-----+--------+-----------+
> |         f0 |           CHAR(36) | FALSE |     |        |           |
> |   start_ts |             BIGINT | FALSE |     |        |           |
> |     end_ts |             BIGINT | FALSE |     |        |           |
> | trajectory | ARRAY `z` FLOAT>>  |  TRUE |     |        |           |
> +------------+--------------------+-------+-----+--------+-----------+
> 4 rows in set
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/threading.py", line 932, in
> _bootstrap_inner
> self.run()
>   File "/usr/local/lib/python3.8/threading.py", line 870, in run
> self._target(*self._args, **self._kwargs)
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 598, in 
> target=lambda: self._read_inputs(elements_iterator),
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 581, in _read_inputs
> for elements in elements_iterator:
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426,
> in __next__
> return self._next()
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826,
> in _next
> raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string =
> "{"created":"@1654778704.584603399","description":"Error received from peer
> ipv4:127.0.0.1:43123","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer
> hanging up","grpc_status":1}"
> >
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/threading.py", line 932, in
> _bootstrap_inner
> self.run()
>   File "/usr/local/lib/python3.8/threading.py", line 870, in run
> self._target(*self._args, **self._kwargs)
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 598, in 
> target=lambda: self._read_inputs(elements_iterator),
>   File
> "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
> line 581, in _read_inputs
> for elements in elements_iterator:
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426,
> in __next__
> return self._next()
>   File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826,
> in _next
> raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string =
> "{"created":"@1654778704.633462921","description":"Error received from peer
> ipv4:127.0.0.1:41365","file":"src/core/lib/surface/call.cc"

Re:Flink - aggregated output with status progression

2022-06-13 Thread Xuyang
Hi, what about using "Top-1 + Agg" or a UDAF for your scenario?


The main idea, I think, is that when an event changes from status A to C, Flink
needs to send a `DELETE` record downstream to remove the old row and then send a
new one downstream again, and the `Top-1` query will keep only the newest row
per `Id`.


Please tell me if this plan works.
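
A rough sketch of that idea in Flink SQL (wrapped in the Java Table API here); the table name, the column names, and the event-time attribute are all assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LatestStatusCountSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Assumes an "events" table (id STRING, status STRING, event_time TIMESTAMP(3) rowtime).
        // The inner query keeps only the latest status per id (Top-1 / deduplication); a status
        // change retracts the old row, so the outer per-status count stays correct.
        tEnv.executeSql(
            "SELECT status, COUNT(*) AS cnt FROM ("
          + "  SELECT id, status,"
          + "         ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_time DESC) AS row_num"
          + "  FROM events"
          + ") WHERE row_num = 1 "
          + "GROUP BY status").print();
    }
}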







--

Best!
Xuyang




At 2022-06-14 01:55:14, "Dheeraj Taneja"  wrote:

Hello, 



I have a stream of events that are coming over Kafka. Each event progresses 
through a series of statuses. I want to display aggregated output of how many 
events are in a particular status. If suppose an event has progressed from 
status A to Status C then that event needs to be only counted for the last 
status that it was in. Below is an example data and expected output. What is 
the most effective way of doing this in Flink?

Sample Data

Event1: Event(Id1, statusA, 2022-06-09T16:15:08Z)
Event2: Event(Id2, statusA, 2022-06-09T16:20:08Z)
Event3: Event(Id1, statusB, 2022-06-09T16:25:08Z)
Event4: Event(id1, statusC, 2022-06-09T16:26:08Z)
Event4: Event(id3, statusC, 2022-06-09T16:30:08Z)


Outcome

Status A - 1 (Id2)
Status B - 0 (none)
Status C - 2 (Id1 & Id3)

Re: FW: FW: Issue Facing While Using EmbeddedRocksDbCheckpointing FlinkVersion(1.15.0)

2022-06-13 Thread Shuiqiang Chen
Hi,

From the task manager's log, we can find the following exception stack
trace; it seems to be an operating-system-related problem with RocksDB.

2022-06-04 14:45:53,878 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - KEYED
> PROCESS, Map -> Sink: Print to Std. Out (1/1)
> (3a295a887361f4f87d2eaf79e901d056) switched from INITIALIZING to FAILED on
> 2f04093f-a18f-4c2c-b02e-6d1e6ece677e @ kubernetes.docker.internal
> (dataPort=-1).
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> PythonKeyedProcessOperator_ce1331762e4644ef89dbdbc15321f049_(1/1) from any
> of the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> ... 11 more
> Caused by: java.io.IOException: Could not load the native RocksDB library
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:882)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:402)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:93)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
> ~[flink-dist_2.11-1.14.4.jar:1.14.4]
> ... 11 more
> Caused by: java.lang.UnsatisfiedLinkError:
> C:\Users\Admin\AppData\Local\Temp\rocksdb-lib-cfe23cb49ce486a76d1126db2517e679\librocksdbjni-win64.dll:
> A dynamic link library (DLL) initialization routine failed
> at java.lang.ClassLoader$NativeLibrary.load(Native Method) ~[?:1.8.0_202]
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) ~[?:1.8.0_202]
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) ~[?:1.8.0_202]
> at java.lang.Runtime.load0(Runtime.java:809) ~[?:1.8.0_202]
> at java.lang.System.load(System.java:1086) ~[?:1.8.0_202]
> at
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJa
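
For what it's worth, one way to narrow this down is to make the state backend
choice explicit and temporarily swap in the heap-based HashMapStateBackend,
which involves no native library, to confirm that the rest of the job runs. The
original job appears to use PyFlink, which offers the same choice of backends;
the sketch below uses the Java API for brevity, and the checkpoint path is a
placeholder. The DLL initialization failure itself is often an environment
issue on the Windows machine (for example a missing native runtime or a
restricted temp directory); that is an assumption here, not something confirmed
in this thread.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSwitch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s; the storage path is a placeholder.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");

        boolean useRocksDb = false; // set to true once the native library loads
        if (useRocksDb) {
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental checkpoints
        } else {
            env.setStateBackend(new HashMapStateBackend()); // heap-based, no native code
        }

        // Tiny keyed pipeline so that keyed state (and hence the backend) is exercised.
        env.fromElements(1, 2, 3, 4, 5)
                .keyBy(i -> i % 2)
                .reduce(Integer::sum)
                .print();

        env.execute("state-backend-switch-sketch");
    }
}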

Apache Flink - Reading data from Scylla DB

2022-06-13 Thread Himanshu Sareen
Team,

I'm looking for a solution to consume/read data from Scylla DB into Apache
Flink.

If anyone can guide me or share pointers, it would be helpful.

Regards,
Himanshu


Flink operator deletes the FlinkDeployment after a while

2022-06-13 Thread Sigalit Eliazov
After a few hours of running the job manager and task manager generated by the
operator, I get the following message in the operator log.
There really wasn't any traffic, and the FlinkDeployment is being deleted.

=== Finished metrics report
===
Deleting FlinkDeployment
2022-06-14 03:09:51,847 i.j.o.p.e.ReconciliationDispatcher
[ERROR][ns/job-namel] Error during event processing ExecutionScope{
resource id: CustomResourceID{name='job-name', namespace='ns'}, version:
53138} failed.
java.lang.RuntimeException: Cannot create observe config before first
deployment, this indicates a bug.
at
org.apache.flink.kubernetes.operator.config.FlinkConfigManager.getObserveConfig(FlinkConfigManager.java:137)
at
org.apache.flink.kubernetes.operator.service.FlinkService.cancelJob(FlinkService.java:357)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.shutdown(ApplicationReconciler.java:327)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:56)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:37)
at
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:107)
at
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:59)
at
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:68)
at
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:50)
at
io.javaoperatorsdk.operator.api.monitoring.Metrics.timeControllerExecution(Metrics.java:34)

I'm not sure I understand this behaviour.
Thanks
Sigalit


Re: Apache Flink - Reading data from Scylla DB

2022-06-13 Thread yuxia
It seems you may need to implement a custom connector for Scylla DB, as I haven't
found an existing one.
I hope the docs [1][2] can help you implement your own connector.
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
 
[2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html 

Best regards, 
Yuxia 


From: "Himanshu Sareen"
To: "User"
Sent: Tuesday, June 14, 2022, 11:29:38 AM
Subject: Apache Flink - Reading data from Scylla DB

Team, 

I'm looking for a solution to consume/read data from Scylla DB into Apache
Flink.

If anyone can guide me or share pointers, it would be helpful.

Regards, 
Himanshu
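
As a starting point before building a full Table/SQL connector along the lines
of [1][2], a small DataStream source can already pull data out of Scylla
through a CQL driver, since Scylla is CQL-compatible. The sketch below is only
an illustration: it assumes the DataStax Java driver, and the contact point,
keyspace, table and column names are all made up. It performs a single
non-parallel scan, so it is suitable for prototyping rather than production; a
proper connector should implement the unified Source / DynamicTableSource
interfaces described in [1].

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.net.InetSocketAddress;

/** Minimal bounded source that scans one CQL table from Scylla (prototype only). */
public class ScyllaScanSource extends RichSourceFunction<String> {

    private transient CqlSession session;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("scylla-host", 9042)) // placeholder host
                .withLocalDatacenter("datacenter1")                          // placeholder DC
                .build();
    }

    @Override
    public void run(SourceContext<String> ctx) {
        // Placeholder keyspace/table/columns; the driver pages through results lazily.
        for (Row row : session.execute("SELECT id, payload FROM my_keyspace.my_table")) {
            if (!running) {
                break;
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(row.getString("id") + "," + row.getString("payload"));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() {
        if (session != null) {
            session.close();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new ScyllaScanSource()).print();
        env.execute("scylla-scan-sketch");
    }
}

If a change stream rather than a one-off snapshot is needed, routing changes
through Kafka (for example via a CDC connector, if one is available for your
setup) and reading them with Flink's Kafka source is another option.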