Re: SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-28 Thread Ingo Bürk

Hi Georg,

which Flink version are you using? The missing property is for the 
avro-confluent format, and if I recall correctly, how these are passed 
has changed in recent versions, so it'd be good to double check you are 
using the documentation for the version you are running on.
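
In case it helps, below is a minimal sketch of what the sink definition could look like on Flink 1.14+, where the format option was renamed from 'avro-confluent.schema-registry.url' to 'avro-confluent.url' and, when used as key/value format of upsert-kafka, additionally needs the 'key.' / 'value.' prefix (on 1.13 the old option name applies, but the prefixes are the same). Note that upsert-kafka also requires a primary key. The Java wrapper is only there so the snippet is runnable as-is; the DDL string is what you would paste into the SQL client. Topic and host values are just the placeholders from your mail.

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Sketch of the sink table: upsert-kafka needs a PRIMARY KEY, and the
        // avro-confluent options of the key/value formats are prefixed with
        // 'key.' / 'value.' respectively.
        tEnv.executeSql(
            "CREATE TABLE metrics_per_brand (\n"
          + "  brand STRING,\n"
          + "  cnt BIGINT,\n"
          + "  duration_mean DOUBLE,\n"
          + "  rating_mean DOUBLE,\n"
          + "  PRIMARY KEY (brand) NOT ENFORCED\n"
          + ") WITH (\n"
          + "  'connector' = 'upsert-kafka',\n"
          + "  'topic' = 'metrics_per_brand',\n"
          + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
          + "  'key.format' = 'avro-confluent',\n"
          + "  'key.avro-confluent.url' = 'http://localhost:8081',\n"
          + "  'value.format' = 'avro-confluent',\n"
          + "  'value.avro-confluent.url' = 'http://localhost:8081'\n"
          + ")");
    }
}
```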



Best
Ingo

On 24.03.22 11:57, Georg Heiler wrote:

Hi,

how can I get Flink's SQL client to nicely sink some data to either the
regular kafka or the upsert-kafka connector?


I have a table/topic with dummy data:
CREATE TABLE metrics_brand_stream (
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    brand STRING,
    duration INT,
    rating INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'commercials_avro',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092'
);

And the following aggregation:

SELECT brand,
          COUNT(*) AS cnt,
          AVG(duration) AS  duration_mean,
          AVG(rating) AS rating_mean
   FROM metrics_brand_stream
   GROUP BY brand;

When trying to define an output table:

CREATE TABLE metrics_per_brand (
    brand STRING,
    cnt BIGINT,
    duration_mean DOUBLE,
    rating_mean DOUBLE
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'metrics_per_brand',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'avro-confluent',
    'value.format' = 'avro-confluent'
);

And trying to INSERT some result data:

INSERT INTO metrics_per_brand
   SELECT brand,
          COUNT(*) AS cnt,
          AVG(duration) AS  duration_mean,
          AVG(rating) AS rating_mean
   FROM metrics_brand_stream
   GROUP BY brand;

The query fails with:

org.apache.flink.table.api.ValidationException: One or more required 
options are missing.


Missing required options are:

url

But neither
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
seems to list the right configuration (or I am misreading the documentation).



How can I sink data to Kafka after some arbitrary computation using the
Flink SQL client, with either the kafka or upsert-kafka connector, where
the input is Avro with a schema from the Confluent schema registry and
the output should register its schema there as well (and serialize using Avro)?



Best,
Georg


Re: Flink SQL AVG with mandatory type casting

2022-03-28 Thread Ingo Bürk

Hi Georg,

the Flink implementations seem to be based on SQL Server[1], which 
has similar (though better documented) behavior for integer-like data types.


[1] 
https://docs.microsoft.com/en-us/sql/t-sql/functions/avg-transact-sql?view=sql-server-ver15
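
For illustration, a small self-contained sketch of the behavior (the values are made up): AVG over an INT column keeps the INT type, while casting the argument to DOUBLE gives the fractional mean.

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvgCastSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // In-memory stand-in for the Kafka-backed input_stream.
        tEnv.executeSql(
            "CREATE TEMPORARY VIEW input_stream (duration, rating) AS "
          + "VALUES (44, 2), (45, 3), (47, 2)");

        // AVG(INT) keeps the INT type (prints 45 here), the cast yields ~45.33.
        tEnv.executeSql(
            "SELECT AVG(duration) AS duration_int_mean, "
          + "       AVG(CAST(duration AS DOUBLE)) AS duration_double_mean "
          + "FROM input_stream").print();
    }
}
```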



Best
Ingo

On 24.03.22 16:08, Georg Heiler wrote:

Hi,

I observe strange behavior in Flink SQL:
For an input stream:

CREATE TABLE input_stream (
    duration INT,
    rating INT
) WITH (
    'connector' = 'kafka',
    'topic' = 't',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092'
);

The following SQL:
SELECT AVG(duration) AS  duration_mean, AVG(CAST(rating AS DOUBLE)) AS 
rating_mean FROM input_stream;


returns:

duration_mean                    rating_mean
             45              2.503373819163293

I.e. duration_mean is truncated to an INT!

Any other database system I know by default outputs a DOUBLE type for 
any input (including INT) and does not truncate it.


Why does Flink decide to truncate here? Why is a manual type cast necessary?

Best,
Georg


Re: Question around creating partitioned tables when running Flink from Ververica

2022-02-04 Thread Ingo Bürk

Hi Natu,

yes, in the docs we refer to it as custom catalogs, I apologize for the 
confusion. Custom catalogs can be added through the UI similar to custom 
connectors / formats.



Best
Ingo

On 04.02.22 08:45, Natu Lauchande wrote:

Hey Ingo,

Thanks for the quick response. I will bother you a bit more : ).

We have never used external catalogs; do you perhaps have a link that we 
can look at?


The only reference that I see online is for custom catalogs; is this the 
same as external catalogs?
https://docs.ververica.com/user_guide/sql_development/catalogs.html#custom-catalogs



Best,
Natu

On Fri, Feb 4, 2022 at 8:59 AM Ingo Bürk  wrote:


Hi Natu,

the functionality hasn't been actively blocked, it just hasn't yet been
implemented in the Ververica Platform Built-In Catalog. Using any
external catalog which supports partitioning will work fine.

I'll make a note internally for your request on this, though I cannot
make any statements about timelines.


Best
Ingo

On 04.02.22 07:25, Natu Lauchande wrote:
 > Good day,
 >
 > Although flink sql allows us to create partitioned tables, we are
unable
 > to do so on vvp at the moment because of the below error:
 > Cause: Partitioned tables are not supported yet.
 > Can we understand why the functionality was blocked or when will
 > partitioned tables be supported on vvp?
 >
 > Thanks,
 > Natu



Re: Question around creating partitioned tables when running Flink from Ververica

2022-02-03 Thread Ingo Bürk

Hi Natu,

the functionality hasn't been actively blocked, it just hasn't yet been 
implemented in the Ververica Platform Built-In Catalog. Using any 
external catalog which supports partitioning will work fine.


I'll make a note internally for your request on this, though I cannot 
make any statements about timelines.



Best
Ingo

On 04.02.22 07:25, Natu Lauchande wrote:

Good day,

Although flink sql allows us to create partitioned tables, we are unable 
to do so on vvp at the moment because of the below error:

Cause: Partitioned tables are not supported yet.
Can we understand why the functionality was blocked or when will 
partitioned tables be supported on vvp?


Thanks,
Natu


Re: Upgrade to 1.14.3

2022-01-25 Thread Ingo Bürk

Hi Sweta,

there was a non-compatible change to SourceReaderContext#metricGroup in 
the 1.14.x release line; I assume this is what you are seeing.


Did you make sure to update the connector (and any other) dependencies 
as well?



Best
Ingo

On 25.01.22 05:36, Sweta Kalakuntla wrote:

Hi,

We are on Flink 1.13.3 and trying to upgrade the cluster to version 1.14.3. 
I see that the job (on 1.13.3) is unable to start up because it says 
it couldn't find the metrics group (inside the FlinkKafkaConsumer class).


- Can I deploy a 1.13.3 job on a 1.14.3 cluster?
- Can I deploy a 1.14.3 job on a 1.13.3 cluster?
- How do I upgrade to a 1.14.3 cluster without losing the state of running apps? 
I have even tried taking a savepoint, but that did not revive the job.


Thank you,
Sweta


Re: Show plan in UI not working.

2022-01-24 Thread Ingo Bürk

Hi John,

can you please submit this as an issue in JIRA? If you suspect it is 
related to other issues, just make a note of that in the issue as well. 
Thanks!



Ingo

On 23.01.22 18:05, John Smith wrote:
Just I'm case but in 1.14.x regardless of the job manager is leader or 
not. Before submitting a job of you click on "Show Plan" it just shows a 
blank window.


I'm assuming it's a similar issue to the deserialization ones.


Re: Unsubscribe

2021-12-17 Thread Ingo Bürk

Hi Gehad,

please send an email to user-unsubscr...@flink.apache.org in order to 
unsubscribe from the mailing list.



Ingo

On 17.12.21 14:40, Gehad Elrobey wrote:

Unsubscribe

On Thu, Nov 11, 2021 at 9:57 PM Uday Garikipati  wrote:


Unsubscribe 



Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-08 Thread Ingo Bürk
Hi Natu,

Something you could try is removing the packaged parquet format and
defining a custom format[1]. For this custom format you can then fix the
dependencies by packaging all of the following into the format:

* flink-sql-parquet
* flink-shaded-hadoop-2-uber
* hadoop-aws
* aws-java-sdk-bundle
* guava

This isn't entirely straight-forward, unfortunately, and I haven't verified
it. However, with Ververica Platform 2.6, to be released shortly after
Flink 1.15, it should also work again.

[1]
https://docs.ververica.com/user_guide/sql_development/connectors.html#custom-connectors-and-formats


Best
Ingo

On Tue, Dec 7, 2021 at 6:23 AM Natu Lauchande  wrote:

> Hey Timo and Flink community,
>
> I wonder if there is a fix for this issue. The last time I rollbacked to
> version 12 of Flink and downgraded Ververica.
>
> I am really keen to leverage the new features on the latest versions of
> Ververica 2.5+ , i have tried a myriad of tricks suggested ( example :
> building the image with hadoop-client libraries) :
>
> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass
> .java:1961)
> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass
> .java:272)
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:
> 2003)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850
> )
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2160)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2405)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2329)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2187)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:615)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:600)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:587)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:541)
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:322)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
> OperatorChain.java:159)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(
> StreamTask.java:551)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .runWithCleanUpOnFail(StreamTask.java:650)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(
> StreamTask.java:540)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at 

Re: Views support in PostgresCatalog

2021-11-26 Thread Ingo Bürk
Hi Flavio,

that error message refers to Flink tables and Flink views, not Postgres
views: within the catalog, no Flink table or Flink view of that name
exists. Note that even Postgres views would have to be represented as Flink
tables in Flink.

In order to change the message to (Postgres) "views not being supported",
the catalog would have to make another check against the database to see if
a (Postgres) view for it exists, which seems a bit odd to do, IMO. Catalogs
can list the tables within them, so if you first list the tables you'd see
that your (Postgres) view is not contained in the catalog.

I don't think there's a reason why the catalog couldn't in theory support
representing Postgres views as Flink tables. The JDBC sink connector of
course cannot work for those, but the source connector could (I don't know
if it currently does; if not that'd be a good reason for the current
behavior of the catalog).
But yes, your assumption that the catalog doesn't support Postgres views
currently is correct, see [1].

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java#L168
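
For illustration, a small sketch (all connection values are made up) of listing what the catalog actually exposes before querying; a Postgres view will simply not show up in the result with the current implementation:

```
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PostgresCatalogListSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder connection values; requires flink-connector-jdbc and the
        // Postgres driver on the classpath.
        JdbcCatalog catalog = new JdbcCatalog(
                "mypg", "mydb", "someuser", "somepassword", "jdbc:postgresql://localhost:5432");
        tEnv.registerCatalog("mypg", catalog);
        tEnv.useCatalog("mypg");

        // Both calls list only what the catalog knows about (tables, not Postgres views).
        tEnv.executeSql("SHOW TABLES").print();
        System.out.println(catalog.listTables("mydb"));
    }
}
```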


Ingo

On Fri, Nov 26, 2021, 15:02 Flavio Pompermaier  wrote:

> Hi to all,
> I was trying to use a view of my Postgres database through the
> PostgresCatalog but at the moment it seems that the current implementation
> ignores views. Probably this is caused by the fact that there's no way to
> avoid INSERT statements in Flink.
> However, the thrown error is somehow misleading because the
> PostgresCatalog.getTable() throws a TableNotExistException whose error
> message is
>
> private static final String MSG = "Table (or view) %s does not exist in
> Catalog %s.";
>
> Maybe the thrown exception could be more helpful and could report that
> views are not supported by this catalog implementation. What do you think?
>
> Best,
> Flavio
>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Ingo Bürk
Hi Krzysztof,

the new, unified Source interface can only work as a scan source. Could you
maybe elaborate a bit on the connector implementation you have and how you
intend to have it work as a lookup source?


Best
Ingo

On Thu, Nov 4, 2021 at 4:11 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thanks Fabian and Ingo,
> yes I forgot to add the refrence links, so here they are:
>
> [1] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html
> [2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part2
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/
>
> In my case I would really need a LookupTableSource and not a ScanTableSource,
> since my use case and source will get data for given parameters and I don't
> need to scan the entire resource.
>
> Cheers,
>
> czw., 4 lis 2021 o 15:48 Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> napisał(a):
>
>> Hi,
>> I was wondering if it is possible to implement a Source Table connector
>> like it is described in [1][2] with a custom source that implements the new
>> Source interface [3] and not a SourceFunction.
>>
>> I already have my custom source, but when I'm trying to implement a Table
>> Source from LookupTableSource or ScanTableSource like it is presented in
>> [1][2], it seems I need to have a SourceFunction object to be able to use
>> ScanRuntimeProvider or LookupRuntimeProvider.
>>
>> In other words how can I use Source interface implementation in
>> TableSource?
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>


Re: Implementing a Custom Source Connector for Table API and SQL

2021-11-04 Thread Ingo Bürk
Hi Krzysztof,

instead of SourceFunctionProvider you need to use SourceProvider. If you
look at the filesystem connector you can see an example for that, too
(FileSystemTableSource#createSourceProvider).
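
For reference, a rough sketch of that wiring (the class and field names are made up; the Source instance is whatever FLIP-27 source you already have, assumed here to produce RowData):

```
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;

public class MyTableSource implements ScanTableSource {

    // Assumed: a FLIP-27 source producing RowData.
    private final Source<RowData, ?, ?> source;

    public MyTableSource(Source<RowData, ?, ?> source) {
        this.source = source;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // SourceProvider bridges the new Source interface into the table stack,
        // analogous to SourceFunctionProvider for legacy SourceFunctions.
        return SourceProvider.of(source);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyTableSource(source);
    }

    @Override
    public String asSummaryString() {
        return "my-custom-source";
    }
}
```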


Best
Ingo

On Thu, Nov 4, 2021 at 3:48 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I was wondering if it is possible to implement a Source Table connector
> like it is described in [1][2] with a custom source that implements the new
> Source interface [3] and not a SourceFunction.
>
> I already have my custom source, but when I'm trying to implement a Table
> Source from LookupTableSource or ScanTableSource like it is presented in
> [1][2], it seems I need to have a SourceFunction object to be able to use
> ScanRuntimeProvider or LookupRuntimeProvider.
>
> In other words how can I use Source interface implementation in
> TableSource?
>
> Regards,
> Krzysztof Chmielewski
>


Re: [DISCUSS] Creating an external connector repository

2021-10-15 Thread Ingo Bürk
Hi Arvid,

In general I think breaking up the big repo would be a good move with many
benefits (which you have outlined already). One concern would be how to
proceed with our docs / examples if we were to really separate out all
connectors.

1. More real-life examples would essentially now depend on external
projects. Particularly if hosted outside the ASF, this would feel somewhat
odd. Or to put it differently, if flink-connector-foo is not part of Flink
itself, should the Flink Docs use it for any examples?
2. Generation of documentation (config options) wouldn't be possible unless
the docs depend on these external projects, which would create weird
version dependency cycles (Flink 1.X's docs depend on flink-connector-foo
1.X which depends on Flink 1.X).
3. Documentation would inevitably be much less consistent when split across
many repositories.

As for your approaches, how would (A) allow hosting personal / company
projects if only Flink committers can write to it?

> Connectors may receive some sort of quality seal

This sounds like a lot of work and process, and could easily become a
source of frustration.


Best
Ingo

On Fri, Oct 15, 2021 at 2:47 PM Arvid Heise  wrote:

> Dear community,
>
> Today I would like to kickstart a series of discussions around creating an
> external connector repository. The main idea is to decouple the release
> cycle of Flink with the release cycles of the connectors. This is a common
> approach in other big data analytics projects and seems to scale better
> than the current approach. In particular, it will yield the following
> changes.
>
>
> - Faster releases of connectors: New features can be added more quickly,
>   bugs can be fixed immediately, and we can have faster security patches in
>   case of direct or indirect (through dependencies) security flaws.
> - New features can be added to old Flink versions: If the connector API
>   didn't change, the same connector jar may be used with different Flink
>   versions. Thus, new features can also immediately be used with older Flink
>   versions. A compatibility matrix on each connector page will help users to
>   find suitable connector versions for their Flink versions.
> - More activity and contributions around connectors: If we ease the
>   contribution and development process around connectors, we will see faster
>   development and also more connectors. Since that heavily depends on the
>   chosen approach discussed below, more details will be shown there.
> - An overhaul of the connector page: In the future, all known connectors
>   will be shown on the same page in a similar layout independent of where
>   they reside. They could be hosted on external project pages (e.g., Iceberg
>   and Hudi), on some company page, or may stay within the main Flink
>   repository. Connectors may receive some sort of quality seal such that users
>   can quickly assess the production-readiness and we could also add which
>   community/company promises which kind of support.
> - If we take (some) connectors out of Flink, Flink CI will be faster
>   and Flink devs will experience fewer build instabilities (which mostly come
>   from connectors). That would also speed up Flink development.
>
>
> Now I’d first like to collect your viewpoints on the ideal state. Let’s
> first recap which approaches, we currently have:
>
>
> - We have half of the connectors in the main Flink repository.
>   Relatively few of them have received updates in the past couple of months.
> - Another large chunk of connectors are in Apache Bahir. It recently has
>   seen the first release in 3 years.
> - There are a few other (Apache) projects that maintain a Flink
>   connector, such as Apache Iceberg, Apache Hudi, and Pravega.
> - A few connectors are listed on company-related repositories, such as
>   Apache Pulsar on StreamNative and CDC connectors on Ververica.
>
>
> My personal observation is that having a repository per connector seems to
> increase the activity on a connector as it’s easier to maintain. For
> example, in Apache Bahir all connectors are built against the same Flink
> version, which may not be desirable when certain APIs change; for example,
> SinkFunction will be eventually deprecated and removed but new Sink
> interface may gain more features.
>
> Now, I'd like to outline different approaches. All approaches will allow
> you to host your connector on any kind of personal, project, or company
> repository. We still want to provide a default place where users can
> contribute their connectors and hopefully grow a community around it. The
> approaches are:
>
>
> 1. Create a mono-repo under the Apache umbrella where all connectors will
>    reside, for example, github.com/apache/flink-connectors. That
>    repository needs to follow its rules: No GitHub issues, no Dependabot or
>    similar tools, and a strict 

Re: Flame Graph not showing up in UI

2021-10-14 Thread Ingo Bürk
Hi Shilpa,

what browser are you using? Are there any errors in the browser's developer
console?


Ingo

On Thu, Oct 14, 2021 at 3:17 PM Shilpa Shankar 
wrote:

> We enabled flame graphs to troubleshoot an issue with our job by adding   
> rest.flamegraph.enabled:
> true  to flink.conf . The UI does not display anything when we select an
> operator and go to FlameGraph. Is there something else that needs to be
> enabled on the flink cluster?
>
> [image: image.png]
>
> Thanks,
> Shilpa
>


Re:

2021-09-28 Thread Ingo Bürk
Hi Violeta,

in order to unsubscribe emails from Flink user mail list, send an email to
user-unsubscr...@flink.apache.org. For more information, please have a look
at https://flink.apache.org/community.html#mailing-lists.


Best

Ingo

On Tue, Sep 28, 2021 at 10:50 AM Violeta Milanović <
violeta.milanovi...@gmail.com> wrote:

> unsubscribe
>


Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-26 Thread Ingo Bürk
Hi Thomas,

I haven't encountered that before, sorry. I assume you're still using Flink
1.12? The PR I linked to updated the AWS dependencies to the minimum
required versions to use this feature, so I'm not sure just setting the
credentials provider alone would be sufficient. The PR has so far only been
merged for the upcoming 1.14 release, but perhaps you could try the current
release candidate to see if it works with that? If that works, we could also
think about backporting this change; we just initially didn't do that since
upgrading those dependencies carries a certain operational risk and we want to
wait for user feedback first.


Best
Ingo

On Sun, Sep 26, 2021 at 8:12 AM Thomas Wang  wrote:

> Ingo,
>
> I dig into the Flink code a little bit. It looks like the key for
> specifying the roleArn and roleSessionName are
> fs.s3a.aws.credentials.provider:
> com.amazonaws.auth.WebIdentityTokenCredentialsProvider
> fs.s3a.aws.credentials.provider.role.arn: arn:aws:iam::...:role/...
> fs.s3a.aws.credentials.provider.role.sessionName: ...
>
> However, for some reason, I'm still getting the same error. Please help!
> Thanks.
>
> Thomas
>
>
> On Sat, Sep 25, 2021 at 9:36 PM Thomas Wang  wrote:
>
>> Ingo,
>>
>> It looks like I'm now seeing "Caused by: java.lang.NullPointerException:
>> You must specify a value for roleArn and roleSessionName". I assume I would
>> also need to specify that through the configuration file. Could you suggest
>> the key for this configuration? Thanks.
>>
>> Thomas
>>
>> On Sat, Sep 25, 2021 at 7:25 PM Thomas Wang  wrote:
>>
>>> Thanks Ingo. Adding the following setting worked.
>>>
>>> fs.s3a.aws.credentials.provider:
>>> com.amazonaws.auth.WebIdentityTokenCredentialsProvider
>>>
>>> Thomas
>>>
>>> On Sat, Sep 25, 2021 at 1:12 PM Ingo Bürk  wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> I think you might be looking for this:
>>>> https://github.com/apache/flink/pull/16717
>>>>
>>>>
>>>> Best
>>>> Ingo
>>>>
>>>> On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm using the official docker image:
>>>>> apache/flink:1.12.1-scala_2.11-java11
>>>>>
>>>>> I'm trying to run a Flink job on an EKS cluster. The job is running
>>>>> under a k8s service account that is tied to an IAM role. If I'm not using
>>>>> s3 as RocksDB checkpoint backend, everything works just fine. However, 
>>>>> when
>>>>> I enabled s3 as RocksDB checkpoint backend, I got permission denied.
>>>>>
>>>>> The IAM role tied to the service account has the appropriate
>>>>> permissions to s3. However the underlying role tied to the EKS node
>>>>> doesn't. After debugging with AWS support, it looks like the request to s3
>>>>> was made under the EKS node role, not the role tied to the service 
>>>>> account.
>>>>> Thus the permission denial.
>>>>>
>>>>> With the same Flink application, I'm also making requests to AWS
>>>>> Secrets Manager to get some sensitive information and those requests were
>>>>> made explicitly with AWS Java SDK 2.x bundled in the same application Jar
>>>>> file. Those requests were made correctly with the IAM role tied to the
>>>>> service account.
>>>>>
>>>>> Based on the info above, I suspect Flink may be using an older version
>>>>> of the AWS SDK that doesn't support assuming an IAM role via an OIDC web
>>>>> identity token file. Please see AWS doc here:
>>>>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>>>>
>>>>> Could someone help me confirm this bug and maybe have it fixed some
>>>>> time? Thanks.
>>>>>
>>>>> Thomas
>>>>>
>>>>


Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-25 Thread Ingo Bürk
Hi Thomas,

I think you might be looking for this:
https://github.com/apache/flink/pull/16717


Best
Ingo

On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:

> Hi,
>
> I'm using the official docker image: apache/flink:1.12.1-scala_2.11-java11
>
> I'm trying to run a Flink job on an EKS cluster. The job is running under
> a k8s service account that is tied to an IAM role. If I'm not using s3 as
> RocksDB checkpoint backend, everything works just fine. However, when I
> enabled s3 as RocksDB checkpoint backend, I got permission denied.
>
> The IAM role tied to the service account has the appropriate permissions
> to s3. However the underlying role tied to the EKS node doesn't. After
> debugging with AWS support, it looks like the request to s3 was made under
> the EKS node role, not the role tied to the service account. Thus the
> permission denial.
>
> With the same Flink application, I'm also making requests to AWS Secrets
> Manager to get some sensitive information and those requests were made
> explicitly with AWS Java SDK 2.x bundled in the same application Jar file.
> Those requests were made correctly with the IAM role tied to the service
> account.
>
> Based on the info above, I suspect Flink may be using an older version of
> the AWS SDK that doesn't support assuming an IAM role via an OIDC web
> identity token file. Please see AWS doc here:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>
> Could someone help me confirm this bug and maybe have it fixed some time?
> Thanks.
>
> Thomas
>


Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

2021-08-24 Thread Ingo Bürk
Hi,

just FYI, we do already have issues in JIRA for this:
* https://issues.apache.org/jira/browse/FLINK-21949
* https://issues.apache.org/jira/browse/FLINK-22484


Best
Ingo

On Tue, Aug 24, 2021 at 8:23 AM Caizhi Weng  wrote:

> Hi!
>
> As far as I know, returning an array from the getValue method containing
> external data format is OK. Flink will do the conversion for you.
>
> Are you faced with any exception when using this array_agg? If yes what's
> the exception stack?
>
> You can also open a JIRA ticket to request built-in support for
> array_agg, as this function exists in many data warehouses.
>
> Yuval Itzchakov  于2021年8月23日周一 下午7:38写道:
>
>> Hi,
>>
>> I'm trying to implement a generic ARRAY_AGG UDF function (identical to
>> the one that exists in many data WHs, e.g
>> https://docs.snowflake.com/en/sql-reference/functions/array_agg.html) to
>> utilize in Flink SQL.
>>
>> Taking reference from CollectAggFunction,
>> I tried using ArrayData to generate a GenericArrayData as an output type.
>> The problem with is I need a way to convert from the external format being
>> used in the UDF (e.g String, Integer) to the internal representation
>> required by Flink (i.e. StringData). I haven't found a straight way of
>> going about that.
>>
>> Here is a gist of the implementation.
>> Would appreciate any help on how to tackle this.
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>


Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Ingo Bürk
Hi Yuval,

I can expand a bit more on the technical side of validation, though as a
heads-up, I don't have a solution.

When validating entire pipelines on a logical level, you run into the
(maybe obvious) issue, that statements depend on previous statements. In
the simple case of a CREATE TABLE DDL followed by some query, ("full")
validation of the query depends on the table actually existing. On the
other hand, validating a CREATE TABLE DDL shouldn't actually execute that
DDL, creating a conflict.

Of course this is only a concern if during validation we care about the
table existing, but from the perspective of syntax this wouldn't matter.
However, Flink's parser (ParserImpl) under the hood calls
SqlToOperationConverter, which in some places does table lookups etc., so
it depends on the catalog manager. This prevents us from doing this kind of
validation. Ideally, SqlToOperationConverter would not have such a
dependency, but it takes some work to change that as operations would have
to be redesigned and "evaluated" later on.

I think, as of now, you'd have to actually use the CalciteParser directly
to bypass this call, but of course this is not accessible
(non-reflectively). I've also never tried this, so I don't know whether it
would actually work. It'd definitely be missing the ability to parse
anything handled in Flink's "extended parser" right now, but that is mostly
concerning SQL-client-specific syntax.


Best
Ingo

On Wed, Aug 18, 2021 at 2:41 PM Yuval Itzchakov  wrote:

> Thanks Ingo!
> I just discovered this a short while before you posted :)
>
> Ideally, I'd like to validate that the entire pipeline is set up
> correctly. The problem is that I can't use methods like `tableEnv.sqlQuery`
> from multiple threads, and this is really limiting my ability to speed up
> the process (today it takes over an hour to complete, which isn't
> reasonable).
>
> If anyone has any suggestions on how I can still leverage the
> TableEnvironment in the processor to validate my SQL queries I'd be happy
> to know.
>
> On Wed, Aug 18, 2021 at 2:37 PM Ingo Bürk  wrote:
>
>> Hi Yuval,
>>
>> if syntactical correctness is all you care about, parsing the SQL should
>> suffice. You can get a hold of the parser from
>> TableEnvironmentImpl#getParser and then run #parse. This will require you
>> to cast your table environment to the (internal) implementation, but maybe
>> this works for you?
>>
>>
>> Best
>> Ingo
>>
>> On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a use-case where I need to validate hundreds of Flink SQL
>>> queries. Ideally, I'd like to run these validations in parallel. But, given
>>> that there's an issue with Calcite and the use of thread-local storage, I
>>> can only interact with the table runtime via a single thread.
>>>
>>> Ideally, I don't really care about the overall registration process of
>>> sources, transformations and sinks, I just want to make sure the syntax is
>>> correct from Flinks perspective.
>>>
>>> Is there any straightforward way of doing this?
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Ingo Bürk
Hi Yuval,

if syntactical correctness is all you care about, parsing the SQL should
suffice. You can get a hold of the parser from
TableEnvironmentImpl#getParser and then run #parse. This will require you
to cast your table environment to the (internal) implementation, but maybe
this works for you?
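
A minimal sketch of that approach (note that Parser and TableEnvironmentImpl are internal API and may change between versions, and that queries referencing catalog tables will still hit the catalog during conversion, so this works best for DDL or self-contained queries):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;

import java.util.List;

public class SqlSyntaxCheckSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Internal API: cast to the implementation to reach the parser.
        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();

        String[] statements = {
            "CREATE TABLE t (id INT, name STRING) WITH ('connector' = 'datagen')",
            "SELECT id, COUNT(*) FROM (VALUES (1), (2)) AS v(id) GROUP BY id",
            "SELEC oops FROM nowhere" // deliberately broken
        };

        for (String sql : statements) {
            try {
                // Parsing does not execute anything; syntax errors surface as exceptions.
                List<Operation> ops = parser.parse(sql);
                System.out.println("OK   : " + sql + " -> " + ops.size() + " operation(s)");
            } catch (Exception e) {
                System.out.println("ERROR: " + sql + " -> " + e.getMessage());
            }
        }
    }
}
```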


Best
Ingo

On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov  wrote:

> Hi,
>
> I have a use-case where I need to validate hundreds of Flink SQL queries.
> Ideally, I'd like to run these validations in parallel. But, given that
> there's an issue with Calcite and the use of thread-local storage, I can
> only interact with the table runtime via a single thread.
>
> Ideally, I don't really care about the overall registration process of
> sources, transformations and sinks, I just want to make sure the syntax is
> correct from Flinks perspective.
>
> Is there any straightforward way of doing this?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Allowed lateness in Flink SQL

2021-08-10 Thread Ingo Bürk
Hi Maciej,

there is no documentation for it (besides in the code itself) because it's
an experimental flag. What would you expect allow-lateness to do outside
the context of a window? Maybe you'd also be interested in
CURRENT_WATERMARK()[1][2] which will be released with 1.14 and allows some
level of late data handling.

[1] https://issues.apache.org/jira/browse/FLINK-22737
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#temporal-functions
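
For reference, a small sketch of how CURRENT_WATERMARK could be used once you are on 1.14 (the table, columns, and the datagen connector are just placeholders):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CurrentWatermarkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Made-up table just to have a watermarked rowtime column.
        tEnv.executeSql(
            "CREATE TABLE events (\n"
          + "  id INT,\n"
          + "  event_time TIMESTAMP(3),\n"
          + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND\n"
          + ") WITH ('connector' = 'datagen')");

        // Keep only rows that are not behind the current watermark.
        tEnv.executeSql(
            "CREATE TEMPORARY VIEW on_time_events AS\n"
          + "SELECT * FROM events\n"
          + "WHERE CURRENT_WATERMARK(event_time) IS NULL\n"
          + "   OR event_time > CURRENT_WATERMARK(event_time)");

        // Route the late rows somewhere else instead of silently dropping them.
        tEnv.executeSql(
            "CREATE TEMPORARY VIEW late_events AS\n"
          + "SELECT * FROM events\n"
          + "WHERE event_time <= CURRENT_WATERMARK(event_time)");
    }
}
```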


Best
Ingo

On Tue, Aug 10, 2021 at 3:21 PM Maciej Bryński  wrote:

> Hi Guys,
> I was checking if anything changed recently with allowed lateness
> support in Flink SQL and I found this PR:
> https://github.com/apache/flink/pull/16022
>
> Is there any documentation for table.exec.emit.allow-lateness ?
> Is this option working only in window agregation?
>
> Regards,
> --
> Maciek Bryński
>


Re: Support for authenticated schema registry in debezium registry

2021-08-06 Thread Ingo Bürk
Hi Joe,

there was a follow-up issue, FLINK-23450, which was only fixed for 1.13.3
(not yet released) which I think is what you're seeing?


Best
Ingo

On Fri, Aug 6, 2021, 21:17 Joseph Lorenzini  wrote:

> Hi all,
>
>
>
> I am on flink 1.13.2. I set up create table like so:
>
>
>
> CREATE TABLE lead_buffer (
>
>   `id` INT NOT NULL,
>
>   `state` STRING NOT NULL,
>
>PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
>   'connector'= 'kafka',
>
>   'topic' = 'buffer',
>
>   'format'= 'debezium-avro-confluent',
>
>   'debezium-avro-confluent.schema-registry.url'= 'http://localhost:8081',
>
>   'scan.startup.mode'= 'earliest-offset',
>
>   'properties.group.id' = 'customers',
>
>   'debezium-avro-confluent.basic-auth.user-info' =
> 'sr-user:sr-user-password',
>
>   'properties.bootstrap.servers'= 'localhost:9092',
>
>   'value.fields-include'= 'EXCEPT_KEY',
>
>   'properties.ssl.endpoint.identification.algorithm'= 'http',
>
>   'properties.security.protocol'= 'SASL_PLAINTEXT',
>
>   'properties.sasl.mechanism'= 'PLAIN',
>
>   'properties.sasl.jaas.config' =
> 'org.apache.kafka.common.security.plain.PlainLoginModule required
> username="kafka" password="password";')
>
>
>
>
>
> I am looking at the docs here:
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#debezium-avro-confluent-basic-auth-user-info
>
>
>
> According the properties table, there is a property for setting auth to a
> schema registry: debezium-avro-confluent.basic-auth.user-info. However,
> when I set this in the DDL for creating the table I get this error:
>
>
>
>
>
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> options found for 'kafka'.
>
>
>
> Unsupported options:
>
>
>
> debezium-avro-confluent.basic-auth.user-info
>
>
>
>
>
> I found “FLINK-22858: avro-confluent doesn't support confluent schema
> registry that has security enabled”. However that ticket was closed as a
> duplicate of FLINK-21229, which been resolved and fixed in 1.13.2.
>
>
>
> Does anyone know if this has been in fact fixed or whether this is user
> error on my part?
>
>
>
> Thanks,
>
> Joe
>


Re: How to use 'CREATE FUNCTION' statements to create parameterize functions?

2021-08-03 Thread Ingo Bürk
Hi,

SQL only offers DDL to register function classes, not instances. Such
functions must have a default constructor. You can create a class extending
your function and calling the super constructor with the respective
arguments and then use that class through the DDL.
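
A minimal sketch, assuming the SubstringFunction from your mail is available as a top-level class in a (made-up) package com.example.udf on the cluster classpath:

```
package com.example.udf;

// Sketch: bake the constructor argument into a subclass with a default
// constructor so the class can be registered via DDL.
public class InclusiveSubstringFunction extends SubstringFunction {

    public InclusiveSubstringFunction() {
        super(true);
    }
}

// Registered for SQL e.g. with:
//   CREATE TEMPORARY SYSTEM FUNCTION SubstringFunction
//     AS 'com.example.udf.InclusiveSubstringFunction' LANGUAGE JAVA;
```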


Best
Ingo

On Tue, Aug 3, 2021, 19:02 1095193...@qq.com <1095193...@qq.com> wrote:

> Hi community,
>
> For a parameterized function like
>
> public static class SubstringFunction extends ScalarFunction {
>
>   private boolean endInclusive;
>
>   public SubstringFunction(boolean endInclusive) {
>     this.endInclusive = endInclusive;
>   }
>
>   public String eval(String s, Integer begin, Integer end) {
>     return s.substring(begin, endInclusive ? end + 1 : end);
>   }
> }
>
> we can register this function by passing a function instance instead of
> the function class:
>
> env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));
>
> How can I register or create this parameterized function with 'CREATE
> FUNCTION' statements?
>
> With the standard 'CREATE FUNCTION' statement from the Flink docs [1]:
>
> CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
>   [IF NOT EXISTS] [catalog_name.][db_name.]function_name
>   AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
>
> We can only pass a function_name in the CREATE FUNCTION statement; is there a way
> to pass a function instance (parameterized function) in this statement?
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-function
>
>
>
> --
> 1095193...@qq.com
>


Re: Unable to use custom AWS credentials provider - 1.9.2

2021-07-30 Thread Ingo Bürk
Hi Andreas,

Such an exception can occur if the class in question (your provider) and
the one being checked (AWSCredentialsProvider) were loaded from
different class loaders.

Any chance you can try once with 1.10+ to see if it would work? It does
look like a Flink issue to me, but I'm not sure this can be worked
around in 1.9.

[Initially sent to Andreas directly by accident]


Best
Ingo

On 29.07.21 17:37, Hailu, Andreas [Engineering] wrote:
> Hi team, I’m trying to read and write from and to S3 using a custom AWS
> Credential Provider using Flink v1.9.2 on YARN.
> 
>  
> 
> I followed the instructions to create a plugins directory in our Flink
> distribution location and copy the FS implementation (I’m using
> s3-fs-hadoop) package into it. I have also placed the package that
> contains our custom CredentialsProvider implementation in that same
> directory as well.
> 
>  
> 
> $ ls /flink-1.9.2/plugins/s3-fs-hadoop/
> 
> total 20664
> 
> 14469 Jun 17 10:57 aws-hadoop-utils-0.0.9.jar  <-- contains our custom
> CredentialsProvider class
> 
> 21141329 Jul 28 15:43 flink-s3-fs-hadoop-1.9.2.jar
> 
>  
> 
> I’ve placed this directory in the java classpath when running the Flink
> application. I have added the ‘fs.s3a.assumed.role.credentials.provider’
> and ‘fs.s3a.assumed.role.arn’ to our flink-conf.yaml as well. When
> trying to run a basic app that reads a file, I get the following exception:
> 
>  
> 
> Caused by: java.io.IOException: Class class
> com.gs.ep.da.lake.aws.CustomAwsCredentialProvider does not implement
> AWSCredentialsProvider
> 
>     at
> org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:400)
> 
>     at
> org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:367)
> 
>     at
> org.apache.hadoop.fs.s3a.S3ClientFactory$DefaultS3ClientFactory.createS3Client(S3ClientFactory.java:73)
> 
>  
> 
> Have I missed a step here? Do I need to make the packages also available
> in my YARN classpath as well? I saw some discussion that suggest that
> there were some related problems around this that were resolved in v1.10
> [1][2][3].
> 
>  
> 
> [1] https://issues.apache.org/jira/browse/FLINK-14574
> 
> 
> [2] https://issues.apache.org/jira/browse/FLINK-13044
> 
> 
> [3] https://issues.apache.org/jira/browse/FLINK-11956
> 
> 
>  
> 
> Best,
> 
> Andreas
> 
>  
> 
> 
> 
> 
> 


Re: Regarding state access in UDF

2021-06-30 Thread Ingo Bürk
Hi Kai,

CheckpointedFunction is not an interface meant to be used with UDFs (in the
Table API / SQL sense[1]), but is rather an interface for DataStream
API[2]; the term "user-defined function" has a different meaning there. Did
you actually try it to see if it works? I'd be surprised if it did.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/


Ingo

On Thu, Jul 1, 2021 at 5:17 AM Kai Fu  wrote:

> Hi Ingo,
>
> Thank you for the reply, we actually need more fine-grained control on the
> states in UDF. Per investigation, we found that the states can be simply
> created/accessed via implementing `CheckpointedFunction` interface, please
> advise if there is any side-effect by doing that.
>
> On Wed, Jun 30, 2021 at 10:33 PM Ingo Bürk  wrote:
>
>> Hi Kai,
>>
>> AggregateFunction and TableAggregateFunction are both stateful UDF
>> interfaces. This should cover most scenarios given where they would be
>> used. If you need more fine-grained control you can also always drop down
>> into the DataStream API (using #toDataStream) and work there. Table API /
>> SQL in general are higher-level abstractions where you cannot directly
>> interact with operators.
>>
>> If this doesn't answer your question it would also be great if you could
>> explain your use case more so we can understand it. Thanks!
>>
>>
>> Best
>> Ingo
>>
>> On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:
>>
>>> Hi team,
>>>
>>> We've a use case that needs to create/access state in UDF, while per the
>>> documentation
>>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/#runtime-integration>
>>> and UDF interface
>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java>.
>>> It does not provide such a way for that. We want to know if it is by design
>>> and is there any other approach for it.
>>>
>>> --
>>> *Best wishes,*
>>> *- Kai*
>>>
>>
>
> --
> *Best wishes,*
> *- Kai*
>


Re: Regarding state access in UDF

2021-06-30 Thread Ingo Bürk
Hi Kai,

AggregateFunction and TableAggregateFunction are both stateful UDF
interfaces. This should cover most scenarios given where they would be
used. If you need more fine-grained control you can also always drop down
into the DataStream API (using #toDataStream) and work there. Table API /
SQL in general are higher-level abstractions where you cannot directly
interact with operators.
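
To make that concrete, here is a small sketch of an AggregateFunction whose accumulator is the state Flink manages (and checkpoints) for you; the names are made up:

```
import org.apache.flink.table.functions.AggregateFunction;

/** Sketch: a running sum whose accumulator is managed as Flink state. */
public class LongSumFunction extends AggregateFunction<Long, LongSumFunction.SumAcc> {

    /** The accumulator class; its fields are the per-group state. */
    public static class SumAcc {
        public long sum = 0L;
    }

    @Override
    public SumAcc createAccumulator() {
        return new SumAcc();
    }

    // Called for every input row of the group.
    public void accumulate(SumAcc acc, Long value) {
        if (value != null) {
            acc.sum += value;
        }
    }

    @Override
    public Long getValue(SumAcc acc) {
        return acc.sum;
    }
}
```

It can then be registered like any other UDF (e.g. via createTemporarySystemFunction or CREATE FUNCTION) and used in a GROUP BY query; the accumulator is checkpointed together with the rest of the job state.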

If this doesn't answer your question it would also be great if you could
explain your use case more so we can understand it. Thanks!


Best
Ingo

On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:

> Hi team,
>
> We've a use case that needs to create/access state in UDF, while per the
> documentation
> 
> and UDF interface
> .
> It does not provide such a way for that. We want to know if it is by design
> and is there any other approach for it.
>
> --
> *Best wishes,*
> *- Kai*
>


Re: How to configure column width in Flink SQL client?

2021-06-08 Thread Ingo Bürk
Hi Svend,

I think it definitely makes sense to open a JIRA issue for it to discuss it
also with the people working on the SQL client. Thanks for taking care of
this!


Regards
Ingo

On Wed, Jun 9, 2021 at 7:25 AM Svend  wrote:

> Thanks for the feed-back Ingo,
>
> Do you think a PR would be welcome to make that parameter configurable? At
the place where I work, UUIDs are often used as column values and they are
36 characters long, so very often a very useful piece of information to us
> is not readable.
>
> I had a quick look, the max width seems to be defined in [1], and used in
> various places like [2] and [3]. Should I open a Jira to discuss this and
> cc you in it?
>
> Cheers,
>
> Svend
>
>
> [1]
> https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L74
> [2]
> https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L102
> [3]
> https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java#L143
>
>
> On Tue, 8 Jun 2021, at 7:34 AM, Ingo Bürk wrote:
>
> Hi Svend,
>
> unfortunately the column width in the SQL client cannot currently be
> configured.
>
>
> Regards
> Ingo
>
> On Mon, Jun 7, 2021 at 4:19 PM Svend  wrote:
>
>
>
> Hi everyone,
>
> When using the Flink SQL client and displaying results interactively, it
> seems the values of any column wider than 24 characters is truncated, which
> is indicated by a '~' character, e.g. the "member_user_id" below:
>
> ```
> SELECT
>   metadata.true_as_of_timestamp_millis,
>   member_user_id,
>   membership_updated.new_status.updated_value
> FROM fandom_members_events
> WHERE
>group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'
>
>
> true_as_of_timestamp_mil~  member_user_id             updated_value
>             1622811665919  45ca821f-c0fc-4114-bef8-~  (NULL)
>             1622811665919  45ca821f-c0fc-4114-bef8-~  JOINED
>             1622118951005  b4734391-d3e1-417c-ad92-~  (NULL)
> ...
> ```
>
> Is there a way to configure the displayed width? I didn't find any
> parameter for this in
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options
>
>
> Thanks a lot in advance!
>
> Svend
>
>
>


Re: How to configure column width in Flink SQL client?

2021-06-07 Thread Ingo Bürk
Hi Svend,

unfortunately the column width in the SQL client cannot currently be
configured.


Regards
Ingo

On Mon, Jun 7, 2021 at 4:19 PM Svend  wrote:

>
> Hi everyone,
>
> When using the Flink SQL client and displaying results interactively, it
> seems the values of any column wider than 24 characters is truncated, which
> is indicated by a '~' character, e.g. the "member_user_id" below:
>
> ```
> SELECT
>   metadata.true_as_of_timestamp_millis,
>   member_user_id,
>   membership_updated.new_status.updated_value
> FROM fandom_members_events
> WHERE
>group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'
>
>
> true_as_of_timestamp_mil~  member_user_id             updated_value
>             1622811665919  45ca821f-c0fc-4114-bef8-~  (NULL)
>             1622811665919  45ca821f-c0fc-4114-bef8-~  JOINED
>             1622118951005  b4734391-d3e1-417c-ad92-~  (NULL)
> ...
> ```
>
> Is there a way to configure the displayed width? I didn't find any
> parameter for this in
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options
>
>
> Thanks a lot in advance!
>
> Svend
>


Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread Ingo Bürk
Hi Oscar,

I think you'll find your answers in [1], have a look at Yun's response a
couple emails down. Basically, SourceFunction is the legacy source stack,
and ideally you'd instead implement your source using the FLIP-27 stack[2]
where you can directly define the boundedness, but he also mentioned a
workaround.


Regards
Ingo

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Kafka-as-bounded-source-with-DataStream-API-in-batch-mode-Flink-1-12-td40637.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/sources/#the-data-source-api

On Thu, Jun 3, 2021 at 7:29 AM 陳樺威  wrote:

> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild states for our streaming application.
> The Flink app will be run on-demand and close after complete all the file
> processing.
> We implemented a SourceFunction [1] to consume bounded parquet files from
> GCS. However, the source is not recognized as bounded, so it cannot run in batch mode.
>
> Our question is, how to implement a SourceFunction as a Bounded DataStream?
>
> Thanks!
> Oscar
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
>
>
>


Re: Apache Flink - A question about Tables API and SQL interfaces

2021-05-31 Thread Ingo Bürk
Hi everyone,

there is also [1] to introduce a CURRENT_WATERMARK function in SQL which
can help in dealing with late events. Maybe that's interesting here as well.

[1] https://issues.apache.org/jira/browse/FLINK-22737


Regards
Ingo

On Sun, May 30, 2021 at 5:31 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Mans,
>
> Regarding to your first question: I bookmarked the following mailing list
> discussion a while ago [1].
>
> Fabian Hueske as one of the major contributors to Flink answered that
> there aren't yet any trigger semantics in Flink SQL, but linked a great
> idea with a SQL extension of "EMIT".
>
> I read each Flink release notes and hope this idea is going to be
> implemented, but as far as I know, there wasn't any progress on this over
> the last years.
>
> Best regards
> Theo
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Do-Not-Support-Custom-Trigger-td20932.html
>
> - Ursprüngliche Mail -
> Von: "张静" 
> An: "Austin Cawley-Edwards" 
> CC: "M Singh" , "user" 
> Gesendet: Freitag, 14. Mai 2021 06:06:33
> Betreff: Re: Apache Flink - A question about Tables API and SQL interfaces
>
> Hi Mans,
>  +1 for Austin's reply.
>  I would like to add something about "allow lateness".
>  After introduce Windowing table-valued function in Flink 1.13,
> User could use two SQL solution to do window aggregate. And 'allow
> lateness' behavior is different in these two solutions.
> 1. If adopt windowing tvf window aggregate [2], 'allow lateness'
> is not supported yet.
> 2. If adopt legacy Grouped Window Functions [1], 'allow lateness'
> is supported. However, you should use the feature with caution since
> it depends on state retention configuration (`table.exec.state.ttl`
> [3]), especially if a job contains many operator except for window
> aggregate. Please see JIRA-21301 [4]. It maybe could be resolved in
> Flink-1.14.
>
> Best,
> beyond1920
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/#group-window-aggregation-deprecated
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf/
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/
> [4]:https://issues.apache.org/jira/browse/FLINK-21301
>
> Austin Cawley-Edwards  于2021年5月13日周四 下午9:57写道:
> >
> > Hi Mans,
> >
> > There are currently no public APIs for doing so, though if you're
> willing to deal with some breaking changes there are some experimental
> config options for late events in the Table API and SQL, seen in the
> WIndowEmitStrategy class[1].
> >
> > Best,
> > Austin
> >
> > [1]:
> https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L173-L211
> >
> > On Wed, May 12, 2021 at 5:12 PM M Singh  wrote:
> >>
> >> Thanks Austin for your helpful references.
> >>
> >> I did take a look at [2]/[3] - but did not find anything relevant on
> searching for string 'late' (for allowed lateness etc) or side output.  So
> from my understanding the late events will be dropped if I am using Table
> API or SQL and the only option is to use datastream interface.  Please let
> me know if I missed anything.
> >>
> >> Thanks again.
> >>
> >>
> >> On Wednesday, May 12, 2021, 04:36:06 PM EDT, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
> >>
> >>
> >> Hi Mans,
> >>
> >> I don't believe there are explicit triggers/evictors/timers in the
> Table API/ SQL, as that is abstracted away from the lower-level DataStream
> API. If you need to get into the fine-grained details, Flink 1.13 has made
> some good improvements in going from the Table API to the DataStream API,
> and back again. [1]
> >>
> >> For working with time and lateness with Table API and SQL, some good
> places to look are the GroupBy Window Aggregation section of the Table API
> docs[2], as well as the SQL cookbook[3] and Ververica's SQL training
> wiki[4].
> >>
> >> Hope that helps,
> >> Austin
> >>
> >> [1]:
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html#improved-interoperability-between-datastream-api-and-table-apisql
> >> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tableapi/#groupby-window-aggregation
> >> [3]:
> https://github.com/ververica/flink-sql-cookbook#aggregations-and-analytics
> >> [4]: https://github.com/ververica/sql-training/wiki/Queries-and-Time
> >>
> >> On Wed, May 12, 2021 at 1:30 PM M Singh  wrote:
> >>
> >> Hey Folks:
> >>
> >> I have the following questions regarding Table API/SQL in streaming
> mode:
> >>
> >> 1. Is there is a notion triggers/evictors/timers when using Table API
> or SQL interfaces ?
> >> 2. Is there anything like side outputs and ability to define allowed
> lateness when dealing with the Table API or SQL interfaces ?
> >>
> >> If 

Re: Issue on creating and using a custom connector in Ververica

2021-05-25 Thread Ingo Bürk
Hi Natu,

the message is of course a bit unfortunate and misleading, but the issue
here isn't that multiple connectors are found, but that none are found. The
repository you linked to implements a connector using the old connector
stack, but Ververica Platform only supports the new stack, see [1].
From a quick look it also seems to be missing the entry in
META-INF/services which would be required for the factory discovery to work.
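
For illustration, below is a minimal, untested skeleton of what the new stack
expects; the package, class, and option names are made up and not taken from
either repository. Besides the factory class itself, the jar needs a resource
file META-INF/services/org.apache.flink.table.factories.Factory containing the
factory's fully qualified class name (here com.example.MyConnectorFactory).

package com.example;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

// Factory for the new connector stack; discovered via Java SPI.
public class MyConnectorFactory implements DynamicTableSourceFactory {

    private static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // the value used in the DDL: WITH ('connector' = 'my-connector', ...)
        return "my-connector";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // validate the options and build the actual DynamicTableSource here
        throw new UnsupportedOperationException("left out of this sketch");
    }
}

The sink side works the same way via DynamicTableSinkFactory.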

In my personal GitHub profile I have an example[2] for a connector using
the new stack which also works in Ververica Platform (it's not for
Elasticsearch, though).

[1]
https://docs.ververica.com/user_guide/sql_development/connectors.html#developing-a-custom-connector-or-format
[2] https://github.com/Airblader/flink-connector-imap


Regards
Ingo

On Mon, May 24, 2021 at 1:57 PM Natu Lauchande  wrote:

> Good day Flink community,
>
> Apache Flink/Ververica Community Edition - Question
>
> I am trying to add a custom connector to Ververica community edition, and it
> keeps giving me the following error: "The jar contains multiple connector.
> Please choose one." It doesn't allow me to choose any of the jars. I am testing
> with custom connectors generated from the following repo:
> https://github.com/deadwind4/slink/tree/master/connector-es6
>
> My specific question: is there anything missing from this repo that we
> should add to signal Ververica about the custom connector?
>
>
> I am constantly receiving the attached message ("The jar
> contains multiple connectors, please select one.") and I can't select any
> option.
>
>
> Thanks,
>
> Natu
>


Re: two questions about flink stream processing: kafka sources and TimerService

2021-05-18 Thread Ingo Bürk
Hi Jin,

1) As far as I know the order is only guaranteed for events from the same
partition. If you want events across partitions to remain in order you may
need to use parallelism 1. I'll attach some links here which might be
useful:

https://stackoverflow.com/questions/50340107/order-of-events-with-chained-keyby-calls-on-same-key
https://stackoverflow.com/questions/44156774/ordering-of-records-in-a-keyed-stream-in-flink
https://stackoverflow.com/questions/50573174/flink-kafka-producer-elements-out-of-order
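
As one (untested) way to apply the parallelism-1 suggestion on the consumer
side, assuming the universal Kafka connector and a plain string topic; the
topic name and addresses are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SingleReaderSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "single-reader");

        // One subtask consumes every partition. Note that the relative order of
        // events from *different* partitions still depends on how that single
        // consumer interleaves them.
        DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
                .setParallelism(1);

        events.print();
        env.execute("single-reader-sketch");
    }
}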

2) Indeed there doesn't seem to be a way to access the InternalTimerService
from a ProcessFunction at the moment. One approach could be to implement
this yourself using a MapState. Otherwise I think you need to implement
your own operator, from which you can then access the InternalTimerService
similar to how KeyedCoProcessOperator does it.
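
To make the MapState idea concrete, here is a rough, untested sketch of that
workaround (not of the namespace-based approach IntervalJoinOperator uses). All
type parameters, state names, and retention intervals are made up, and it
assumes event-time timestamps are present on the records.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class TwoSidedCleanupFunction
        extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ValueState<String> leftState;
    private transient ValueState<String> rightState;
    // timer timestamp -> which side registered it ("L" or "R")
    private transient MapState<Long, String> timerSide;

    @Override
    public void open(Configuration parameters) {
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", String.class));
        rightState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("right", String.class));
        timerSide = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("timer-side", Long.class, String.class));
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out)
            throws Exception {
        leftState.update(value);
        long cleanupAt = ctx.timestamp() + 60_000L; // keep left events 1 minute
        ctx.timerService().registerEventTimeTimer(cleanupAt);
        timerSide.put(cleanupAt, "L");
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out)
            throws Exception {
        rightState.update(value);
        long cleanupAt = ctx.timestamp() + 600_000L; // keep right events longer
        ctx.timerService().registerEventTimeTimer(cleanupAt);
        timerSide.put(cleanupAt, "R");
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        String side = timerSide.get(timestamp);
        if ("L".equals(side)) {
            leftState.clear();  // only touch the left side's state
        } else if ("R".equals(side)) {
            rightState.clear(); // only touch the right side's state
        }
        timerSide.remove(timestamp);
    }
}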


Regards
Ingo

On Wed, May 12, 2021 at 8:32 AM Jin Yi  wrote:

> Hello.  Thanks ahead of time to anyone who answers.
>
> 1.  Verifying my understanding: for a Kafka source that's partitioned on
> the same piece of data that is later used in a keyBy, if we are relying on
> the Kafka timestamp as the event timestamp, is it guaranteed that the event
> stream of the source follows the topic's insertion order in Kafka?
>
> 2.  Is there a way to use the InternalTimerService from within a
> ProcessFunction (specifically, a KeyedCoProcessFunction)?  I don't see an
> easy way to do this, except by changing the TimerService interface.  The
> use case is that I'd like to have timers to clean up the left
> and right keyed state using namespaced timers, like how IntervalJoin does it
> (
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).
> Right now, because the KeyedCoProcessFunction only gives us the
> SimpleTimerService via the Context, I can only trigger onTimer execution
> without being able to refine the cleanup to just the event state
> of the side that a timer originated from.  Without this, it ends up
> needing to visit state associated with both event streams, which isn't
> performant, as those streams can have different throughputs (and therefore
> different retention characteristics/needs).
>
> thanks.
>


Re: error message, need help

2021-05-17 Thread Ingo Bürk
Hi,

can you maybe share some details about the code you're running?


Regards
Ingo

On Tue, May 18, 2021 at 5:10 AM 杨建春/00250041  wrote:

> I'm using Flink 1.13.0 with a table function. Why is this error reported, and
> what is the reason? Thanks!
>
>
>
> Traceback (most recent call last):
>   File "D:/yjc/AIOPS/Flink/UDTFcallstack.py", line 149, in 
> t_result.wait()
>   File "D:\Program Files
> (x86)\python36\lib\site-packages\pyflink\table\table_result.py", line 76,
> in wait
> get_method(self._j_table_result, "await")()
>   File "D:\Program Files
> (x86)\python36\lib\site-packages\py4j\java_gateway.py", line 1286, in
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\Program Files
> (x86)\python36\lib\site-packages\pyflink\util\exceptions.py", line 147, in
> deco
> return f(*a, **kw)
>   File "D:\Program Files
> (x86)\python36\lib\site-packages\py4j\protocol.py", line 328, in
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o58.await.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.table.api.TableException: Failed to wait job finish
>  at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
>  at
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>  at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.TableException: Failed to wait job
> finish
>  at
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
>  at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
>  at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:383)
>  at
> org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:116)
>  at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>  at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>  ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>  at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>  at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
>  at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>  at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>  at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
>  at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
>  at