Re: Flink 1.12 StreamRecordQueueEntry is not public class

2022-07-01 Thread Martijn Visser
What I'm interested in is understanding why you're relying on this class;
what problem are you trying to solve? That input could be useful when
considering whether to make that interface public, or there could be another
way to solve your problem.

Best regards,

Martijn

Op wo 29 jun. 2022 om 18:12 schreef Milind Vaidya :

> Thanks Xuyang,
>
> I did something similar to unblock myself.
>
> - Milind
>
> On Wed, Jun 29, 2022 at 8:40 PM Xuyang  wrote:
>
>> Hi, Milind. You may notice that these classes are tagged with 'Internal',
>> which means they are only meant to be used inside Flink itself. But if you
>> are doing some retrofit work on Flink, a quick way is to tag the class as
>> public and rebuild Flink just for your customization.
>>
>> At 2022-06-24 06:27:43, "Milind Vaidya"  wrote:
>>
>> Hi
>>
>> I am trying to upgrade the version to 1.12. Some of the old code is using
>>
>>
>> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
>>
>> This is no longer public in 1.12. Any pointers as to how to work
>> around this?
>>
>> Thanks,
>> Milind
>>
>>


Re: Flink 1.12 StreamRecordQueueEntry is not public class

2022-06-29 Thread Milind Vaidya
Thanks Xuyang,

I did something similar to unblock myself.

- Milind

On Wed, Jun 29, 2022 at 8:40 PM Xuyang  wrote:

> Hi, Milind. You may notice that these classes are tagged with 'Internal',
> which means they are only meant to be used inside Flink itself. But if you
> are doing some retrofit work on Flink, a quick way is to tag the class as
> public and rebuild Flink just for your customization.
>
> At 2022-06-24 06:27:43, "Milind Vaidya"  wrote:
>
> Hi
>
> I am trying to upgrade the version to 1.12. Some of the old code is using
>
>
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
>
> This is no longer public in 1.12. Any pointers as to how to work
> around this?
>
> Thanks,
> Milind
>
>


Flink 1.12 StreamRecordQueueEntry is not public class

2022-06-23 Thread Milind Vaidya
Hi

I am trying to upgrade the version to 1.12. Some of the old code is using

org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;

This is no longer public in 1.12. Any pointers as to how to work
around this?

Thanks,
Milind


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-15 Thread saravana...@gmail.com
Thanks Zhipeng.  Working as expected.  Thanks once again.

Saravanan

On Tue, Feb 15, 2022 at 3:23 AM Zhipeng Zhang 
wrote:

> Hi Saravanan,
>
> One solution could be using a streamOperator to implement `BoundedOneInput`
> interface.
> An example code could be found here [1].
>
> [1]
> https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
>
> saravana...@gmail.com  于2022年2月15日周二 02:44写道:
>
>> Hi Niklas,
>>
>> Thanks for your reply.  Approach [1] works only if operators are chained
>> (in other words, executed within the same task). Since the mapPartition
>> operator's parallelism is different from the previous operator's
>> parallelism, it doesn't fall into the same task (i.e., it is not chained).
>>
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
>> https://issues.apache.org/jira/browse/FLINK-14709
>>
>> Saravanan
>>
>> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler 
>> wrote:
>>
>>> Hi Saravanan,
>>>
>>> AFAIK the last record is not treated differently.
>>>
>>> Does the approach in [1] not work?
>>>
>>> Best regards,
>>> Niklas
>>>
>>>
>>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>>
>>>
>>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com 
>>> wrote:
>>> >
>>> > Is there any way to identify the last message inside RichFunction in
>>> BATCH mode ?
>>> >
>>> >
>>> >
>>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <
>>> saravana...@gmail.com> wrote:
>>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>>> DataStream api. mapPartition is not available in Flink DataStream.
>>> >
>>> > Current Code using Flink 1.12.x DataSet :
>>> >
>>> > dataset
>>> > .
>>> > .mapPartition(new SomeMapParitionFn())
>>> > .
>>> >
>>> > public static class SomeMapPartitionFn extends
>>> RichMapPartitionFunction {
>>> >
>>> > @Override
>>> > public void mapPartition(Iterable records,
>>> Collector out) throws Exception {
>>> > for (InputModel record : records) {
>>> > /*
>>> > do some operation
>>> >  */
>>> > if (/* some condition based on processing *MULTIPLE*
>>> records */) {
>>> >
>>> > out.collect(...); // Conditional collect
>>>   ---> (1)
>>> > }
>>> > }
>>> >
>>> > // At the end of the data, collect
>>> >
>>> > out.collect(...);   // Collect processed data
>>>  ---> (2)
>>> > }
>>> > }
>>> >
>>> >   • (1) - Collector.collect invoked based on some condition after
>>> processing few records
>>> >   • (2) - Collector.collect invoked at the end of data
>>> >
>>> > Initially we thought of using flatMap instead of mapPartition, but the
>>> collector is not available in close function.
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>>> case of chained drivers
>>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>>> >
>>> > Note: Our application works with only finite set of data (Batch Mode)
>>> >
>>>
>>>
>
> --
> best,
> Zhipeng
>
>


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-15 Thread Zhipeng Zhang
Hi Saravanan,

One solution could be to use a stream operator that implements the
`BoundedOneInput` interface.
An example code could be found here [1].

[1]
https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
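
For illustration, a rough sketch of that pattern (not taken from the linked
code; the class name, the String/Long input and output types, and the counting
logic are placeholders):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Emits a per-subtask result once the (bounded) input has been fully consumed. */
public class EndOfInputCountOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<String, Long>, BoundedOneInput {

    private long count; // "partition-wide" state, similar to what mapPartition accumulates

    @Override
    public void processElement(StreamRecord<String> element) {
        count++; // conditional emits are possible here too, via output.collect(...)
    }

    @Override
    public void endInput() {
        // called once per subtask after the last record, i.e. the place for the
        // final "(2)"-style collect from the original question
        output.collect(new StreamRecord<>(count));
    }
}

// Usage, assuming a bounded DataStream<String> named `input`:
// SingleOutputStreamOperator<Long> counts =
//         input.transform("end-of-input-count", Types.LONG, new EndOfInputCountOperator());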

saravana...@gmail.com  于2022年2月15日周二 02:44写道:

> Hi Niklas,
>
> Thanks for your reply.  Approach [1] works only if operators are chained
> (in other words, executed within the same task). Since the mapPartition
> operator's parallelism is different from the previous operator's
> parallelism, it doesn't fall into the same task (i.e., it is not chained).
>
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
> https://issues.apache.org/jira/browse/FLINK-14709
>
> Saravanan
>
> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler 
> wrote:
>
>> Hi Saravanan,
>>
>> AFAIK the last record is not treated differently.
>>
>> Does the approach in [1] not work?
>>
>> Best regards,
>> Niklas
>>
>>
>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>
>>
>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com 
>> wrote:
>> >
>> > Is there any way to identify the last message inside RichFunction in
>> BATCH mode ?
>> >
>> >
>> >
>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <
>> saravana...@gmail.com> wrote:
>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>> DataStream api. mapPartition is not available in Flink DataStream.
>> >
>> > Current Code using Flink 1.12.x DataSet :
>> >
>> > dataset
>> > .
>> > .mapPartition(new SomeMapParitionFn())
>> > .
>> >
>> > public static class SomeMapPartitionFn extends
>> RichMapPartitionFunction {
>> >
>> > @Override
>> > public void mapPartition(Iterable records,
>> Collector out) throws Exception {
>> > for (InputModel record : records) {
>> > /*
>> > do some operation
>> >  */
>> > if (/* some condition based on processing *MULTIPLE*
>> records */) {
>> >
>> > out.collect(...); // Conditional collect
>> ---> (1)
>> > }
>> > }
>> >
>> > // At the end of the data, collect
>> >
>> > out.collect(...);   // Collect processed data
>>  ---> (2)
>> > }
>> > }
>> >
>> >   • (1) - Collector.collect invoked based on some condition after
>> processing few records
>> >   • (2) - Collector.collect invoked at the end of data
>> >
>> > Initially we thought of using flatMap instead of mapPartition, but the
>> collector is not available in close function.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>> case of chained drivers
>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>> >
>> > Note: Our application works with only finite set of data (Batch Mode)
>> >
>>
>>

-- 
best,
Zhipeng


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-14 Thread saravana...@gmail.com
Hi Niklas,

Thanks for your reply.  Approach [1] works only if operators are chained
(in other words, executed within the same task). Since the mapPartition
operator's parallelism is different from the previous operator's parallelism,
it doesn't fall into the same task (i.e., it is not chained).


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
https://issues.apache.org/jira/browse/FLINK-14709

Saravanan

On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler  wrote:

> Hi Saravanan,
>
> AFAIK the last record is not treated differently.
>
> Does the approach in [1] not work?
>
> Best regards,
> Niklas
>
>
> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>
>
> > On 9. Feb 2022, at 20:31, saravana...@gmail.com 
> wrote:
> >
> > Is there any way to identify the last message inside RichFunction in
> BATCH mode ?
> >
> >
> >
> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <
> saravana...@gmail.com> wrote:
> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
> DataStream api. mapPartition is not available in Flink DataStream.
> >
> > Current Code using Flink 1.12.x DataSet :
> >
> > dataset
> > .
> > .mapPartition(new SomeMapParitionFn())
> > .
> >
> > public static class SomeMapPartitionFn extends
> RichMapPartitionFunction {
> >
> > @Override
> > public void mapPartition(Iterable records,
> Collector out) throws Exception {
> > for (InputModel record : records) {
> > /*
> > do some operation
> >  */
> > if (/* some condition based on processing *MULTIPLE* records
> */) {
> >
> > out.collect(...); // Conditional collect
> ---> (1)
> > }
> > }
> >
> > // At the end of the data, collect
> >
> > out.collect(...);   // Collect processed data
>  ---> (2)
> > }
> > }
> >
> >   • (1) - Collector.collect invoked based on some condition after
> processing few records
> >   • (2) - Collector.collect invoked at the end of data
> >
> > Initially we thought of using flatMap instead of mapPartition, but the
> collector is not available in close function.
> >
> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
> case of chained drivers
> > How to implement this in Flink 1.14.x DataStream? Please advise...
> >
> > Note: Our application works with only finite set of data (Batch Mode)
> >
>
>


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-14 Thread Niklas Semmler
Hi Saravanan,

AFAIK the last record is not treated differently.

Does the approach in [1] not work? 

Best regards,
Niklas

https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279


> On 9. Feb 2022, at 20:31, saravana...@gmail.com  wrote:
> 
> Is there any way to identify the last message inside RichFunction in BATCH 
> mode ?
> 
> 
> 
> On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com  
> wrote:
> I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x 
> DataStream api. mapPartition is not available in Flink DataStream.
> 
> Current Code using Flink 1.12.x DataSet :
> 
> dataset
> .
> .mapPartition(new SomeMapParitionFn())
> .
> 
> public static class SomeMapPartitionFn extends 
> RichMapPartitionFunction {
> 
> @Override
> public void mapPartition(Iterable records, 
> Collector out) throws Exception {
> for (InputModel record : records) {
> /*
> do some operation
>  */
> if (/* some condition based on processing *MULTIPLE* records */) {
> 
> out.collect(...); // Conditional collect---> 
> (1)
> }
> }
> 
> // At the end of the data, collect
> 
> out.collect(...);   // Collect processed data   ---> 
> (2) 
> }
> }
> 
>   • (1) - Collector.collect invoked based on some condition after 
> processing few records
>   • (2) - Collector.collect invoked at the end of data
> 
> Initially we thought of using flatMap instead of mapPartition, but the 
> collector is not available in close function.
> 
> https://issues.apache.org/jira/browse/FLINK-14709 - Only available in case of 
> chained drivers
> How to implement this in Flink 1.14.x DataStream? Please advise...
> 
> Note: Our application works with only finite set of data (Batch Mode)
> 



Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-09 Thread saravana...@gmail.com
Is there any way to identify the last message inside RichFunction in BATCH
mode ?



On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com 
wrote:

> I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
> DataStream api. mapPartition is not available in Flink DataStream.
> *Current Code using Flink 1.12.x DataSet :*
>
> dataset
> .
> .mapPartition(new SomeMapPartitionFn())
> .
>
> public static class SomeMapPartitionFn extends 
> RichMapPartitionFunction {
>
> @Override
> public void mapPartition(Iterable records, 
> Collector out) throws Exception {
> for (InputModel record : records) {
> /*
> do some operation
>  */
> if (/* some condition based on processing *MULTIPLE* records */) {
>     out.collect(...); // Conditional collect   ---> (1)
> }
> }
>
> // At the end of the data, collect
> out.collect(...);   // Collect processed data   ---> (2)
> }
> }
>
>
> - (1) Collector.collect invoked based on some condition after processing a
>   few records
> - (2) Collector.collect invoked at the end of the data
>
> Initially we thought of using flatMap instead of mapPartition, but the
> collector is not available in the close function.
>
> https://issues.apache.org/jira/browse/FLINK-14709 - only available in the
> case of chained drivers
>
> How to implement this in Flink 1.14.x DataStream? Please advise...
>
> *Note*: Our application works with only finite set of data (Batch Mode)
>
>
>


Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-09 Thread saravana...@gmail.com
I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
DataStream api. mapPartition is not available in Flink DataStream.
*Current Code using Flink 1.12.x DataSet :*

dataset
.
.mapPartition(new SomeMapPartitionFn())
.

public static class SomeMapPartitionFn extends
RichMapPartitionFunction {

@Override
public void mapPartition(Iterable records,
Collector out) throws Exception {
for (InputModel record : records) {
/*
do some operation
 */
if (/* some condition based on processing *MULTIPLE* records */) {
    out.collect(...); // Conditional collect   ---> (1)
}
}

// At the end of the data, collect
out.collect(...);   // Collect processed data   ---> (2)
}
}


- (1) Collector.collect invoked based on some condition after processing a
  few records
- (2) Collector.collect invoked at the end of the data

Initially we thought of using flatMap instead of mapPartition, but the
collector is not available in the close function.

https://issues.apache.org/jira/browse/FLINK-14709 - only available in the
case of chained drivers

How to implement this in Flink 1.14.x DataStream? Please advise...

*Note*: Our application works with only finite set of data (Batch Mode)


Re: Re: Read parquet data from S3 with Flink 1.12

2021-12-29 Thread David Morávek
I've answered in other thread [1]. Please keep the conversation focused
there.

[1] https://lists.apache.org/thread/7cqqzno3lz75qw9yxprgg45q6voonsbq

Best,
D.

On Tue, Dec 28, 2021 at 4:00 PM Rohan Kumar  wrote:

> Hi Alexandre, I am also facing the same issue. Please let us know if you
> are able to find anything.
>
> Thanks
>
> On 2021/12/27 02:11:01 Alexandre Montecucco wrote:
> > Hi Seth,
> > Thank you for confirming the issue due to the transition in 1.14.
> > For now, given my constraints, I will do a simple workaround and download
> > the whole dataset with java aws library.
> >
> > For future reference though I would like to solve this
> > I am actually still on 1.12 at the moment and had actually some issue
> with
> > simply using flink-parquet.
> > I think I would have the same issue with 1.14. The root issue is really
> > around Hadoop library.
> >
> > If I simply add `flink-parquet` library as specified in the doc it cannot
> > compile because of class not found for
> > `org.apache.hadoop.conf.Configuration`.
> > If I add `hadoop-common` and mark it as provided, it fails with class not
> > found at runtime.
> > If I bundle hadoop with my application jar, the it crashes with
> filesystem
> > not found for `s3`.
> >
> > Did I miss anything in the doc?
> >
> > Alex
> >
> > On Tue, Dec 21, 2021 at 10:29 PM Seth Wiesman  wrote:
> >
> > > Hi Alexandre,
> > >
> > > You are correct, BatchTableEnvironment does not exist in 1.14 anymore.
> In
> > > 1.15 we will have the state processor API ported to DataStream for
> exactly
> > > this reason, it is the last piece to begin officially marking DataSet
> as
> > > deprecated. As you can understand, this has been a multi year process
> and
> > > there have been some rough edges as components are migrated.
> > >
> > > The easiest solution is for you to use 1.12 DataSet <-> Table interop.
> Any
> > > savepoint you create using Flink 1.12 you should be able to restore on
> a
> > > 1.14 DataStream application.
> > >
> > > I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
> > > hard requirement, rewriting your input data into another format could
> also
> > > be a viable stop-gap solution.
> > >
> > > Seth
> > >
> > > On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
> > > alexandre.montecu...@grabtaxi.com> wrote:
> > >
> > >> Hello,
> > >>
> > >> I also face the same issue as documented in a previous mail from the
> > >> mailing list [1]
> > >> Basically when using flink-parquet, I get:
> > >>
> > >>>  java.lang.ClassNotFoundException:
> org.apache.hadoop.conf.Configuration
> > >>
> > >> I have no idea what I need to do to fix this and could not find
> anything
> > >> from the doc. I tried importing various hadoop libraries, but it
> always
> > >> causes yet another issue.
> > >>
> > >> I think this might be the root cause of my problem.
> > >>
> > >> Best,
> > >> Alex
> > >>
> > >> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
> > >>
> > >> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
> > >> alexandre.montecu...@grabtaxi.com> wrote:
> > >>
> > >>> Hello Piotrek,
> > >>> Thank you for the help.
> > >>> Regarding the S3 issue I have followed the documentation for the
> > >>> plugins. Many of our other apps are using S3 through the Hadoop Fs
> Flink
> > >>> plugin.
> > >>> Also, in this case, just reading regular plain text file works, I
> only
> > >>> have an issue when using Parquet.
> > >>>
> > >>> I tried switching to Flink 1.14, however I am stumbling upon other
> > >>> blockers.
> > >>> To give more context, I am trying to build a Flink savepoint for cold
> > >>> start data. So I am using the Flink State Processor API. But:
> > >>>  -  Flink State Processor API is using the DataSet api which is now
> > >>> marked as deprecated (Legacy)
> > >>>  - the doc you shared regarding reading from Parquet uses the
> DataStream
> > >>> API
> > >>>  - the Flink State Processor API doc [1] states there is
> interoperability
> > >>> of DataSet and Tabl

RE: Re: Read parquet data from S3 with Flink 1.12

2021-12-28 Thread Rohan Kumar
Hi Alexandre, I am also facing the same issue. Please let us know if you
are able to find anything.

Thanks

On 2021/12/27 02:11:01 Alexandre Montecucco wrote:
> Hi Seth,
> Thank you for confirming the issue due to the transition in 1.14.
> For now, given my constraints, I will do a simple workaround and download
> the whole dataset with java aws library.
>
> For future reference though I would like to solve this
> I am actually still on 1.12 at the moment and had actually some issue with
> simply using flink-parquet.
> I think I would have the same issue with 1.14. The root issue is really
> around Hadoop library.
>
> If I simply add `flink-parquet` library as specified in the doc it cannot
> compile because of class not found for
> `org.apache.hadoop.conf.Configuration`.
> If I add `hadoop-common` and mark it as provided, it fails with class not
> found at runtime.
> If I bundle hadoop with my application jar, the it crashes with filesystem
> not found for `s3`.
>
> Did I miss anything in the doc?
>
> Alex
>
> On Tue, Dec 21, 2021 at 10:29 PM Seth Wiesman  wrote:
>
> > Hi Alexandre,
> >
> > You are correct, BatchTableEnvironment does not exist in 1.14 anymore.
In
> > 1.15 we will have the state processor API ported to DataStream for
exactly
> > this reason, it is the last piece to begin officially marking DataSet as
> > deprecated. As you can understand, this has been a multi year process
and
> > there have been some rough edges as components are migrated.
> >
> > The easiest solution is for you to use 1.12 DataSet <-> Table interop.
Any
> > savepoint you create using Flink 1.12 you should be able to restore on a
> > 1.14 DataStream application.
> >
> > I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
> > hard requirement, rewriting your input data into another format could
also
> > be a viable stop-gap solution.
> >
> > Seth
> >
> > On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
> > alexandre.montecu...@grabtaxi.com> wrote:
> >
> >> Hello,
> >>
> >> I also face the same issue as documented in a previous mail from the
> >> mailing list [1]
> >> Basically when using flink-parquet, I get:
> >>
> >>>  java.lang.ClassNotFoundException:
org.apache.hadoop.conf.Configuration
> >>
> >> I have no idea what I need to do to fix this and could not find
anything
> >> from the doc. I tried importing various hadoop libraries, but it always
> >> causes yet another issue.
> >>
> >> I think this might be the root cause of my problem.
> >>
> >> Best,
> >> Alex
> >>
> >> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
> >>
> >> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
> >> alexandre.montecu...@grabtaxi.com> wrote:
> >>
> >>> Hello Piotrek,
> >>> Thank you for the help.
> >>> Regarding the S3 issue I have followed the documentation for the
> >>> plugins. Many of our other apps are using S3 through the Hadoop Fs
Flink
> >>> plugin.
> >>> Also, in this case, just reading regular plain text file works, I only
> >>> have an issue when using Parquet.
> >>>
> >>> I tried switching to Flink 1.14, however I am stumbling upon other
> >>> blockers.
> >>> To give more context, I am trying to build a Flink savepoint for cold
> >>> start data. So I am using the Flink State Processor API. But:
> >>>  -  Flink State Processor API is using the DataSet api which is now
> >>> marked as deprecated (Legacy)
> >>>  - the doc you shared regarding reading from Parquet uses the
DataStream
> >>> API
> >>>  - the Flink State Processor API doc [1] states there is
interoperability
> >>> of DataSet and Table API
> >>> <
https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api
>
> >>>  (but the link is now erroneous), it was last correct in Flink 1.12
[2]
> >>>
> >>> Given that we can convert from DataStream to Table API, I was
thinking I
> >>> could then convert from Table to DataSet API (though very cumbersome
and
> >>> unsure if any performance / memory impact).
> >>> But for the Table to DataSet conversion, the doc is using a
BatchTableEnvironment
> >>> class which does not seem to exist in Flink 1.14 anymore
> >>>
> >>> Any recommendations or anything I might have missed?
>

Re: Read parquet data from S3 with Flink 1.12

2021-12-26 Thread Alexandre Montecucco
Hi Seth,
Thank you for confirming the issue due to the transition in 1.14.
For now, given my constraints, I will do a simple workaround and download
the whole dataset with the Java AWS library.

For future reference, though, I would like to solve this.
I am actually still on 1.12 at the moment and have actually had some issues
with simply using flink-parquet.
I think I would have the same issue with 1.14. The root issue is really
around the Hadoop library.

If I simply add the `flink-parquet` library as specified in the doc, it does
not compile because of a class-not-found error for
`org.apache.hadoop.conf.Configuration`.
If I add `hadoop-common` and mark it as provided, it fails with a
class-not-found error at runtime.
If I bundle Hadoop into my application jar, then it crashes with "filesystem
not found" for `s3`.

Did I miss anything in the doc?

Alex

On Tue, Dec 21, 2021 at 10:29 PM Seth Wiesman  wrote:

> Hi Alexandre,
>
> You are correct, BatchTableEnvironment does not exist in 1.14 anymore. In
> 1.15 we will have the state processor API ported to DataStream for exactly
> this reason, it is the last piece to begin officially marking DataSet as
> deprecated. As you can understand, this has been a multi year process and
> there have been some rough edges as components are migrated.
>
> The easiest solution is for you to use 1.12 DataSet <-> Table interop. Any
> savepoint you create using Flink 1.12 you should be able to restore on a
> 1.14 DataStream application.
>
> I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
> hard requirement, rewriting your input data into another format could also
> be a viable stop-gap solution.
>
> Seth
>
> On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
> alexandre.montecu...@grabtaxi.com> wrote:
>
>> Hello,
>>
>> I also face the same issue as documented in a previous mail from the
>> mailing list [1]
>> Basically when using flink-parquet, I get:
>>
>>>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
>>
>> I have no idea what I need to do to fix this and could not find anything
>> from the doc. I tried importing various hadoop libraries, but it always
>> causes yet another issue.
>>
>> I think this might be the root cause of my problem.
>>
>> Best,
>> Alex
>>
>> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
>>
>> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
>> alexandre.montecu...@grabtaxi.com> wrote:
>>
>>> Hello Piotrek,
>>> Thank you for the help.
>>> Regarding the S3 issue I have followed the documentation for the
>>> plugins. Many of our other apps are using S3 through the Hadoop Fs Flink
>>> plugin.
>>> Also, in this case, just reading regular plain text file works, I only
>>> have an issue when using Parquet.
>>>
>>> I tried switching to Flink 1.14, however I am stumbling upon other
>>> blockers.
>>> To give more context, I am trying to build a Flink savepoint for cold
>>> start data. So I am using the Flink State Processor API. But:
>>>  -  Flink State Processor API is using the DataSet api which is now
>>> marked as deprecated (Legacy)
>>>  - the doc you shared regarding reading from Parquet uses the DataStream
>>> API
>>>  - the Flink State Processor API doc [1] states there is interoperability
>>> of DataSet and Table API
>>> <https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api>
>>>  (but the link is now erroneous), it was last correct in Flink 1.12 [2]
>>>
>>> Given that we can convert from DataStream to Table API, I was thinking I
>>> could then convert from Table to DataSet API (though very cumbersome and
>>> unsure if any performance / memory impact).
>>> But for the Table to DataSet conversion, the doc is using a 
>>> BatchTableEnvironment
>>> class which does not seem to exist in Flink 1.14 anymore
>>>
>>> Any recommendations or anything I might have missed?
>>>
>>> Thank you.
>>>
>>> Best,
>>> Alex
>>>
>>>
>>> [1] 
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
>>>
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api>
>>>
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
>>> [3]
>>> https://nightlies.apach

Re: Window Top N for Flink 1.12

2021-12-23 Thread Jing Zhang
Hi Jing,
Please try this way: create only one sink for the final output, write the
window aggregate and the Top-N in a single query, and write the result of the
Top-N into that final sink.
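
A rough sketch of that combined query, written here in Java (the Scala version
with stripMargin is analogous). The sink name and the 'print' connector are
placeholders: the real sink must accept the update stream that Top-N produces
(for Kafka that would be the upsert-kafka connector), and `tableEnv` and
`mytable` are assumed to be the same ones as in your snippet:

tableEnv.executeSql(
    "CREATE TABLE FinalSink ("
        + "  item_id STRING, channel_id STRING, window_end BIGINT,"
        + "  num_select BIGINT, row_num BIGINT"
        + ") WITH ('connector' = 'print')");

tableEnv.executeSql(
    "INSERT INTO FinalSink "
        + "SELECT item_id, channel_id, window_end, num_select, row_num "
        + "FROM ("
        + "  SELECT *, ROW_NUMBER() OVER ("
        + "    PARTITION BY channel_id, window_end ORDER BY num_select DESC) AS row_num "
        + "  FROM ("
        + "    SELECT item_id, channel_id,"
        + "      CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,"
        + "      COUNT(*) AS num_select"
        + "    FROM mytable"
        + "    GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)"
        + "  )"
        + ") "
        + "WHERE row_num <= 20");

Partitioning the ROW_NUMBER by window_end as well gives a Top-N per window;
drop it from the PARTITION BY if you want a single running Top-N per channel.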

Best,
Jing Zhang


Jing  于2021年12月24日周五 03:13写道:

> Hi Jing Zhang,
>
> Thanks for the reply! My current implementation is like this:
>
>
> tableEnv.executeSql(
>   "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end
> BIGINT, num_select BIGINT) WITH ('connector' = 'kafka', 'scan.startup.mode'
> = 'latest-offset')"
> )
>
> tableEnv.executeSql("""
> |INSERT INTO ItemDesc
> |SELECT
> |   item_id,
> |   channel_id,
> |   CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60'
> SECOND) AS BIGINT) AS window_end,
> |   COUNT(*) as num_select
> |FROM mytable
> |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND,
> INTERVAL '60' SECOND)
>   """.stripMargin)
>
> val result = tableEnv.sqlQuery("""
> |SELECT roku_content_id, window_end, channel_id, num_select, row_num
> |FROM (
> |   SELECT *
> |  ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select
> DESC) as row_num
> |   FROM ItemDesc)
> |WHERE row_num <= 20
> |""".stripMargin)
>
> But I got the error:
>
> org.apache.flink.table.api.ValidationException: Unable to create a sink
> for writing table 'default_catalog.default_database.ItemDesc'.
>
> The table ItemDesc is an intermediate table. If I put everything in a
> single query, that doesn't work. If I create a table like this:
>
> tableEnv.executeSql(
>   "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end
> BIGINT, num_select BIGINT) "
> )
>
> This also doesn't work.
>
>
> Thanks,
> Jing
>
>
>
>
> On Thu, Dec 23, 2021 at 1:20 AM Jing Zhang  wrote:
>
>> Hi Jing,
>> In fact, I agree with you to use TopN [2] instead of Window TopN[1] by 
>> normalizing
>> time into a unit with 5 minute, and add it to be one of partition keys.
>> Please note two points when use TopN
>> 1. the result is an update stream instead of append stream, which means
>> the result sent might be retracted later
>> 2. you could take care of state clean.
>>
>> However you said you meet with a problem when use TopN. I didn't
>> understand your question here. Would you please explain a little more?
>> > > I saw the one possibility is to create a table and insert the
>> aggregated data to the table, then do top N like [1]. However, I cannot
>> make this approach work because I need to specify the connector for this
>> table and I may also need to create another kafka topic for this.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>>
>> Jing Zhang  于2021年12月23日周四 17:04写道:
>>
>>> Hi Jing,
>>> I'm afraid it is not possible to use Window TopN in SQL on 1.12, because
>>> Window TopN was only introduced in 1.13.
>>>
>>> > I saw the one possibility is to create a table and insert the
>>> aggregated data to the table, then do top N like [1]. However, I cannot
>>> make this approach work because I need to specify the connector for this
>>> table and I may also need to create another kafka topic for this.
>>> I didn't understand you here.
>>> Do you mean you need a sink to store output data of TopN? However, you
>>> still need a sink to store the output even you use Window TopN.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>>>
>>> Best,
>>> Jing Zhang
>>>
>>>
>>> Jing  于2021年12月23日周四 16:12写道:
>>>
>>>> Hi, Flink community,
>>>>
>>>> Is there any existing code I can use to get the window top N with Flink
>>>> 1.12? I saw the one possibility is to create a table and insert the
>>>> aggregated data to the table, then do top N like [1]. However, I cannot
>>>> make this approach work because I need to specify the connector for this
>>>> table and I may also need to create another kafka topic for this. Is there
>>>> any existing way to do the Window Top N with Flink 1.12?
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>>>
>>>>
>>>> Thanks,
>>>> Jing
>>>>
>>>


Re: Window Top N for Flink 1.12

2021-12-23 Thread Jing
Hi Jing Zhang,

Thanks for the reply! My current implementation is like this:


tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end
BIGINT, num_select BIGINT) WITH ('connector' = 'kafka', 'scan.startup.mode'
= 'latest-offset')"
)

tableEnv.executeSql("""
|INSERT INTO ItemDesc
|SELECT
|   item_id,
|   channel_id,
|   CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60'
SECOND) AS BIGINT) AS window_end,
|   COUNT(*) as num_select
|FROM mytable
|GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND,
INTERVAL '60' SECOND)
  """.stripMargin)

val result = tableEnv.sqlQuery("""
|SELECT roku_content_id, window_end, channel_id, num_select, row_num
|FROM (
|   SELECT *
|  ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select
DESC) as row_num
|   FROM ItemDesc)
|WHERE row_num <= 20
|""".stripMargin)

But I got the error:

org.apache.flink.table.api.ValidationException: Unable to create a sink for
writing table 'default_catalog.default_database.ItemDesc'.

The table ItemDesc is an intermediate table. If I put everything in a
single query, that doesn't work. If I create a table like this:

tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end
BIGINT, num_select BIGINT) "
)

This also doesn't work.


Thanks,
Jing




On Thu, Dec 23, 2021 at 1:20 AM Jing Zhang  wrote:

> Hi Jing,
> In fact, I agree with you to use TopN [2] instead of Window TopN[1] by 
> normalizing
> time into a unit with 5 minute, and add it to be one of partition keys.
> Please note two points when use TopN
> 1. the result is an update stream instead of append stream, which means
> the result sent might be retracted later
> 2. you could take care of state clean.
>
> However you said you meet with a problem when use TopN. I didn't
> understand your question here. Would you please explain a little more?
> > > I saw the one possibility is to create a table and insert the
> aggregated data to the table, then do top N like [1]. However, I cannot
> make this approach work because I need to specify the connector for this
> table and I may also need to create another kafka topic for this.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>
> Jing Zhang  于2021年12月23日周四 17:04写道:
>
>> Hi Jing,
>> I'm afraid it is not possible to use Window TopN in SQL on 1.12, because
>> Window TopN was only introduced in 1.13.
>>
>> > I saw the one possibility is to create a table and insert the
>> aggregated data to the table, then do top N like [1]. However, I cannot
>> make this approach work because I need to specify the connector for this
>> table and I may also need to create another kafka topic for this.
>> I didn't understand you here.
>> Do you mean you need a sink to store output data of TopN? However, you
>> still need a sink to store the output even you use Window TopN.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>>
>> Best,
>> Jing Zhang
>>
>>
>> Jing  于2021年12月23日周四 16:12写道:
>>
>>> Hi, Flink community,
>>>
>>> Is there any existing code I can use to get the window top N with Flink
>>> 1.12? I saw the one possibility is to create a table and insert the
>>> aggregated data to the table, then do top N like [1]. However, I cannot
>>> make this approach work because I need to specify the connector for this
>>> table and I may also need to create another kafka topic for this. Is there
>>> any existing way to do the Window Top N with Flink 1.12?
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>>
>>>
>>> Thanks,
>>> Jing
>>>
>>


Re: Window Top N for Flink 1.12

2021-12-23 Thread Jing Zhang
Hi Jing,
In fact, I agree with using TopN [2] instead of Window TopN [1], by normalizing
the time into 5-minute units and adding it as one of the partition keys.
Please note two points when using TopN:
1. the result is an update stream instead of an append stream, which means a
result that has been sent might be retracted later;
2. you need to take care of state cleanup.

However, you said you ran into a problem when using TopN. I didn't understand
your question here. Would you please explain a little more?
> > I saw the one possibility is to create a table and insert the
aggregated data to the table, then do top N like [1]. However, I cannot
make this approach work because I need to specify the connector for this
table and I may also need to create another kafka topic for this.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/

Jing Zhang  于2021年12月23日周四 17:04写道:

> Hi Jing,
> I'm afraid it is not possible to use Window TopN in SQL on 1.12, because
> Window TopN was only introduced in 1.13.
>
> > I saw the one possibility is to create a table and insert the aggregated
> data to the table, then do top N like [1]. However, I cannot make this
> approach work because I need to specify the connector for this table and I
> may also need to create another kafka topic for this.
> I didn't understand you here.
> Do you mean you need a sink to store output data of TopN? However, you
> still need a sink to store the output even you use Window TopN.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>
> Best,
> Jing Zhang
>
>
> Jing  于2021年12月23日周四 16:12写道:
>
>> Hi, Flink community,
>>
>> Is there any existing code I can use to get the window top N with Flink
>> 1.12? I saw the one possibility is to create a table and insert the
>> aggregated data to the table, then do top N like [1]. However, I cannot
>> make this approach work because I need to specify the connector for this
>> table and I may also need to create another kafka topic for this. Is there
>> any existing way to do the Window Top N with Flink 1.12?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>
>>
>> Thanks,
>> Jing
>>
>


Re: Window Top N for Flink 1.12

2021-12-23 Thread Jing Zhang
Hi Jing,
I'm afraid it is not possible to use Window TopN in SQL on 1.12, because
Window TopN was only introduced in 1.13.

> I saw the one possibility is to create a table and insert the aggregated
data to the table, then do top N like [1]. However, I cannot make this
approach work because I need to specify the connector for this table and I
may also need to create another kafka topic for this.
I didn't understand you here.
Do you mean you need a sink to store the output data of the TopN? Note that
you still need a sink to store the output even if you use Window TopN.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/

Best,
Jing Zhang


Jing  于2021年12月23日周四 16:12写道:

> Hi, Flink community,
>
> Is there any existing code I can use to get the window top N with Flink
> 1.12? I saw the one possibility is to create a table and insert the
> aggregated data to the table, then do top N like [1]. However, I cannot
> make this approach work because I need to specify the connector for this
> table and I may also need to create another kafka topic for this. Is there
> any existing way to do the Window Top N with Flink 1.12?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>
>
> Thanks,
> Jing
>


Window Top N for Flink 1.12

2021-12-23 Thread Jing
Hi, Flink community,

Is there any existing code I can use to get the window top N with Flink
1.12? I saw that one possibility is to create a table, insert the aggregated
data into that table, and then do top N as in [1]. However, I cannot make
this approach work, because I need to specify the connector for this table
and may also need to create another Kafka topic for it. Is there any existing
way to do Window Top N with Flink 1.12?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n


Thanks,
Jing


Re: Read parquet data from S3 with Flink 1.12

2021-12-21 Thread Seth Wiesman
Hi Alexandre,

You are correct, BatchTableEnvironment does not exist in 1.14 anymore. In
1.15 we will have the state processor API ported to DataStream for exactly
this reason, it is the last piece to begin officially marking DataSet as
deprecated. As you can understand, this has been a multi-year process and
there have been some rough edges as components are migrated.

The easiest solution for you is to use the 1.12 DataSet <-> Table interop. Any
savepoint you create using Flink 1.12 should be restorable in a 1.14
DataStream application.
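
For reference, a rough sketch of what that 1.12 DataSet <-> Table interop
looks like (the table name, query, and row type below are placeholders, not
taken from your job):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class DataSetTableInterop112 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // e.g. a Parquet-backed table registered via DDL, or any other Table source
        Table input = tEnv.sqlQuery("SELECT * FROM my_parquet_table"); // placeholder table

        // back to the DataSet API, which the 1.12 State Processor API works with
        DataSet<Row> rows = tEnv.toDataSet(input, Row.class);
        rows.print();
    }
}

The reverse direction is available via tEnv.fromDataSet(...).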

I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
hard requirement, rewriting your input data into another format could also
be a viable stop-gap solution.

Seth

On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> wrote:

> Hello,
>
> I also face the same issue as documented in a previous mail from the
> mailing list [1]
> Basically when using flink-parquet, I get:
>
>>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
>
> I have no idea what I need to do to fix this and could not find anything
> from the doc. I tried importing various hadoop libraries, but it always
> causes yet another issue.
>
> I think this might be the root cause of my problem.
>
> Best,
> Alex
>
> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
>
> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
> alexandre.montecu...@grabtaxi.com> wrote:
>
>> Hello Piotrek,
>> Thank you for the help.
>> Regarding the S3 issue I have followed the documentation for the plugins.
>> Many of our other apps are using S3 through the Hadoop Fs Flink plugin.
>> Also, in this case, just reading regular plain text file works, I only
>> have an issue when using Parquet.
>>
>> I tried switching to Flink 1.14, however I am stumbling upon other
>> blockers.
>> To give more context, I am trying to build a Flink savepoint for cold
>> start data. So I am using the Flink State Processor API. But:
>>  -  Flink State Processor API is using the DataSet api which is now
>> marked as deprecated (Legacy)
>>  - the doc you shared regarding reading from Parquet uses the DataStream
>> API
>>  - the Flink State Processor API doc [1] states there is interoperability
>> of DataSet and Table API
>> <https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api>
>>  (but the link is now erroneous), it was last correct in Flink 1.12 [2]
>>
>> Given that we can convert from DataStream to Table API, I was thinking I
>> could then convert from Table to DataSet API (though very cumbersome and
>> unsure if any performance / memory impact).
>> But for the Table to DataSet conversion, the doc is using a 
>> BatchTableEnvironment
>> class which does not seem to exist in Flink 1.14 anymore
>>
>> Any recommendations or anything I might have missed?
>>
>> Thank you.
>>
>> Best,
>> Alex
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
>>
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api>
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>>
>>
>> On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Reading in the DataStream API (that's what I'm using you are doing) from
>>> Parquet files is officially supported and documented only since 1.14 [1].
>>> Before that it was only supported for the Table API. As far as I can tell,
>>> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
>>> already been in the code base since 1.12.x. I don't know how stable it was
>>> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
>>> a last resort you can try using the very least the latest version of 1.12.x
>>> branch as documented by 1.14 version, but I can not guarantee that it will
>>> be working.
>>>
>>> Regarding the S3 issue, have you followed the documentation? [2][3]
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>>> 

Re: Read parquet data from S3 with Flink 1.12

2021-12-20 Thread Alexandre Montecucco
Hello,

I also face the same issue as documented in a previous mail from the
mailing list [1]
Basically when using flink-parquet, I get:

>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

I have no idea what I need to do to fix this and could not find anything
from the doc. I tried importing various hadoop libraries, but it always
causes yet another issue.

I think this might be the root cause of my problem.

Best,
Alex

[1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2

On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> wrote:

> Hello Piotrek,
> Thank you for the help.
> Regarding the S3 issue I have followed the documentation for the plugins.
> Many of our other apps are using S3 through the Hadoop Fs Flink plugin.
> Also, in this case, just reading regular plain text file works, I only
> have an issue when using Parquet.
>
> I tried switching to Flink 1.14, however I am stumbling upon other
> blockers.
> To give more context, I am trying to build a Flink savepoint for cold
> start data. So I am using the Flink State Processor API. But:
>  -  Flink State Processor API is using the DataSet api which is now marked
> as deprecated (Legacy)
>  - the doc you shared regarding reading from Parquet uses the DataStream
> API
>  - the Flink State Processor API doc [1] states there is interoperability
> of DataSet and Table API
> <https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api>
>  (but the link is now erroneous), it was last correct in Flink 1.12 [2]
>
> Given that we can convert from DataStream to Table API, I was thinking I
> could then convert from Table to DataSet API (though very cumbersome and
> unsure if any performance / memory impact).
> But for the Table to DataSet conversion, the doc is using a 
> BatchTableEnvironment
> class which does not seem to exist in Flink 1.14 anymore
>
> Any recommendations or anything I might have missed?
>
> Thank you.
>
> Best,
> Alex
>
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
>
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api>
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>
>
> On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Reading in the DataStream API (that's what I'm using you are doing) from
>> Parquet files is officially supported and documented only since 1.14 [1].
>> Before that it was only supported for the Table API. As far as I can tell,
>> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
>> already been in the code base since 1.12.x. I don't know how stable it was
>> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
>> a last resort you can try using the very least the latest version of 1.12.x
>> branch as documented by 1.14 version, but I can not guarantee that it will
>> be working.
>>
>> Regarding the S3 issue, have you followed the documentation? [2][3]
>>
>> Best,
>> Piotrek
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
>>
>>
>> pt., 17 gru 2021 o 10:10 Alexandre Montecucco <
>> alexandre.montecu...@grabtaxi.com> napisał(a):
>>
>>> Hello everyone,
>>> I am struggling to read S3 parquet files from S3 with Flink Streaming
>>> 1.12.2
>>> I had some difficulty simply reading from local parquet files. I finally
>>> managed that part, though the solution feels dirty:
>>> - I use the readFile function + ParquetInputFormat abstract class (that
>>> is protected) (as I could not find a way to use the public
>>> ParquetRowInputFormat).
>>> - the open function, in ParquetInputFormat is
>>> using org.apache.hadoop.conf.Configuration. I am not sure which import to
>>> add. It seems the flink-parquet library is importing the dependency from
>>> hadoop-common but the dep is marked as provided. THe doc only shows usage
>>> of flink-parquet f

Re: Read parquet data from S3 with Flink 1.12

2021-12-20 Thread Alexandre Montecucco
Hello Piotrek,
Thank you for the help.
Regarding the S3 issue, I have followed the documentation for the plugins.
Many of our other apps are using S3 through the Hadoop FS Flink plugin.
Also, in this case, just reading a regular plain-text file works; I only have
an issue when using Parquet.

I tried switching to Flink 1.14; however, I am stumbling upon other
blockers.
To give more context, I am trying to build a Flink savepoint for cold start
data. So I am using the Flink State Processor API. But:
 - the Flink State Processor API is using the DataSet API, which is now marked
as deprecated (Legacy)
 - the doc you shared regarding reading from Parquet uses the DataStream API
 - the Flink State Processor API doc [1] states there is interoperability
of the DataSet and Table APIs
<https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api>
 (but that link is now broken); it was last correct in Flink 1.12 [2]

Given that we can convert from DataStream to Table API, I was thinking I
could then convert from Table to DataSet API (though this is very cumbersome,
and I am unsure whether there is any performance / memory impact).
But for the Table to DataSet conversion, the doc uses a BatchTableEnvironment
class which does not seem to exist in Flink 1.14 anymore.

Any recommendations or anything I might have missed?

Thank you.

Best,
Alex


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api>

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/


On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski  wrote:

> Hi,
>
> Reading in the DataStream API (that's what I'm using you are doing) from
> Parquet files is officially supported and documented only since 1.14 [1].
> Before that it was only supported for the Table API. As far as I can tell,
> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
> already been in the code base since 1.12.x. I don't know how stable it was
> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
> a last resort you can try using the very least the latest version of 1.12.x
> branch as documented by 1.14 version, but I can not guarantee that it will
> be working.
>
> Regarding the S3 issue, have you followed the documentation? [2][3]
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
>
>
> pt., 17 gru 2021 o 10:10 Alexandre Montecucco <
> alexandre.montecu...@grabtaxi.com> napisał(a):
>
>> Hello everyone,
>> I am struggling to read S3 parquet files from S3 with Flink Streaming
>> 1.12.2
>> I had some difficulty simply reading from local parquet files. I finally
>> managed that part, though the solution feels dirty:
>> - I use the readFile function + ParquetInputFormat abstract class (that
>> is protected) (as I could not find a way to use the public
>> ParquetRowInputFormat).
>> - the open function, in ParquetInputFormat is
>> using org.apache.hadoop.conf.Configuration. I am not sure which import to
>> add. It seems the flink-parquet library is importing the dependency from
>> hadoop-common but the dep is marked as provided. THe doc only shows usage
>> of flink-parquet from Flink SQL. So I am under the impression that this
>> might not work in the streaming case without extra code. I 'solved' this by
>> adding a dependency to hadoop-common. We did something similar to write
>> parquet data to S3.
>>
>> Now, when trying to run the application to read from S3, I get an
>> exception with root cause:
>> ```
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "s3"
>> ```
>> I guess there are some issues with hadoop-common not knowing about the
>> flink-s3-hadoop plugin setup. But I ran out of ideas on how to solve this.
>>
>>
>> I also noticed there were some changes with flink-parquet in Flink 1.14,
>> but I had some issues with simply reading data (but I did not investigate
>> so deeply for that version).
>>
>> Many thanks for any help.

Re: Read parquet data from S3 with Flink 1.12

2021-12-17 Thread Piotr Nowojski
Hi,

Reading from Parquet files in the DataStream API (which is what I assume you
are doing) is officially supported and documented only since 1.14 [1].
Before that it was only supported for the Table API. As far as I can tell,
the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
already been in the code base since 1.12.x, but I don't know how stable they
were or how well they worked. I would suggest upgrading to Flink 1.14.1. As
a last resort, you can at least try using the latest version of the 1.12.x
branch, following the 1.14 documentation, but I cannot guarantee that it will
work.

Regarding the S3 issue, have you followed the documentation? [2][3]

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
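
For reference, a rough sketch of the 1.14-style DataStream read (the field
names/types and the S3 path are placeholders, and the exact
ParquetColumnarRowInputFormat constructor arguments may differ slightly
between versions, so please double-check against the linked docs):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class ParquetFromS3Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // projected schema of the Parquet files (placeholder column names)
        RowType rowType = RowType.of(
                new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH), new BigIntType()},
                new String[] {"user_id", "event_time"});

        ParquetColumnarRowInputFormat<FileSourceSplit> format =
                new ParquetColumnarRowInputFormat<>(
                        new Configuration(), // Hadoop config used by the reader
                        rowType,
                        500,    // batch size
                        false,  // isUtcTimestamp
                        true);  // isCaseSensitive

        // reading s3:// paths still requires the S3 filesystem plugin from [2]/[3]
        FileSource<RowData> source =
                FileSource.forBulkFileFormat(format, new Path("s3://my-bucket/my-data/")).build();

        DataStream<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-s3-source");

        stream.print();
        env.execute("read-parquet-from-s3");
    }
}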


pt., 17 gru 2021 o 10:10 Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> napisał(a):

> Hello everyone,
> I am struggling to read S3 parquet files from S3 with Flink Streaming
> 1.12.2
> I had some difficulty simply reading from local parquet files. I finally
> managed that part, though the solution feels dirty:
> - I use the readFile function + ParquetInputFormat abstract class (that is
> protected) (as I could not find a way to use the public
> ParquetRowInputFormat).
> - the open function, in ParquetInputFormat is
> using org.apache.hadoop.conf.Configuration. I am not sure which import to
> add. It seems the flink-parquet library is importing the dependency from
> hadoop-common but the dep is marked as provided. THe doc only shows usage
> of flink-parquet from Flink SQL. So I am under the impression that this
> might not work in the streaming case without extra code. I 'solved' this by
> adding a dependency to hadoop-common. We did something similar to write
> parquet data to S3.
>
> Now, when trying to run the application to read from S3, I get an
> exception with root cause:
> ```
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
> ```
> I guess there are some issues with hadoop-common not knowing about the
> flink-s3-hadoop plugin setup. But I ran out of ideas on how to solve this.
>
>
> I also noticed there were some changes with flink-parquet in Flink 1.14,
> but I had some issues with simply reading data (but I did not investigate
> so deeply for that version).
>
> Many thanks for any help.
> --
>
>
> Alexandre Montecucco / Grab, Software Developer
> alexandre.montecu...@grab.com  / 8782 0937
>
> Grab
> 138 Cecil Street, Cecil Court #01-01Singapore 069538
> https://www.grab.com/ 
>


Read parquet data from S3 with Flink 1.12

2021-12-17 Thread Alexandre Montecucco
Hello everyone,
I am struggling to read S3 parquet files from S3 with Flink Streaming 1.12.2
I had some difficulty simply reading from local parquet files. I finally
managed that part, though the solution feels dirty:
- I use the readFile function + ParquetInputFormat abstract class (that is
protected) (as I could not find a way to use the public
ParquetRowInputFormat).
- the open function, in ParquetInputFormat is
using org.apache.hadoop.conf.Configuration. I am not sure which import to
add. It seems the flink-parquet library is importing the dependency from
hadoop-common but the dep is marked as provided. The doc only shows usage
of flink-parquet from Flink SQL. So I am under the impression that this
might not work in the streaming case without extra code. I 'solved' this by
adding a dependency to hadoop-common. We did something similar to write
parquet data to S3.

Now, when trying to run the application to read from S3, I get an exception
with root cause:
```
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "s3"
```
I guess there are some issues with hadoop-common not knowing about the
flink-s3-hadoop plugin setup. But I ran out of ideas on how to solve this.


I also noticed there were some changes with flink-parquet in Flink 1.14,
but I had some issues with simply reading data (but I did not investigate
so deeply for that version).

Many thanks for any help.
--



Alexandre Montecucco / Grab, Software Developer
alexandre.montecu...@grab.com  / 8782 0937

Grab
138 Cecil Street, Cecil Court #01-01, Singapore 069538
https://www.grab.com/ 




Re: Unified Source Interface in flink 1.12

2021-12-16 Thread Caizhi Weng
Hi!

It is possible for Flink 1.12. A major example is the Hive source [1].

[1] https://issues.apache.org/jira/browse/FLINK-19888
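
To sketch the wiring (class and variable names below are placeholders): a
ScanTableSource can hand any FLIP-27 Source that produces RowData to the
planner through SourceProvider. The DynamicTableSourceFactory that registers a
connector identifier for it is omitted here.

import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;

/** Minimal sketch of a table source backed by a FLIP-27 source. */
public class Flip27BackedTableSource implements ScanTableSource {

    // Any unified Source implementation that emits RowData.
    private final Source<RowData, ?, ?> source;

    public Flip27BackedTableSource(Source<RowData, ?, ?> source) {
        this.source = source;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // Plain scan sources usually emit insert-only rows.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // This is the hook that lets the planner run a FLIP-27 source.
        return SourceProvider.of(source);
    }

    @Override
    public DynamicTableSource copy() {
        return new Flip27BackedTableSource(source);
    }

    @Override
    public String asSummaryString() {
        return "FLIP-27 backed table source (sketch)";
    }
}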

Krzysztof Chmielewski  于2021年12月17日周五
06:56写道:

> Hi,
> I know that FLIP-27 [1] was released in version 1.12 and I know that
> currently (version 1.14) we can easily use a custom source connector that
> implements new unified source interface as a corner stone for Table Source
> Connector in SQL Api.
>
> My question is: does version 1.12 also allow using a Source interface
> implementation for a Table Source as version 1.14 does, or was it added
> in post-1.12 versions?
>
> After my very quick research based on FileSystemTable source it seems it
> is possible in version 1.12. Please correct me if I'm wrong.
>
> Regards,
> Krzysztof Chmielewski
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>


Unified Source Interface in flink 1.12

2021-12-16 Thread Krzysztof Chmielewski
Hi,
I know that FLIP-27 [1] was released in version 1.12 and I know that
currently (version 1.14) we can easily use a custom source connector that
implements new unified source interface as a corner stone for Table Source
Connector in SQL Api.

My question is: does version 1.12 also allow using a Source interface
implementation for a Table Source as version 1.14 does, or was it added
in post-1.12 versions?

After my very quick research based on FileSystemTable source it seems it is
possible in version 1.12. Please correct me if I'm wrong.

Regards,
Krzysztof Chmielewski


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Hang Ruan
Sorry, I spelled it wrong; I meant the PR (pull request). Here it is:
https://github.com/apache/flink/pull/17276 .

Marco Villalobos  于2021年12月1日周三 下午9:18写道:

> Thank you. One last question.  What is an RP? Where can I read it?
>
> Marco
>
> On Nov 30, 2021, at 11:06 PM, Hang Ruan  wrote:
>
> Hi,
>
> In versions 1.12.0-1.12.5, committing offsets to Kafka when checkpointing
> is enabled is the default behavior in KafkaSourceBuilder, and it cannot be
> changed in KafkaSourceBuilder.
>
> With FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>,
> we can change the behavior. The fix will land in 1.12.6, which does not
> seem to be contained in your version.
>
> Reading the RP will be helpful for you to understand the behavior.
>
>
> Marco Villalobos  于2021年12月1日周三 上午3:43写道:
>
>> Thanks!
>>
>> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
>> does not exist in Flink 1.12.
>>
>> Is that property supported with the string
>> "commit.offsets.on.checkpoints"?
>>
>> How do I configure that behavior so that offsets get committed on
>> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
>> default behavior with checkpoints?
>>
>>
>>
>>
>> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:
>>
>>> Hi,
>>>
>>> Maybe you can write like this :
>>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>>> "true");
>>>
>>> Other additional properties could be found here :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> Marco Villalobos  于2021年11月30日周二 上午11:08写道:
>>>
>>>> Thank you for the information.  That still does not answer my question
>>>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>>
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>>> method.
>>>>
>>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>>> default behavior with checkpoints?
>>>>
>>>> -Marco
>>>>
>>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng 
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Flink 1.14 release note states about this. See [1].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>>
>>>>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>>>>
>>>>>> Hi everybody,
>>>>>>
>>>>>> I am using Flink 1.12 and migrating my code from using
>>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>>
>>>>>> FlinkKafkaConsumer has the method
>>>>>>
>>>>>> /**
>>>>>>>  * Specifies whether or not the consumer should commit offsets back
>>>>>>> to Kafka on checkpoints.
>>>>>>>  * This setting will only have effect if checkpointing is enabled
>>>>>>> for the job. If checkpointing isn't
>>>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>>>  * settings will be used.
>>>>>>> */
>>>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>>>
>>>>>>
>>>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>>>> already have checkpointing configured, is it necessary to setup "commit
>>>>>> offsets on checkpoints"?
>>>>>>
>>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>>> Flink 1.14 documentation says little about it.
>>>>>>
>>>>>>  For example, the Flink 1.14 documentation states:
>>>>>>
>>>>>> Additional Properties
>>>>>>> In addition to properties described above, you can set arbitrary
>>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>>>>>>> has
>>>>>>> following options for configuration:
>>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>>> offsets to Kafka brokers on checkpoint
>>>>>>
>>>>>>
>>>>>> And the 1.12 documentation states:
>>>>>>
>>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>>> latest checkpoint and re-consume the records from Kafka, starting from 
>>>>>>> the
>>>>>>> offsets that were stored in the checkpoint.
>>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>>>>>> enabled
>>>>>>> in the job.
>>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>>> commit the offsets to Zookeeper.
>>>>>>
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> Marco
>>>>>>
>>>>>>
>>>>>>
>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Marco Villalobos
Thank you. One last question.  What is an RP? Where can I read it?

Marco

> On Nov 30, 2021, at 11:06 PM, Hang Ruan  wrote:
> 
> Hi,
> 
> In versions 1.12.0-1.12.5, committing offsets to Kafka when checkpointing is
> enabled is the default behavior in KafkaSourceBuilder, and it cannot be changed
> in KafkaSourceBuilder.
> 
> With FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>, we
> can change the behavior. The fix will land in 1.12.6, which does not seem
> to be contained in your version.
> 
> Reading the RP will be helpful for you to understand the behavior.
>  
> 
> Marco Villalobos  <mailto:mvillalo...@kineteque.com>> 于2021年12月1日周三 上午3:43写道:
> Thanks! 
> 
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT does 
> not exist in Flink 1.12.
> 
> Is that property supported with the string "commit.offsets.on.checkpoints"?
> 
> How do I configure that behavior so that offsets get committed on checkpoints 
> in Flink 1.12 when using the KafkaSourceBuilder? Or is that the default 
> behavior with checkpoints?
> 
> 
> 
> 
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  <mailto:ruanhang1...@gmail.com>> wrote:
> Hi, 
> 
> Maybe you can write like this : 
> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), 
> "true");
> 
> Other additional properties could be found here : 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>  
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties>
> Marco Villalobos  <mailto:mvillalo...@kineteque.com>> 于2021年11月30日周二 上午11:08写道:
> Thank you for the information.  That still does not answer my question 
> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so 
> that consumer should commit offsets back to Kafka on checkpoints?
> 
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method. 
> 
> But now that I am using KafkaSourceBuilder, how do I configure that behavior 
> so that offsets get committed on checkpoints?  Or is that the default 
> behavior with checkpoints?
> 
> -Marco
> 
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  <mailto:tsreape...@gmail.com>> wrote:
> Hi!
> 
> Flink 1.14 release note states about this. See [1].
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>  
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer>
> Marco Villalobos  <mailto:mvillalo...@kineteque.com>> 于2021年11月30日周二 上午7:12写道:
> Hi everybody,
> 
> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer to 
> using the KafkaSourceBuilder.
> 
> FlinkKafkaConsumer has the method 
> 
> /**
>  * Specifies whether or not the consumer should commit offsets back to Kafka 
> on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the 
> job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" 
> (for 0.9+) property
>  * settings will be used.
> */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
> 
> How do I setup that parameter when using the KafkaSourceBuilder? If I already 
> have checkpointing configured, is it necessary to setup "commit offsets on 
> checkpoints"?
> 
> The Flink 1.12 documentation does not discuss this topic, and the Flink 1.14 
> documentation says little about it.
> 
>  For example, the Flink 1.14 documentation states:
> 
> Additional Properties
> In addition to properties described above, you can set arbitrary properties 
> for KafkaSource and KafkaConsumer by using setProperties(Properties) and 
> setProperty(String, String). KafkaSource has following options for 
> configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets to 
> Kafka brokers on checkpoint
> 
> And the 1.12 documentation states:
> 
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume 
> records from a topic and periodically checkpoint all its Kafka offsets, 
> together with the state of other operations. In case of a job failure, Flink 
> will restore the streaming program to the state of the latest checkpoint and 
> re-consume the records from Kafka, starting from the offsets that were stored 
> in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program 
> may have to go back at most, in case of a failure. To use fault tolerant 
> Kafka Consumers, checkpointing of the topology needs to be enabled in the job.
> If checkpointing is disabled, the Kafka consumer will periodically commit the 
> offsets to Zookeeper.
> 
> Thank you.
> 
> Marco
> 
> 



Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Hang Ruan
Hi,

In versions 1.12.0-1.12.5, committing offsets to Kafka when checkpointing
is enabled is the default behavior in KafkaSourceBuilder, and it cannot be
changed in KafkaSourceBuilder.

With FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>, we
can change the behavior. The fix will land in 1.12.6, which does not seem
to be contained in your version.

Reading the RP will be helpful for you to understand the behavior.


Marco Villalobos  于2021年12月1日周三 上午3:43写道:

> Thanks!
>
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
> does not exist in Flink 1.12.
>
> Is that property supported with the string "commit.offsets.on.checkpoints"?
>
> How do I configure that behavior so that offsets get committed on
> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
> default behavior with checkpoints?
>
>
>
>
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:
>
>> Hi,
>>
>> Maybe you can write like this :
>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>> "true");
>>
>> Other additional properties could be found here :
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>
>> Marco Villalobos  于2021年11月30日周二 上午11:08写道:
>>
>>> Thank you for the information.  That still does not answer my question
>>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>> method.
>>>
>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>> default behavior with checkpoints?
>>>
>>> -Marco
>>>
>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng 
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> Flink 1.14 release note states about this. See [1].
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>
>>>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>>>
>>>>> Hi everybody,
>>>>>
>>>>> I am using Flink 1.12 and migrating my code from using
>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>
>>>>> FlinkKafkaConsumer has the method
>>>>>
>>>>> /**
>>>>>>  * Specifies whether or not the consumer should commit offsets back
>>>>>> to Kafka on checkpoints.
>>>>>>  * This setting will only have effect if checkpointing is enabled for
>>>>>> the job. If checkpointing isn't
>>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>>  * settings will be used.
>>>>>> */
>>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>>
>>>>>
>>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>>> already have checkpointing configured, is it necessary to setup "commit
>>>>> offsets on checkpoints"?
>>>>>
>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>> Flink 1.14 documentation says little about it.
>>>>>
>>>>>  For example, the Flink 1.14 documentation states:
>>>>>
>>>>> Additional Properties
>>>>>> In addition to properties described above, you can set arbitrary
>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>>>>>> has
>>>>>> following options for configuration:
>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>> offsets to Kafka brokers on checkpoint
>>>>>
>>>>>
>>>>> And the 1.12 documentation states:
>>>>>
>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>> latest checkpoint and re-consume the records from Kafka, starting from 
>>>>>> the
>>>>>> offsets that were stored in the checkpoint.
>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>>>>> enabled
>>>>>> in the job.
>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>> commit the offsets to Zookeeper.
>>>>>
>>>>>
>>>>> Thank you.
>>>>>
>>>>> Marco
>>>>>
>>>>>
>>>>>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Marco Villalobos
Thanks!

However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
does not exist in Flink 1.12.

Is that property supported with the string "commit.offsets.on.checkpoints"?

How do I configure that behavior so that offsets get committed on
checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
default behavior with checkpoints?




On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:

> Hi,
>
> Maybe you can write like this :
> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
> "true");
>
> Other additional properties could be found here :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>
> Marco Villalobos  于2021年11月30日周二 上午11:08写道:
>
>> Thank you for the information.  That still does not answer my question
>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>> that consumer should commit offsets back to Kafka on checkpoints?
>>
>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>> method.
>>
>> But now that I am using KafkaSourceBuilder, how do I configure that
>> behavior so that offsets get committed on checkpoints?  Or is that the
>> default behavior with checkpoints?
>>
>> -Marco
>>
>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> Flink 1.14 release note states about this. See [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>
>>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>>
>>>> Hi everybody,
>>>>
>>>> I am using Flink 1.12 and migrating my code from using
>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>
>>>> FlinkKafkaConsumer has the method
>>>>
>>>> /**
>>>>>  * Specifies whether or not the consumer should commit offsets back to
>>>>> Kafka on checkpoints.
>>>>>  * This setting will only have effect if checkpointing is enabled for
>>>>> the job. If checkpointing isn't
>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>  * settings will be used.
>>>>> */
>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>
>>>>
>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>> already have checkpointing configured, is it necessary to setup "commit
>>>> offsets on checkpoints"?
>>>>
>>>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>>>> 1.14 documentation says little about it.
>>>>
>>>>  For example, the Flink 1.14 documentation states:
>>>>
>>>> Additional Properties
>>>>> In addition to properties described above, you can set arbitrary
>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>>>> following options for configuration:
>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>> offsets to Kafka brokers on checkpoint
>>>>
>>>>
>>>> And the 1.12 documentation states:
>>>>
>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>> offsets, together with the state of other operations. In case of a job
>>>>> failure, Flink will restore the streaming program to the state of the
>>>>> latest checkpoint and re-consume the records from Kafka, starting from the
>>>>> offsets that were stored in the checkpoint.
>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>>>> enabled
>>>>> in the job.
>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>> commit the offsets to Zookeeper.
>>>>
>>>>
>>>> Thank you.
>>>>
>>>> Marco
>>>>
>>>>
>>>>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Hang Ruan
Hi,

Maybe you can write like this :
builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
"true");

Other additional properties could be found here :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
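
Putting it together against the 1.14 builder API (broker, topic and group id
below are placeholders), using the raw string key so it also compiles where
KafkaSourceOptions is not available; note that on 1.12.0-1.12.5 committing on
checkpoint is simply the default and cannot be changed, as discussed elsewhere
in this thread:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaCommitOnCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are only committed back to Kafka when a checkpoint completes,
        // so checkpointing must be enabled for the setting to matter.
        env.enableCheckpointing(60_000L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")             // placeholder
                .setTopics("my-topic")                          // placeholder
                .setGroupId("my-consumer-group")                // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // String form of the option suggested above.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print();
        env.execute("kafka-commit-on-checkpoint");
    }
}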

Marco Villalobos  于2021年11月30日周二 上午11:08写道:

> Thank you for the information.  That still does not answer my question
> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
> that consumer should commit offsets back to Kafka on checkpoints?
>
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method.
>
> But now that I am using KafkaSourceBuilder, how do I configure that
> behavior so that offsets get committed on checkpoints?  Or is that the
> default behavior with checkpoints?
>
> -Marco
>
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Flink 1.14 release note states about this. See [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>
>> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>>
>>> Hi everybody,
>>>
>>> I am using Flink 1.12 and migrating my code from using
>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>
>>> FlinkKafkaConsumer has the method
>>>
>>> /**
>>>>  * Specifies whether or not the consumer should commit offsets back to
>>>> Kafka on checkpoints.
>>>>  * This setting will only have effect if checkpointing is enabled for
>>>> the job. If checkpointing isn't
>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>> "enable.auto.commit" (for 0.9+) property
>>>>  * settings will be used.
>>>> */
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>
>>>
>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>> already have checkpointing configured, is it necessary to setup "commit
>>> offsets on checkpoints"?
>>>
>>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>>> 1.14 documentation says little about it.
>>>
>>>  For example, the Flink 1.14 documentation states:
>>>
>>> Additional Properties
>>>> In addition to properties described above, you can set arbitrary
>>>> properties for KafkaSource and KafkaConsumer by using
>>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>>> following options for configuration:
>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>> offsets to Kafka brokers on checkpoint
>>>
>>>
>>> And the 1.12 documentation states:
>>>
>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>> offsets, together with the state of other operations. In case of a job
>>>> failure, Flink will restore the streaming program to the state of the
>>>> latest checkpoint and re-consume the records from Kafka, starting from the
>>>> offsets that were stored in the checkpoint.
>>>> The interval of drawing checkpoints therefore defines how much the
>>>> program may have to go back at most, in case of a failure. To use fault
>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>>> in the job.
>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>> commit the offsets to Zookeeper.
>>>
>>>
>>> Thank you.
>>>
>>> Marco
>>>
>>>
>>>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Marco Villalobos
Thank you for the information.  That still does not answer my question
though.  How do I configure Flink 1.12 using the KafkaSourceBuilder so
that the consumer commits offsets back to Kafka on checkpoints?

FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method.

But now that I am using KafkaSourceBuilder, how do I configure that
behavior so that offsets get committed on checkpoints?  Or is that the
default behavior with checkpoints?

-Marco

On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:

> Hi!
>
> Flink 1.14 release note states about this. See [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>
> Marco Villalobos  于2021年11月30日周二 上午7:12写道:
>
>> Hi everybody,
>>
>> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
>> to using the KafkaSourceBuilder.
>>
>> FlinkKafkaConsumer has the method
>>
>> /**
>>>  * Specifies whether or not the consumer should commit offsets back to
>>> Kafka on checkpoints.
>>>  * This setting will only have effect if checkpointing is enabled for
>>> the job. If checkpointing isn't
>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>> "enable.auto.commit" (for 0.9+) property
>>>  * settings will be used.
>>> */
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>
>>
>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>> already have checkpointing configured, is it necessary to setup "commit
>> offsets on checkpoints"?
>>
>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>> 1.14 documentation says little about it.
>>
>>  For example, the Flink 1.14 documentation states:
>>
>> Additional Properties
>>> In addition to properties described above, you can set arbitrary
>>> properties for KafkaSource and KafkaConsumer by using
>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>> following options for configuration:
>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>> offsets to Kafka brokers on checkpoint
>>
>>
>> And the 1.12 documentation states:
>>
>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
>>> records from a topic and periodically checkpoint all its Kafka offsets,
>>> together with the state of other operations. In case of a job failure,
>>> Flink will restore the streaming program to the state of the latest
>>> checkpoint and re-consume the records from Kafka, starting from the offsets
>>> that were stored in the checkpoint.
>>> The interval of drawing checkpoints therefore defines how much the
>>> program may have to go back at most, in case of a failure. To use fault
>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>> in the job.
>>> If checkpointing is disabled, the Kafka consumer will periodically
>>> commit the offsets to Zookeeper.
>>
>>
>> Thank you.
>>
>> Marco
>>
>>
>>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Caizhi Weng
Hi!

Flink 1.14 release note states about this. See [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer

Marco Villalobos  于2021年11月30日周二 上午7:12写道:

> Hi everybody,
>
> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
> to using the KafkaSourceBuilder.
>
> FlinkKafkaConsumer has the method
>
> /**
>>  * Specifies whether or not the consumer should commit offsets back to
>> Kafka on checkpoints.
>>  * This setting will only have effect if checkpointing is enabled for the
>> job. If checkpointing isn't
>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>> "enable.auto.commit" (for 0.9+) property
>>  * settings will be used.
>> */
>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>
>
> How do I setup that parameter when using the KafkaSourceBuilder? If I
> already have checkpointing configured, is it necessary to setup "commit
> offsets on checkpoints"?
>
> The Flink 1.12 documentation does not discuss this topic, and the Flink
> 1.14 documentation says little about it.
>
>  For example, the Flink 1.14 documentation states:
>
> Additional Properties
>> In addition to properties described above, you can set arbitrary
>> properties for KafkaSource and KafkaConsumer by using
>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>> following options for configuration:
>> commit.offsets.on.checkpoint specifies whether to commit consuming
>> offsets to Kafka brokers on checkpoint
>
>
> And the 1.12 documentation states:
>
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
>> records from a topic and periodically checkpoint all its Kafka offsets,
>> together with the state of other operations. In case of a job failure,
>> Flink will restore the streaming program to the state of the latest
>> checkpoint and re-consume the records from Kafka, starting from the offsets
>> that were stored in the checkpoint.
>> The interval of drawing checkpoints therefore defines how much the
>> program may have to go back at most, in case of a failure. To use fault
>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>> in the job.
>> If checkpointing is disabled, the Kafka consumer will periodically commit
>> the offsets to Zookeeper.
>
>
> Thank you.
>
> Marco
>
>
>


How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Marco Villalobos
Hi everybody,

I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
to using the KafkaSourceBuilder.

FlinkKafkaConsumer has the method

/**
>  * Specifies whether or not the consumer should commit offsets back to
> Kafka on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the
> job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
> (for 0.9+) property
>  * settings will be used.
> */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)


How do I setup that parameter when using the KafkaSourceBuilder? If I
already have checkpointing configured, is it necessary to setup "commit
offsets on checkpoints"?

The Flink 1.12 documentation does not discuss this topic, and the Flink
1.14 documentation says little about it.

 For example, the Flink 1.14 documentation states:

Additional Properties
> In addition to properties described above, you can set arbitrary
> properties for KafkaSource and KafkaConsumer by using
> setProperties(Properties) and setProperty(String, String). KafkaSource has
> following options for configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets
> to Kafka brokers on checkpoint


And the 1.12 documentation states:

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
> records from a topic and periodically checkpoint all its Kafka offsets,
> together with the state of other operations. In case of a job failure,
> Flink will restore the streaming program to the state of the latest
> checkpoint and re-consume the records from Kafka, starting from the offsets
> that were stored in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program
> may have to go back at most, in case of a failure. To use fault tolerant
> Kafka Consumers, checkpointing of the topology needs to be enabled in the
> job.
> If checkpointing is disabled, the Kafka consumer will periodically commit
> the offsets to Zookeeper.


Thank you.

Marco


Re: OrcTableSource in flink 1.12

2021-10-28 Thread Nikola Hrusov
Hello,

I am still looking into that same issue and I am not sure how to continue
forward.

We want to upgrade to the latest versions of flink (1.14) and I couldn't
find any examples on how to properly do that.

Could anybody help me translate the example from the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/orc/OrcTableSource.html
into a way that is not deprecated?

Regards
,
Nikola

On Tue, Mar 23, 2021 at 12:35 PM Timo Walther  wrote:

> Hi Nikola,
>
> for the ORC source it is fine to use `TableEnvironment#fromTableSource`.
> It is true that this method is deprecated, but as I said not all
> connectors have been ported to be supported in the SQL DDL via string
> properties. Therefore, `TableEnvironment#fromTableSource` is still
> accessible until all connectors are support in the DDL.
>
> Btw it might also make sense to look into the Hive connector for reading
> ORC.
>
> Regards,
> Timo
>
> On 22.03.21 18:02, Nikola Hrusov wrote:
> > Hi Timo,
> >
> > I need to read ORC files and run a query on them as in the example
> > above. Since the example given in docs is not recommended what should I
> use?
> >
> > I looked into the method you suggest - TableEnvironment#fromTableSource
> > - it shows as Deprecated on the docs:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/api/TableEnvironment.html#fromTableSource-org.apache.flink.table.sources.TableSource-
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/api/TableEnvironment.html#fromTableSource-org.apache.flink.table.sources.TableSource-
> >
> >
> > However, it doesn't say what I should use instead?
> >
> > I have looked in all the docs available for 1.12 but I cannot find how
> > to achieve the same result as it was in some previous versions. In some
> > previous versions you could define
> > `tableEnv.registerTableSource(tableName, orcTableSource);` but that
> > method is not available anymore.
> >
> > What is the way to go from here? I would like to read from orc files,
> > run a query and transform the result. I do not necessarily need it to be
> > with the DataSet API.
> >
> > Regards
> > ,
> > Nikola
> >
> > On Mon, Mar 22, 2021 at 6:49 PM Timo Walther  > > wrote:
> >
> > Hi Nikola,
> >
> >
> > the OrcTableSource has not been updated to be used in a SQL DDL. You
> > can
> > define your own table factory [1] that translates properties into a
> > object to create instances or use
> > `org.apache.flink.table.api.TableEnvironment#fromTableSource`. I
> > recommend the latter option.
> >
> > Please keep in mind that we are about to drop DataSet support for
> Table
> > API in 1.13. Batch and streaming use cases are already possible with
> > the
> > unified TableEnvironment.
> >
> > Are you sure that you really need DataSet API?
> >
> > Regards,
> > Timo
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/
> >
> >
> > On 21.03.21 15:42, Nikola Hrusov wrote:
> >  > Hello,
> >  >
> >  > I am trying to find some examples of how to use the
> > OrcTableSource and
> >  > query it.
> >  > I got to the documentation here:
> >  >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html
> >
> >
> >  >
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html
> >>
> >
> >  > and it says that an OrcTableSource is used as below:
> >  >
> >  > |OrcTableSource orcSrc = OrcTableSource.builder()
> >  > .path("file:///my/data/file.orc")
> >  >
> >
>  .forOrcSchema("struct")
> > .build();
> >  > tEnv.registerTableSourceInternal("orcTable", orcSrc); Table res =
> >  > tableEnv.sqlQuery("SELECT * FROM orcTable"); |
> >  >
> >  >
> >  > My question is what should tEnv be so that I can use
> >  > the registerTableSourceInternal method?
> >  > My end goal is to query the orc source and then return a DataSet.
> >  >
> >  > Regards
> >  > ,
> >  > Nikola
> >
>
>
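
A non-deprecated route in recent versions (1.13/1.14) is to skip OrcTableSource
entirely and declare the ORC files through the SQL 'filesystem' connector with
the 'orc' format, then query them with the unified TableEnvironment. This is
only a sketch: it assumes the flink-orc format dependency is on the classpath,
and the schema and path are placeholders to adjust to the actual files.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class OrcFilesystemSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Placeholder schema -- adjust the columns to match the ORC files.
        tEnv.executeSql(
                "CREATE TABLE orc_table ("
                        + "  name STRING,"
                        + "  age INT"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'file:///my/data/',"
                        + "  'format' = 'orc'"
                        + ")");

        Table res = tEnv.sqlQuery("SELECT name, age FROM orc_table WHERE age > 30");
        res.execute().print();
    }
}

The result is a regular Table, so it can be further transformed or bridged to
the DataStream API; as noted in the quoted reply, the DataSet path is being
phased out.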


Re: Flink-1.12 Sql on Job two SQL sink control order

2021-10-18 Thread Arvid Heise
Are you running in Batch? Then you probably need to write 2 SQL jobs (or
statements).

In streaming, the notion of order doesn't make much sense. But maybe I
misunderstood your use case.
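
For the batch case, a hedged sketch of the "two statements" approach (table
names are placeholders and the CREATE TABLE statements are omitted): each
INSERT is submitted as its own job and awaited, so the Kafka notification is
only sent after the TiDB write has finished.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SequentialSinksSketch {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // CREATE TABLE statements for my_source, tidb_sink and kafka_notify_sink
        // (all placeholder names) would go here.

        // First job: write to TiDB and block until it finishes.
        tEnv.executeSql("INSERT INTO tidb_sink SELECT * FROM my_source").await();

        // Second job: only submitted after the TiDB insert has completed.
        tEnv.executeSql("INSERT INTO kafka_notify_sink SELECT * FROM my_source").await();
    }
}

In streaming this gives no ordering guarantee, as noted above; there the
per-record approach from the earlier reply (a custom DynamicTableSink) or a
retry on the consumer side would be needed.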

On Thu, Oct 14, 2021 at 11:37 AM Francesco Guardiani <
france...@ververica.com> wrote:

> I'm not aware of any way to control the sink order, afaik each
> Table#executeInsert will generate a separate job on its own. You may be
> able to hack it around by having a custom DynamicTableSink that for each
> record sends it to tidb and then to kafka.
>
> May I ask why you need that? If the notification system after the Kafka
> sink depends on tidb, perhaps you need a retry system there that can wait
> for tidb to ingest and process those data?
>
> On Thu, Oct 14, 2021 at 10:40 AM WuKong  wrote:
>
>> Hi all:
>>  I have two Flink SQL statements with the same Kafka source: one sinks
>> data into TiDB and the other sinks to Kafka to notify a downstream system.
>> How can I control the sink order? When data arrives from the source Kafka
>> topic, I would like to write to TiDB first and to Kafka after that.
>>
>> --
>> ---
>> Best,
>> WuKong
>>
>


Re: Flink-1.12 Sql on Job two SQL sink control order

2021-10-14 Thread Francesco Guardiani
I'm not aware of any way to control the sink order; afaik each
Table#executeInsert will generate a separate job on its own. You may be
able to hack around it by having a custom DynamicTableSink that, for each
record, sends it to TiDB and then to Kafka.

May I ask why you need that? If the notification system after the Kafka
sink depends on TiDB, perhaps you need a retry system there that can wait
for TiDB to ingest and process that data?

On Thu, Oct 14, 2021 at 10:40 AM WuKong  wrote:

> Hi all:
>  I have two Flink SQL statements with the same Kafka source: one sinks
> data into TiDB and the other sinks to Kafka to notify a downstream system.
> How can I control the sink order? When data arrives from the source Kafka
> topic, I would like to write to TiDB first and to Kafka after that.
>
> --
> ---
> Best,
> WuKong
>


Flink-1.12 Sql on Job two SQL sink control order

2021-10-14 Thread WuKong
Hi all:
 I have two Flink SQL statements with the same Kafka source: one sinks data
into TiDB and the other sinks to Kafka to notify a downstream system. How can I
control the sink order? When data arrives from the source Kafka topic, I would
like to write to TiDB first and to Kafka after that.



---
Best,
WuKong


Re: IAM Roles with Service Account on Flink 1.12 Running on Kubernetes - Seeing Errors

2021-09-18 Thread Yang Wang
It seems that the application failed to submit the job due to an Akka
timeout, not because of the service account.
It will help a lot to debug the root cause if you could share the full
JobManager logs.

If the JobManager does not have enough permissions to create TaskManager
pods and watch these pods, you could
find explicit information in the JobManager logs.

Best,
Yang

Rayan Ahmed  于2021年9月15日周三 下午1:47写道:

> Hi,
>
> I am trying to use IAM Roles with Service Accounts on Flink 1.12 running
> on Kubernetes. Previously I was using KIAM to provide identification to the
> pods and that works fine. However, when switching to use IRSA, I see the
> following errors (posted below). Has anyone experienced a similar issue and
> if so, what is the resolution? Thanks in advance!
>
> org.apache.flink.util.FlinkException: Failed to execute job 'test-01'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:155)
> at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> at
> com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:16)
> at
> com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:83)
> at
> com.intuit.spp.example.SampleProcessor.main(SampleProcessor.java:27)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.TimeoutException: Invocation of public
> abstract java.util.concurrent.CompletableFuture
> org.apache.flink.runtime.dispatcher.DispatcherGateway.submitJob(org.apache.flink.runtime.jobgraph.JobGraph,org.apache.flink.api.common.time.Time)
> timed out.
> at org.apache.flink.runtime.rpc.akka.$Proxy45.submitJob(Unknown
> Source)
> at
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:183)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at
> org.apache.

IAM Roles with Service Account on Flink 1.12 Running on Kubernetes - Seeing Errors

2021-09-15 Thread Rayan Ahmed
Hi, 

I am trying to use IAM Roles with Service Accounts on Flink 1.12 running on 
Kubernetes. Previously I was using KIAM to provide identity to the pods,
and that worked fine. However, when switching to IRSA, I see the following
errors (posted below). Has anyone experienced a similar issue and if so, what 
is the resolution? Thanks in advance! 

org.apache.flink.util.FlinkException: Failed to execute job 'test-01'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:155)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
at 
com.intuit.strmprocess.sdk.core.lifecycles.GracefulLifecycleManager.run(GracefulLifecycleManager.java:16)
at 
com.intuit.strmprocess.sdk.core.SppBaseProcessor.run(SppBaseProcessor.java:83)
at com.intuit.spp.example.SampleProcessor.main(SampleProcessor.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract 
java.util.concurrent.CompletableFuture 
org.apache.flink.runtime.dispatcher.DispatcherGateway.submitJob(org.apache.flink.runtime.jobgraph.JobGraph,org.apache.flink.api.common.time.Time)
 timed out.
at org.apache.flink.runtime.rpc.akka.$Proxy45.submitJob(Unknown Source)
at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:183)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)

Re: Taskmanager killed often after migrating to flink 1.12

2021-04-29 Thread Till Rohrmann
Great, thanks for the update.

On Wed, Apr 28, 2021 at 7:08 PM Sambaran  wrote:

> Hi Till,
>
> Thank you for the response, we are currently running flink with an
> increased memory usage, so far the taskmanager is working fine, we will
> check if there is any further issue and will update you.
>
> Regards
> Sambaran
>
> On Wed, Apr 28, 2021 at 5:33 PM Till Rohrmann 
> wrote:
>
>> Hi Sambaran,
>>
>> could you also share the cause why the checkpoints could not be discarded
>> with us?
>>
>> With Flink 1.10, we introduced a stricter memory model for the
>> TaskManagers. That could be a reason why you see more TaskManagers being
>> killed by the underlying resource management system. You could maybe check
>> whether your resource management system logs that some containers/pods are
>> exceeding their memory limitations. If this is the case, then you should
>> give your Flink processes a bit more memory [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>>
>> Cheers,
>> Till
>>
>> On Tue, Apr 27, 2021 at 6:48 PM Sambaran  wrote:
>>
>>> Hi there,
>>>
>>> We have recently migrated to Flink 1.12 from 1.7. Although the jobs are
>>> running fine, sometimes the task manager gets killed (quite frequently,
>>> 2-3 times a day).
>>>
>>> Logs:
>>> INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
>>> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>
>>> While checking more logs we see flink not able to discard old checkpoints
>>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could
>>> not discard completed checkpoint 173.
>>>
>>> We are not sure of what is the reason here, has anyone faced this before?
>>>
>>> Regards
>>> Sambaran
>>>
>>


Re: Taskmanager killed often after migrating to flink 1.12

2021-04-28 Thread Sambaran
Hi Till,

Thank you for the response. We are currently running Flink with increased
memory; so far the TaskManager is working fine. We will check if there are
any further issues and will update you.

Regards
Sambaran

On Wed, Apr 28, 2021 at 5:33 PM Till Rohrmann  wrote:

> Hi Sambaran,
>
> could you also share the cause why the checkpoints could not be discarded
> with us?
>
> With Flink 1.10, we introduced a stricter memory model for the
> TaskManagers. That could be a reason why you see more TaskManagers being
> killed by the underlying resource management system. You could maybe check
> whether your resource management system logs that some containers/pods are
> exceeding their memory limitations. If this is the case, then you should
> give your Flink processes a bit more memory [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>
> Cheers,
> Till
>
> On Tue, Apr 27, 2021 at 6:48 PM Sambaran  wrote:
>
>> Hi there,
>>
>> We have recently migrated to Flink 1.12 from 1.7. Although the jobs are
>> running fine, sometimes the task manager gets killed (quite frequently,
>> 2-3 times a day).
>>
>> Logs:
>> INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
>> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>
>> While checking more logs we see flink not able to discard old checkpoints
>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could
>> not discard completed checkpoint 173.
>>
>> We are not sure of what is the reason here, has anyone faced this before?
>>
>> Regards
>> Sambaran
>>
>


Re: Taskmanager killed often after migrating to flink 1.12

2021-04-28 Thread Till Rohrmann
Hi Sambaran,

could you also share the cause why the checkpoints could not be discarded
with us?

With Flink 1.10, we introduced a stricter memory model for the
TaskManagers. That could be a reason why you see more TaskManagers being
killed by the underlying resource management system. You could maybe check
whether your resource management system logs that some containers/pods are
exceeding their memory limitations. If this is the case, then you should
give your Flink processes a bit more memory [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
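
For illustration, the corresponding entry in flink-conf.yaml could look like
this (the size is a placeholder, not a value taken from this thread):

  # total memory of each TaskManager process (heap + off-heap + JVM overhead)
  taskmanager.memory.process.size: 4096m

Alternatively, taskmanager.memory.flink.size bounds only the memory managed
by Flink itself and lets the JVM overhead be added on top.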

Cheers,
Till

On Tue, Apr 27, 2021 at 6:48 PM Sambaran  wrote:

> Hi there,
>
> We have recently migrated to flink 1.12 from 1.7, although the jobs are
> running fine, sometimes the task manager is getting killed (much frequently
> 2-3 times a day).
>
> Logs:
> INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>
> While checking more logs we see flink not able to discard old checkpoints
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could
> not discard completed checkpoint 173.
>
> We are not sure of what is the reason here, has anyone faced this before?
>
> Regards
> Sambaran
>


Taskmanager killed often after migrating to flink 1.12

2021-04-27 Thread Sambaran
Hi there,

We have recently migrated to Flink 1.12 from 1.7. Although the jobs are
running fine, sometimes the task manager gets killed (quite frequently,
2-3 times a day).

Logs:
INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

While checking more logs we see flink not able to discard old checkpoints
org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could not
discard completed checkpoint 173.

We are not sure what the reason is here; has anyone faced this before?

Regards
Sambaran


Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-17 Thread Abu Bakar Siddiqur Rahman Rocky
Have you tried using the command?

flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar --input
file://(file location) --output file://(file location)

for example:

./bin/flink run
/Users/abubakarsiddiqurrahman/Documents/flink/examples/streaming/WordCount.jar
--input 
file:///Users/abubakarsiddiqurrahman/Documents/flink/examples/streaming/input.txt
--output 
file:///Users/abubakarsiddiqurrahman/Documents/flink/examples/streaming/streamingoutput

I think it will work in that case.

Thank you




On Sat, Apr 17, 2021 at 1:17 PM Robert Metzger  wrote:

> Thanks a lot for the logs. I filed a ticket to track the issue:
> https://issues.apache.org/jira/browse/FLINK-22331
> I hope somebody with M1 hardware will soon have time to look into it.
>
> On Fri, Apr 16, 2021 at 11:02 AM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> I’ve appended you two log files. One is from a run without the Rosetta 2
>> compatibility layer while the other is with. As I said it would be great if
>> everything works without Rosetta 2, but at first it might be sufficient to
>> make it work with the compatibility layer.
>>
>> Regards
>>  Klemens
>>
>>
>> Am 15.04.2021 um 21:29 schrieb Robert Metzger :
>>
>> Hi,
>>
>> a DEBUG log of the client would indeed be nice.
>> Can you adjust this file:
>>
>> conf/log4j-cli.properties
>>
>> to the following contents: (basically TRACE logging with netty logs
>> enabled)
>>
>>
>>
>> 
>> #  Licensed to the Apache Software Foundation (ASF) under one
>> #  or more contributor license agreements.  See the NOTICE file
>> #  distributed with this work for additional information
>> #  regarding copyright ownership.  The ASF licenses this file
>> #  to you under the Apache License, Version 2.0 (the
>> #  "License"); you may not use this file except in compliance
>> #  with the License.  You may obtain a copy of the License at
>> #
>> #  http://www.apache.org/licenses/LICENSE-2.0
>> #
>> #  Unless required by applicable law or agreed to in writing, software
>> #  distributed under the License is distributed on an "AS IS" BASIS,
>> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> #  See the License for the specific language governing permissions and
>> # limitations under the License.
>>
>> 
>>
>> # Allows this configuration to be modified at runtime. The file will be
>> checked every 30 seconds.
>> monitorInterval=30
>>
>> rootLogger.level = TRACE
>> rootLogger.appenderRef.file.ref = FileAppender
>>
>> # Log all infos in the given file
>> appender.file.name = FileAppender
>> appender.file.type = FILE
>> appender.file.append = false
>> appender.file.fileName = ${sys:log.file}
>> appender.file.layout.type = PatternLayout
>> appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>> - %m%n
>>
>> # Log output from org.apache.flink.yarn to the console. This is used by
>> the
>> # CliFrontend class when using a per-job YARN cluster.
>> logger.yarn.name = org.apache.flink.yarn
>> logger.yarn.level = INFO
>> logger.yarn.appenderRef.console.ref = ConsoleAppender
>> logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
>> logger.yarncli.level = INFO
>> logger.yarncli.appenderRef.console.ref = ConsoleAppender
>> logger.hadoop.name = org.apache.hadoop
>> logger.hadoop.level = INFO
>> logger.hadoop.appenderRef.console.ref = ConsoleAppender
>>
>> # Make sure hive logs go to the file.
>> logger.hive.name = org.apache.hadoop.hive
>> logger.hive.level = INFO
>> logger.hive.additivity = false
>> logger.hive.appenderRef.file.ref = FileAppender
>>
>> # Log output from org.apache.flink.kubernetes to the console.
>> logger.kubernetes.name = org.apache.flink.kubernetes
>> logger.kubernetes.level = INFO
>> logger.kubernetes.appenderRef.console.ref = ConsoleAppender
>>
>> appender.console.name = ConsoleAppender
>> appender.console.type = CONSOLE
>> appender.console.layout.type = PatternLayout
>> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c
>> %x - %m%n
>>
>> # suppress the warning that hadoop native libraries are not loaded
>> (irrelevant for the client)
>> logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
>> logger.hadoopnative.level = OFF
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>> #logger.netty.name =
>> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>> #logger.netty.level = OFF
>>
>>
>>
>> And then submit a job locally, and send me the respective log file
>> (containing the "client" string in the file name).
>>
>> Thanks a lot, and stay healthy through the pandemic!
>>
>> Best,
>> Robert
>>
>>
>> On Thu, Apr 15, 2021 at 9:12 PM Klemens Muthmann <
>> klemens.muthm...@cyface.de> wrote:
>>
>>> Hi,
>>>
>>> Since kindergarden time is shortened due to the 

Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-17 Thread Robert Metzger
Thanks a lot for the logs. I filed a ticket to track the issue:
https://issues.apache.org/jira/browse/FLINK-22331
I hope somebody with M1 hardware will soon have time to look into it.

On Fri, Apr 16, 2021 at 11:02 AM Klemens Muthmann <
klemens.muthm...@cyface.de> wrote:

> Hi,
>
> I’ve appended you two log files. One is from a run without the Rosetta 2
> compatibility layer while the other is with. As I said it would be great if
> everything works without Rosetta 2, but at first it might be sufficient to
> make it work with the compatibility layer.
>
> Regards
>  Klemens
>
>
> Am 15.04.2021 um 21:29 schrieb Robert Metzger :
>
> Hi,
>
> a DEBUG log of the client would indeed be nice.
> Can you adjust this file:
>
> conf/log4j-cli.properties
>
> to the following contents: (basically TRACE logging with netty logs
> enabled)
>
>
>
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
>
> 
>
> # Allows this configuration to be modified at runtime. The file will be
> checked every 30 seconds.
> monitorInterval=30
>
> rootLogger.level = TRACE
> rootLogger.appenderRef.file.ref = FileAppender
>
> # Log all infos in the given file
> appender.file.name = FileAppender
> appender.file.type = FILE
> appender.file.append = false
> appender.file.fileName = ${sys:log.file}
> appender.file.layout.type = PatternLayout
> appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
> %m%n
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> logger.yarn.name = org.apache.flink.yarn
> logger.yarn.level = INFO
> logger.yarn.appenderRef.console.ref = ConsoleAppender
> logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
> logger.yarncli.level = INFO
> logger.yarncli.appenderRef.console.ref = ConsoleAppender
> logger.hadoop.name = org.apache.hadoop
> logger.hadoop.level = INFO
> logger.hadoop.appenderRef.console.ref = ConsoleAppender
>
> # Make sure hive logs go to the file.
> logger.hive.name = org.apache.hadoop.hive
> logger.hive.level = INFO
> logger.hive.additivity = false
> logger.hive.appenderRef.file.ref = FileAppender
>
> # Log output from org.apache.flink.kubernetes to the console.
> logger.kubernetes.name = org.apache.flink.kubernetes
> logger.kubernetes.level = INFO
> logger.kubernetes.appenderRef.console.ref = ConsoleAppender
>
> appender.console.name = ConsoleAppender
> appender.console.type = CONSOLE
> appender.console.layout.type = PatternLayout
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c
> %x - %m%n
>
> # suppress the warning that hadoop native libraries are not loaded
> (irrelevant for the client)
> logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
> logger.hadoopnative.level = OFF
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
> #logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
> #logger.netty.level = OFF
>
>
>
> And then submit a job locally, and send me the respective log file
> (containing the "client" string in the file name).
>
> Thanks a lot, and stay healthy through the pandemic!
>
> Best,
> Robert
>
>
> On Thu, Apr 15, 2021 at 9:12 PM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> Since kindergarden time is shortened due to the pandemic I only get four
>> hours of work into each day and I am supposed to do eight. So unfortunately
>> I will not be able to develop a fix at the moment. -.- I am happy to
>> provide any debug log you need or test adaptations and provide fixes as
>> pull requests. But I will sadly have no time to do any research into the
>> problem. :( So for now I guess I will be using one of our Linux servers to
>> test the Flink Pipelines until Silicon is supported.
>>
>> Nevertheless, thanks for your answer. If there is anything I can provide
>> you to narrow down the problem, I am happy to help.
>>
>> Regards
>>  Klemens
>>
>> Am 15.04.2021 um 20:59 schrieb Robert Metzger :
>>
>> Hey Klemens,
>>
>> I'm 

Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-15 Thread Robert Metzger
Hi,

a DEBUG log of the client would indeed be nice.
Can you adjust this file:

conf/log4j-cli.properties

to the following contents: (basically TRACE logging with netty logs enabled)



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


# Allows this configuration to be modified at runtime. The file will be
checked every 30 seconds.
monitorInterval=30

rootLogger.level = TRACE
rootLogger.appenderRef.file.ref = FileAppender

# Log all infos in the given file
appender.file.name = FileAppender
appender.file.type = FILE
appender.file.append = false
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n

# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.
logger.yarn.name = org.apache.flink.yarn
logger.yarn.level = INFO
logger.yarn.appenderRef.console.ref = ConsoleAppender
logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
logger.yarncli.level = INFO
logger.yarncli.appenderRef.console.ref = ConsoleAppender
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.hadoop.appenderRef.console.ref = ConsoleAppender

# Make sure hive logs go to the file.
logger.hive.name = org.apache.hadoop.hive
logger.hive.level = INFO
logger.hive.additivity = false
logger.hive.appenderRef.file.ref = FileAppender

# Log output from org.apache.flink.kubernetes to the console.
logger.kubernetes.name = org.apache.flink.kubernetes
logger.kubernetes.level = INFO
logger.kubernetes.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
- %m%n

# suppress the warning that hadoop native libraries are not loaded
(irrelevant for the client)
logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
logger.hadoopnative.level = OFF

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
#logger.netty.level = OFF



And then submit a job locally, and send me the respective log file
(containing the "client" string in the file name).

Thanks a lot, and stay healthy through the pandemic!

Best,
Robert


On Thu, Apr 15, 2021 at 9:12 PM Klemens Muthmann 
wrote:

> Hi,
>
> Since kindergarden time is shortened due to the pandemic I only get four
> hours of work into each day and I am supposed to do eight. So unfortunately
> I will not be able to develop a fix at the moment. -.- I am happy to
> provide any debug log you need or test adaptations and provide fixes as
> pull requests. But I will sadly have no time to do any research into the
> problem. :( So for now I guess I will be using one of our Linux servers to
> test the Flink Pipelines until Silicon is supported.
>
> Nevertheless, thanks for your answer. If there is anything I can provide
> you to narrow down the problem, I am happy to help.
>
> Regards
>  Klemens
>
> Am 15.04.2021 um 20:59 schrieb Robert Metzger :
>
> Hey Klemens,
>
> I'm sorry that you are running into this. Looks like you are the first (of
> probably many people) who use Flink on a M1 chip.
>
> If you are up for it, we would really appreciate a fix for this issue, as
> a contribution to Flink.
> Maybe you can distill the problem into an integration test, so that you
> can look into fixes right from your IDE. (It seems that the RestClient is
> causing the problems. The client is used by the command line interface to
> upload the job to the cluster (that's not happening when executing the job
> from the IDE))
> My first guess is that a newer netty version might be required? Or maybe
> there's some DEBUG log output that's helpful in understanding the issue?
>
>
>
>
> On Tue, Apr 13, 2021 at 5:34 PM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> I've just tried to run the basic example for Apache Flin

Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-15 Thread Klemens Muthmann
Hi,

Since kindergarten time is shortened due to the pandemic, I only get four hours 
of work into each day and I am supposed to do eight. So unfortunately I will 
not be able to develop a fix at the moment. -.- I am happy to provide any debug 
log you need or test adaptations and provide fixes as pull requests. But I will 
sadly have no time to do any research into the problem. :( So for now I guess I 
will be using one of our Linux servers to test the Flink Pipelines until 
Silicon is supported.

Nevertheless, thanks for your answer. If there is anything I can provide you to 
narrow down the problem, I am happy to help.

Regards
 Klemens

> Am 15.04.2021 um 20:59 schrieb Robert Metzger :
> 
> Hey Klemens,
> 
> I'm sorry that you are running into this. Looks like you are the first (of 
> probably many people) who use Flink on a M1 chip.
> 
> If you are up for it, we would really appreciate a fix for this issue, as a 
> contribution to Flink.
> Maybe you can distill the problem into an integration test, so that you can 
> look into fixes right from your IDE. (It seems that the RestClient is causing 
> the problems. The client is used by the command line interface to upload the 
> job to the cluster (that's not happening when executing the job from the IDE))
> My first guess is that a newer netty version might be required? Or maybe 
> there's some DEBUG log output that's helpful in understanding the issue?
> 
> 
> 
> 
> On Tue, Apr 13, 2021 at 5:34 PM Klemens Muthmann  > wrote:
> Hi,
> 
> I've just tried to run the basic example for Apache Flink 
> 
>  on an Apple Mac Pro with the new M1 Processor. I only need this for 
> development purposes. The actual thing is going to run on a Linux server 
> later on, so I would not mind if it only runs using the Rosetta compatibility 
> layer. Unfortunately it failed with the following Stack Trace:
> 
> flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by 
> org.apache.flink.api.java.ClosureCleaner 
> (file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar)
>  to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> 
> 
>  The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute job 'Streaming WordCount'.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'Streaming WordCount'.
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at 
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
> org.apache.flink.client.

Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-15 Thread Robert Metzger
Hey Klemens,

I'm sorry that you are running into this. Looks like you are the first (of
probably many people) who use Flink on a M1 chip.

If you are up for it, we would really appreciate a fix for this issue, as a
contribution to Flink.
Maybe you can distill the problem into an integration test, so that you can
look into fixes right from your IDE. (It seems that the RestClient is
causing the problems. The client is used by the command line interface to
upload the job to the cluster (that's not happening when executing the job
from the IDE))
My first guess is that a newer netty version might be required? Or maybe
there's some DEBUG log output that's helpful in understanding the issue?




On Tue, Apr 13, 2021 at 5:34 PM Klemens Muthmann 
wrote:

> Hi,
>
> I've just tried to run the basic example for Apache Flink
> 
>  on
> an Apple Mac Pro with the new M1 Processor. I only need this for
> development purposes. The actual thing is going to run on a Linux server
> later on, so I would not mind if it only runs using the Rosetta
> compatibility layer. Unfortunately it failed with the following Stack Trace:
>
> flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by 
> org.apache.flink.api.java.ClosureCleaner 
> (file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar)
>  to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute job 'Streaming WordCount'.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'Streaming WordCount'.
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at 
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
> at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
> at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
> at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.

Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-13 Thread Klemens Muthmann
Hi,

I've just tried to run the basic example for Apache Flink on an Apple Mac Pro
with the new M1 Processor. I only need this for 
development purposes. The actual thing is going to run on a Linux server later 
on, so I would not mind if it only runs using the Rosetta compatibility layer. 
Unfortunately it failed with the following Stack Trace:
flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar)
 to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job 'Streaming WordCount'.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'Streaming WordCount'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at 
java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at 
java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelInactive(RestClient.java:588)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.c

Re: Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
In Flink, you can only limit memory usage, e.g. via
taskmanager.memory.process.size [1]
(throttling could be implemented using the DataStream API, but you
mentioned you are using SQL).
Quotas on other resources can be set in the underlying resource manager.

But I'd suggest investigating the failure and understanding what's
causing it. Probably, high resource usage is not the root cause.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#memory-configuration
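
For the DataStream throttling mentioned above (it does not help for a pure
SQL/blink-planner job), here is a minimal sketch of what a per-subtask rate
cap could look like; the class name and the 1000 records/s figure in the
usage line are purely illustrative:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Passes records through unchanged, but caps each subtask at
// maxRecordsPerSecond by sleeping once the per-second budget is used up.
public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final long maxRecordsPerSecond;
    private transient long windowStartMillis;
    private transient long recordsInWindow;

    public ThrottlingMap(long maxRecordsPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        windowStartMillis = System.currentTimeMillis();
        recordsInWindow = 0;
    }

    @Override
    public T map(T value) throws Exception {
        recordsInWindow++;
        if (recordsInWindow >= maxRecordsPerSecond) {
            long elapsed = System.currentTimeMillis() - windowStartMillis;
            if (elapsed < 1000) {
                Thread.sleep(1000 - elapsed); // wait out the rest of the second
            }
            windowStartMillis = System.currentTimeMillis();
            recordsInWindow = 0;
        }
        return value;
    }
}

// usage (hypothetical source): source.map(new ThrottlingMap<>(1000)).name("throttle");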

Regards,
Roman

On Mon, Apr 12, 2021 at 10:17 AM 张颖  wrote:
>
> Hi,
> This is not my intention.
> I meant that I run stream jobs and batch jobs in the same cluster, but the
> batch job preempts almost all of the resources in the cluster (it may drive
> the machine load average to 150, or CPU to 100%, or disk I/O to 100%), which
> leads my stream job into a series of problems (such as TM lost and connection
> timeouts). So I want to limit the speed of data processing in the batch job.
>
>
>
>
>
>
>
> At 2021-04-12 15:49:31, "Roman Khachatryan"  wrote:
> >Hi,
> >
> >I'm not sure that I fully understand your question.
> >Is the intention to prioritize some jobs over the others in the same
> >Flink cluster? Currently, it is not possible (FLIP-156 and further
> >work aim to address this [1]). At the moment, you can either
> >- deploy the jobs in separate clusters (per-job mode [2]) and rely on
> >the underlying resource manager for resource isolation
> >- or allocate less task slots to a lower priority job by configuring:
> >parallelism, operator chaining and slot sharing groups
> >
> >[1] 
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
> >[2] 
> >https://ci.apache.org/projects/flink/flink-docs-stable/deployment/#per-job-mode
> >
> >Regards,
> >Roman
> >
> >
> >
> >On Mon, Apr 12, 2021 at 9:07 AM 张颖  wrote:
> >>
> >> When I run a sql job with blink planner in my cluster,the task is almost 
> >> preemption the whole resources in the cluster,  and this is a bad effect 
> >> to the stream task.As it is not necessary on speed,so is there any way to 
> >> control the rate in my batch task?
> >>
> >>
> >>
> >> this is the machine performance in running some operator:
> >> https://issues.apache.org/jira/browse/FLINK-22204
> >>
> >>
> >>
> >>
>
>
>
>


Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
Hi,

I'm not sure that I fully understand your question.
Is the intention to prioritize some jobs over the others in the same
Flink cluster? Currently, it is not possible (FLIP-156 and further
work aim to address this [1]). At the moment, you can either
- deploy the jobs in separate clusters (per-job mode [2]) and rely on
the underlying resource manager for resource isolation
- or allocate fewer task slots to a lower-priority job by configuring
parallelism, operator chaining and slot sharing groups (see the sketch below)

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/#per-job-mode
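
A minimal DataStream sketch of the second option; the operators, names and
values below are made up for illustration (for a SQL job, only the overall
parallelism, e.g. parallelism.default, is easily tuned this way):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LowPriorityJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // hypothetical input; the point is only how the knobs are applied
        DataStream<String> input = env.fromElements("a", "b", "c");

        input
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .setParallelism(2)                // fewer parallel subtasks -> fewer slots needed
                .slotSharingGroup("low-priority") // operators in different groups do not share slots
                .disableChaining()                // optionally break operator chaining
                .print();

        env.execute("low-priority job");
    }
}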

Regards,
Roman



On Mon, Apr 12, 2021 at 9:07 AM 张颖  wrote:
>
> When I run a sql job with blink planner in my cluster,the task is almost 
> preemption the whole resources in the cluster,  and this is a bad effect to 
> the stream task.As it is not necessary on speed,so is there any way to 
> control the rate in my batch task?
>
>
>
> this is the machine performance in running some operator:
> https://issues.apache.org/jira/browse/FLINK-22204
>
>
>
>


Does it support rate-limiting in flink 1.12?

2021-04-12 Thread 张颖
When I run a SQL job with the blink planner in my cluster, the task preempts
almost all of the resources in the cluster, and this has a bad effect on the
stream tasks. Since speed is not critical for this job, is there any way to
control the rate of my batch task?

 

this is the machine performance in running some operator:
https://issues.apache.org/jira/browse/FLINK-22204



Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-30 Thread Yingjie Cao
Hi Haihang,

After scanning the user mailing list, I found some users have reported
checkpoint timeouts when using unaligned checkpoints. Can you share which
checkpoint mode you use? (The information can be found in the log or in the
checkpoint -> configuration tab in the web UI.)
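
For reference, a minimal sketch of where the checkpoint mode is chosen in
code; the 3-minute interval mirrors the setting reported in this thread, and
whether unaligned checkpoints should be on or off is exactly the open
question (for purely configuration-based setups, the corresponding switch is
execution.checkpointing.unaligned):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 3-minute interval, exactly-once, as described in the report
        env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

        // aligned checkpoints are the default; set this to true to try
        // unaligned checkpoints instead (available since Flink 1.11)
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);

        // ... build and execute the actual pipeline here ...
    }
}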

Best,
Yingjie

Yingjie Cao wrote on Tue, Mar 30, 2021 at 4:29 PM:

> Hi Haihang,
>
> I think your issue is not related to FLINK-16404
> , because that change
> should have small impact on checkpoint time, we already have a micro
> benchmark for that change (1s checkpoint interval) and no regression is
> seen.
>
> Could you share some more information, for example, the stack of the task
> which can not finish the checkpoint?
>
> Best,
> Yingjie
>
Haihang Jing wrote on Thu, Mar 25, 2021 at 10:58 AM:
>
>> Hi,Congxian ,thanks for your replay.
>> job run on Flink1.9 (checkpoint interval 3min)
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/6.png>
>>
>> job run on Flink1.12 (checkpoint interval 10min)
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/7.png>
>>
>> job run on Flink1.12 (checkpoint interval 3min)
>> Pic1:Time used to complete the checkpoint in 1.12 is longer(5m32s):
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/2.png>
>>
>> Pic2:Start delay(4m27s):
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/1.png>
>>
>> Pic3:Next checkpoint failed(task141 ack n/a):
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/3.png>
>>
>> Pic4:Did not see back pressure and data skew:
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/4.png>
>>
>> Pic5:Subtask deal same data nums ,checkpoint endToEnd fast:
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/5.png>
>>
>> Best,
>> Haihang
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-30 Thread Yingjie Cao
Hi Haihang,

I think your issue is not related to FLINK-16404
(https://issues.apache.org/jira/browse/FLINK-16404), because that change
should have a small impact on checkpoint time; we already have a micro
benchmark for that change (1s checkpoint interval) and no regression is
seen.

Could you share some more information, for example, the stack of the task
which can not finish the checkpoint?

Best,
Yingjie

Haihang Jing wrote on Thu, Mar 25, 2021 at 10:58 AM:

> Hi,Congxian ,thanks for your replay.
> job run on Flink1.9 (checkpoint interval 3min)
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/6.png>
>
> job run on Flink1.12 (checkpoint interval 10min)
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/7.png>
>
> job run on Flink1.12 (checkpoint interval 3min)
> Pic1:Time used to complete the checkpoint in 1.12 is longer(5m32s):
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/2.png>
>
> Pic2:Start delay(4m27s):
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/1.png>
>
> Pic3:Next checkpoint failed(task141 ack n/a):
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/3.png>
>
> Pic4:Did not see back pressure and data skew:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/4.png>
>
> Pic5:Subtask deal same data nums ,checkpoint endToEnd fast:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/5.png>
>
> Best,
> Haihang
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-24 Thread Haihang Jing
Hi Congxian, thanks for your reply.
job run on Flink1.9 (checkpoint interval 3min):
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/6.png>
job run on Flink1.12 (checkpoint interval 10min):
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/7.png>
job run on Flink1.12 (checkpoint interval 3min):
Pic1: Time used to complete the checkpoint in 1.12 is longer (5m32s):
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/2.png>
Pic2: Start delay (4m27s):
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/1.png>
Pic3: Next checkpoint failed (task141 ack n/a):
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/3.png>
Pic4: Did not see back pressure or data skew:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/4.png>
Pic5: Subtasks process the same number of records, and the checkpoint end-to-end time is fast:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/5.png>
Best,
Haihang



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-24 Thread Congxian Qiu
Hi
From the description, the time used to complete the checkpoint in 1.12
is longer. Could you share more details about the time consumption when
running the job on 1.9 and 1.12?
Best,
Congxian


Haihang Jing wrote on Tue, Mar 23, 2021 at 7:22 PM:

> 【Appearance】For jobs with the same configuration (checkpoint interval: 3
> minutes, job logic: regular join), flink1.9 runs normally. After flink1.12
> runs for a period of time, the checkpoint creation time increases, and
> finally the checkpoint creation fails.
>
> 【Analysis】After learning flink1.10, the checkpoint mechanism is adjusted.
> The receiver will not cache the data after a single barrier arrives when
> the
> barrier is aligned, which means that the sender must wait for credit
> feedback to transmit data after the barrier is aligned, so the sender will
> generate certain The cold start of Flink affects the delay and network
> throughput. Therefore, the checkpoint interval is adjusted to 10 minutes
> for
> comparative testing, and it is found that after the adjustment (interval is
> 10), the job running on flink 1.12 is running normally.
>
> issue:https://issues.apache.org/jira/browse/FLINK-16404
>
> 【Question】1.Have you encountered the same problem?
>2.Can  flink1.12 set small checkpoint interval?
>
> The checkpoint interval is 3 minutes after the flink1.12 job runs for 5
> hours, the checkpoint creation fails, the specific exception stack:
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-23 Thread Haihang Jing
【Symptom】For jobs with the same configuration (checkpoint interval: 3
minutes, job logic: regular join), Flink 1.9 runs normally. After Flink 1.12
runs for a period of time, the checkpoint creation time increases, and
finally checkpoint creation fails.

【Analysis】Starting with Flink 1.10, the checkpoint mechanism was adjusted:
during barrier alignment the receiver no longer caches data that arrives after
a single barrier, which means the sender must wait for credit feedback before
it can transmit data once the barrier is aligned, so the sender incurs a
certain cold start. This affects latency and network throughput. Therefore,
the checkpoint interval was adjusted to 10 minutes for a comparative test, and
it turned out that after the adjustment (interval of 10 minutes) the job runs
normally on Flink 1.12.

issue: https://issues.apache.org/jira/browse/FLINK-16404

【Question】1. Has anyone encountered the same problem?
2. Can Flink 1.12 use a small checkpoint interval?

The checkpoint interval is 3 minutes after the flink1.12 job runs for 5
hours, the checkpoint creation fails, the specific exception stack:
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.

at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)

at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)

at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: OrcTableSource in flink 1.12

2021-03-23 Thread Timo Walther

Hi Nikola,

for the ORC source it is fine to use `TableEnvironment#fromTableSource`. 
It is true that this method is deprecated, but as I said not all 
connectors have been ported to be supported in the SQL DDL via string 
properties. Therefore, `TableEnvironment#fromTableSource` is still 
accessible until all connectors are supported in the DDL.
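
For reference, a minimal sketch of that approach on 1.12; it uses the
old-planner BatchTableEnvironment (the combination the legacy OrcTableSource
was written for, if I remember correctly), and the path and ORC schema are
placeholders:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.orc.OrcTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class OrcQuerySketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // placeholder path and ORC schema -- replace with your own
        OrcTableSource orcSrc = OrcTableSource.builder()
                .path("file:///my/data/file.orc")
                .forOrcSchema("struct<name:string,amount:int>")
                .build();

        // deprecated, but still available in 1.12 as discussed above
        Table orcTable = tEnv.fromTableSource(orcSrc);
        tEnv.createTemporaryView("OrcTable", orcTable);

        Table result = tEnv.sqlQuery("SELECT * FROM OrcTable");

        // only needed if a DataSet is really required downstream
        DataSet<Row> rows = tEnv.toDataSet(result, Row.class);
        rows.print();
    }
}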


Btw it might also make sense to look into the Hive connector for reading 
ORC.


Regards,
Timo

On 22.03.21 18:02, Nikola Hrusov wrote:

Hi Timo,

I need to read ORC files and run a query on them as in the example 
above. Since the example given in docs is not recommended what should I use?


I looked into the method you suggest - TableEnvironment#fromTableSource 
- it shows as Deprecated on the docs: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/api/TableEnvironment.html#fromTableSource-org.apache.flink.table.sources.TableSource- 



However, it doesn't say what I should use instead?

I have looked in all the docs available for 1.12 but I cannot find how 
to achieve the same result as it was in some previous versions. In some 
previous versions you could define 
`tableEnv.registerTableSource(tableName, orcTableSource);` but that 
method is not available anymore.


What is the way to go from here? I would like to read from orc files, 
run a query and transform the result. I do not necessarily need it to be 
with the DataSet API.


Regards
,
Nikola

On Mon, Mar 22, 2021 at 6:49 PM Timo Walther > wrote:


Hi Nikola,


the OrcTableSource has not been updated to be used in a SQL DDL. You
can
define your own table factory [1] that translates properties into a
object to create instances or use
`org.apache.flink.table.api.TableEnvironment#fromTableSource`. I
recommend the latter option.

Please keep in mind that we are about to drop DataSet support for Table
API in 1.13. Batch and streaming use cases are already possible with
the
unified TableEnvironment.

Are you sure that you really need DataSet API?

Regards,
Timo

[1]

https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/



On 21.03.21 15:42, Nikola Hrusov wrote:
 > Hello,
 >
 > I am trying to find some examples of how to use the
OrcTableSource and
 > query it.
 > I got to the documentation here:
 >

https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html



 >

>

 > and it says that an OrcTableSource is used as below:
 >
 > |OrcTableSource orcSrc = OrcTableSource.builder()
 > .path("file:///my/data/file.orc")
 >
.forOrcSchema("struct")
.build();
 > tEnv.registerTableSourceInternal("orcTable", orcSrc); Table res =
 > tableEnv.sqlQuery("SELECT * FROM orcTable"); |
 >
 >
 > My question is what should tEnv be so that I can use
 > the registerTableSourceInternal method?
 > My end goal is to query the orc source and then return a DataSet.
 >
 > Regards
 > ,
 > Nikola





Re: OrcTableSource in flink 1.12

2021-03-22 Thread Nikola Hrusov
Hi Timo,

I need to read ORC files and run a query on them as in the example above.
Since the example given in docs is not recommended what should I use?

I looked into the method you suggest - TableEnvironment#fromTableSource -
it shows as Deprecated on the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/api/TableEnvironment.html#fromTableSource-org.apache.flink.table.sources.TableSource-

However, it doesn't say what I should use instead.

I have looked in all the docs available for 1.12 but I cannot find how to
achieve the same result as it was in some previous versions. In some
previous versions you could define `tableEnv.registerTableSource(tableName,
orcTableSource);` but that method is not available anymore.

What is the way to go from here? I would like to read from orc files, run a
query and transform the result. I do not necessarily need it to be with the
DataSet API.

Regards,
Nikola

On Mon, Mar 22, 2021 at 6:49 PM Timo Walther  wrote:

> Hi Nikola,
>
>
> the OrcTableSource has not been updated to be used in a SQL DDL. You can
> define your own table factory [1] that translates properties into a
> object to create instances or use
> `org.apache.flink.table.api.TableEnvironment#fromTableSource`. I
> recommend the latter option.
>
> Please keep in mind that we are about to drop DataSet support for Table
> API in 1.13. Batch and streaming use cases are already possible with the
> unified TableEnvironment.
>
> Are you sure that you really need DataSet API?
>
> Regards,
> Timo
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/
>
> On 21.03.21 15:42, Nikola Hrusov wrote:
> > Hello,
> >
> > I am trying to find some examples of how to use the OrcTableSource and
> > query it.
> > I got to the documentation here:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html>
>
> > and it says that an OrcTableSource is used as below:
> >
> > |OrcTableSource orcSrc = OrcTableSource.builder()
> > .path("file:///my/data/file.orc")
> >
> .forOrcSchema("struct")
> .build();
> > tEnv.registerTableSourceInternal("orcTable", orcSrc); Table res =
> > tableEnv.sqlQuery("SELECT * FROM orcTable"); |
> >
> >
> > My question is what should tEnv be so that I can use
> > the registerTableSourceInternal method?
> > My end goal is to query the orc source and then return a DataSet.
> >
> > Regards
> > ,
> > Nikola
>
>


Re: OrcTableSource in flink 1.12

2021-03-22 Thread Timo Walther

Hi Nikola,


the OrcTableSource has not been updated to be used in a SQL DDL. You can 
define your own table factory [1] that translates properties into a 
object to create instances or use 
`org.apache.flink.table.api.TableEnvironment#fromTableSource`. I 
recommend the latter option.


Please keep in mind that we are about to drop DataSet support for Table 
API in 1.13. Batch and streaming use cases are already possible with the 
unified TableEnvironment.


Are you sure that you really need DataSet API?

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/


On 21.03.21 15:42, Nikola Hrusov wrote:

Hello,

I am trying to find some examples of how to use the OrcTableSource and 
query it.
I got to the documentation here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html 
 
and it says that an OrcTableSource is used as below:


|OrcTableSource orcSrc = OrcTableSource.builder() 
.path("file:///my/data/file.orc") 
.forOrcSchema("struct") .build(); 
tEnv.registerTableSourceInternal("orcTable", orcSrc); Table res = 
tableEnv.sqlQuery("SELECT * FROM orcTable"); |



My question is what should tEnv be so that I can use 
the registerTableSourceInternal method?

My end goal is to query the orc source and then return a DataSet.

Regards
,
Nikola




OrcTableSource in flink 1.12

2021-03-21 Thread Nikola Hrusov
Hello,

I am trying to find some examples of how to use the OrcTableSource and
query it.
I got to the documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/orc/OrcTableSource.html
and it says that an OrcTableSource is used as below:

 OrcTableSource orcSrc = OrcTableSource.builder()
   .path("file:///my/data/file.orc")
   .forOrcSchema("struct")
   .build();

 tEnv.registerTableSourceInternal("orcTable", orcSrc);
 Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");


My question is what should tEnv be so that I can use
the registerTableSourceInternal method?
My end goal is to query the orc source and then return a DataSet.

Regards,
Nikola


Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-03-10 Thread Matthias Pohl
Hi Abhishek,
sorry for the late reply. Did you manage to fix it? One remark: Are you
sure you're referring to the right configuration file? log4j-cli.properties
is used for the CLI tool [1]. Or do you try to get the logs from within the
main of your job?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html#configuring-log4j-2

On Thu, Mar 4, 2021 at 1:50 PM Abhishek Shukla 
wrote:

> @Matthis tried this but did not work, normal logs (application logs) are
> coming.
>
> but the startup bean creation or server error log in case of build failure
> are not getting printed in file
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-03-04 Thread Abhishek Shukla
@Matthis tried this but did not work, normal logs (application logs) are
coming.

but the startup bean creation or server error log in case of build failure
are not getting printed in file



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink 1.12 Compatibility with hbase-shaded-client 2.1 in application jar

2021-03-03 Thread Debraj Manna
The issue is resolved. An org.apache.hbase exclusion was missing in my
application pom while creating the uber jar.

diff --git a/map/engine/pom.xml b/map/engine/pom.xml
index 8337be031d1..8eceb721fa7 100644
--- a/map/engine/pom.xml
+++ b/map/engine/pom.xml
@@ -203,6 +203,7 @@
 org.slf4j:*
 log4j:*
 org.apache.hadoop:*
+org.apache.hbase:*
 
 
 

On Wed, Mar 3, 2021 at 10:12 AM Debraj Manna 
wrote:

> Hi
>
> I am trying to deploy an application in flink 1.12 having
> hbase-shaded-client 2.1.0 as dependency  in application mode
> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>.
> On deploying the application I am seeing the below ClassCastException:
>
> org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
> cannot be cast to
> org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
>
> I have done export HADOOP_CLASSPATH=`hadoop classpath` as mentioned in
> the hadoop documentation. I did not add any hadoop / hbase jars in the
> flink/lib folder.
>
> ubuntu@vrni-platform://tmp/debraj-flink/flink/lib$ ls
> flink-csv-1.12.1.jarflink-json-1.12.1.jar
>  flink-table_2.12-1.12.1.jarlog4j-1.2.17.jar
> slf4j-log4j12-1.7.25.jar
> flink-dist_2.12-1.12.1.jar  flink-shaded-zookeeper-3.4.14.jar
>  flink-table-blink_2.12-1.12.1.jar  log4j-to-slf4j-2.11.1.jar
>  vrni-flink-datadog-0.001-SNAPSHOT.jar
>
> Can anyone suggest what could be going wrong here?
>
> The full exception trace is like below
>
> 2021-03-02 18:10:45,819 ERROR 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager  - 
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Could not start the ResourceManager 
> akka.tcp://flink@localhost:41477/user/rpc/resourcemanager_0
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:233)
> at 
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:607)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:181)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at akka.actor.Actor.aroundReceive(Actor.scala:539)
> at akka.actor.Actor.aroundReceive$(Actor.scala:537)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
> at akka.actor.ActorCell.invoke(ActorCell.scala:583)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
> at akka.dispatch.Mailbox.run(Mailbox.scala:229)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Cannot initialize resource provider.
> at 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:124)
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:245)
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:229)
> ... 22 more
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Could not start resource manager client.
> at 
> org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:181)
> at 
> org.apache.flink.runtime.resourc

Flink 1.12 Compatibility with hbase-shaded-client 2.1 in application jar

2021-03-02 Thread Debraj Manna
Hi

I am trying to deploy an application on Flink 1.12 that has
hbase-shaded-client 2.1.0 as a dependency, in application mode
<https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>.
On deploying the application I am seeing the below ClassCastException:

org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
cannot be cast to
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message

I have done export HADOOP_CLASSPATH=`hadoop classpath` as mentioned in
the Hadoop documentation. I did not add any Hadoop / HBase jars in the
flink/lib folder.

ubuntu@vrni-platform://tmp/debraj-flink/flink/lib$ ls
flink-csv-1.12.1.jarflink-json-1.12.1.jar
 flink-table_2.12-1.12.1.jarlog4j-1.2.17.jar
slf4j-log4j12-1.7.25.jar
flink-dist_2.12-1.12.1.jar  flink-shaded-zookeeper-3.4.14.jar
 flink-table-blink_2.12-1.12.1.jar  log4j-to-slf4j-2.11.1.jar
 vrni-flink-datadog-0.001-SNAPSHOT.jar

Can anyone suggest what could be going wrong here?

The full exception trace is like below

2021-03-02 18:10:45,819 ERROR
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
- Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start the ResourceManager
akka.tcp://flink@localhost:41477/user/rpc/resourcemanager_0
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:233)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:607)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:181)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Cannot initialize resource provider.
at 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:124)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:245)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:229)
... 22 more
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start resource manager client.
at 
org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:181)
at 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:81)
at 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:122)
... 24 more
Caused by: java.lang.ClassCastException:
org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
cannot be cast to
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy16.registerApplicationMaster(Unknown Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:107)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j

Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Matthias Pohl
Hi Abhishek,
have you also tried to apply the instructions listed in [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html#configuring-log4j1

On Mon, Mar 1, 2021 at 4:42 AM Abhishek Shukla 
wrote:

> Hi Matthias,
> Thanks for replying,
> I checked both of these pages,
> And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
> are there in property file,
>
> I am able to see the logs of pipeline once application in up, but the logs
> related to application failure or successful bean creation or logs at time
> of post construct are not getting printed out in file, which was happening
> in flink 1.9 with provided log4j-cli.properties file.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Abhishek Shukla
Hi Matthias,
Thanks for replying,
I checked both of these pages,
And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
are there in property file,

I am able to see the logs of the pipeline once the application is up, but the logs
related to application failure, successful bean creation, or logs at post-construct
time are not getting printed to the file, which was happening in Flink 1.9 with the
provided log4j-cli.properties file.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-26 Thread Matthias Pohl
Hi Abhishek,
this might be caused by the switch from log4j to log4j2 as the default in
Flink 1.11 [1]. Have you had a chance to look at the logging documentation
[2] to enable log4j again?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.11.html#switch-to-log4j-2-by-default-flink-15672
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html
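
If the missing lines are the Spring startup / bean-creation logs, one speculative thing to try (a sketch only; the package name is an assumption and nothing in this thread confirms it as the fix) is to declare an explicit logger for the Spring packages in the log4j2 properties, so they are clearly routed to the file appender:

```
# speculative sketch: route Spring framework logging to the existing FileAppender
logger.spring.name = org.springframework
logger.spring.level = INFO
logger.spring.appenderRef.file.ref = FileAppender
```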

On Thu, Feb 25, 2021 at 5:56 AM Abhishek Shukla 
wrote:

> I was getting bean creation logs and spring boot start up logs in Flink
> 1.9 with flink1.9_log4j-cli.properties (attached)
>
> 
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> 
>
> log4j.rootLogger=INFO, file
>
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=true
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> log4j.logger.org.apache.flink.yarn=INFO, console
> log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
> log4j.logger.org.apache.hadoop=INFO, console
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
> # suppress the warning that hadoop native libraries are not loaded 
> (irrelevant for the client)
> log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
>
> but after updating to Flink 1.12.1 those logs are not getting printed in
> log file attaching flink1.12_log4j-cli.properties
>
> 
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> 
>
> rootLogger.level = INFO
> rootLogger.appenderRef.file.ref = FileAppender
>
> # Log all infos in the given file
> appender.file.name = FileAppender
> appender.file.type = FILE
> appender.file.append = true
> appender.file.fileName = ${sys:log.file}
> appender.file.layout.type = PatternLayout
> appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> logger.flink.name = org.apache.flink
> logger.flink.level = INFO
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> logger.yarn.name = org.apache.flink.yarn
> logger.yarn.level = INFO
> logger.yarn.appenderRef.console.ref = ConsoleAppender
> logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
> logger.yarncli.level = INFO
> logger.yarncli.appenderRef.console.ref = ConsoleAppender
> logger.hadoop.name = org.apache.hadoop
> logger.hadoop.level = INFO
> logger.hadoop.appenderRef.console.ref = ConsoleAppender
>
> # Log output from org.

apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-24 Thread Abhishek Shukla
I was getting bean creation logs and spring boot start up logs in Flink 1.9
with flink1.9_log4j-cli.properties (attached)



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


log4j.rootLogger=INFO, file


# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=true
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n


# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.
log4j.logger.org.apache.flink.yarn=INFO, console
log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
log4j.logger.org.apache.hadoop=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR

but after updating to Flink 1.12.1 those logs are not getting printed in the
log file. Attaching flink1.12_log4j-cli.properties:



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


rootLogger.level = INFO
rootLogger.appenderRef.file.ref = FileAppender

# Log all infos in the given file
appender.file.name = FileAppender
appender.file.type = FILE
appender.file.append = true
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
logger.flink.name = org.apache.flink
logger.flink.level = INFO

# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.
logger.yarn.name = org.apache.flink.yarn
logger.yarn.level = INFO
logger.yarn.appenderRef.console.ref = ConsoleAppender
logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
logger.yarncli.level = INFO
logger.yarncli.appenderRef.console.ref = ConsoleAppender
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.hadoop.appenderRef.console.ref = ConsoleAppender

# Log output from org.apache.flink.kubernetes to the console.
logger.kubernetes.name = org.apache.flink.kubernetes
logger.kubernetes.level = INFO
logger.kubernetes.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
logger.hadoopnative.level = OFF

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.fli

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-02-04 Thread Abhishek Rai
I had a similar need recently and ended up using
KafkaDeserializationSchemaWrapper to wrap a given
DeserializationSchema.  The resulting
KafkaDeserializationSchema[Wrapper] can be passed directly to the
`FlinkKafkaConsumer` constructor.

```
class BoundingDeserializationSchema
    extends KafkaDeserializationSchemaWrapper<Row> {
  private static final long serialVersionUID = 1858204203663583785L;
  private long maxRecords_;
  private long numRecords_ = 0;

  public BoundingDeserializationSchema(
      DeserializationSchema<Row> deserializationSchema,
      long maxRecords) {
    super(deserializationSchema);
    maxRecords_ = maxRecords;
  }

  @Override
  public void deserialize(
      ConsumerRecord<byte[], byte[]> message, Collector<Row> out)
      throws Exception {
    super.deserialize(message, out);
    numRecords_++;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return numRecords_ >= maxRecords_;
  }
}

```
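
A hedged usage sketch of the wrapper above (the environment, schema, properties, topic name and record limit are placeholders, not from the original mail):

```java
// assumes an existing StreamExecutionEnvironment, a DeserializationSchema<Row>
// and Kafka Properties; all names below are placeholders
DataStream<Row> boundedRows(
        StreamExecutionEnvironment env,
        DeserializationSchema<Row> rowSchema,
        Properties kafkaProps) {
    FlinkKafkaConsumer<Row> consumer = new FlinkKafkaConsumer<>(
            "my-topic",
            new BoundingDeserializationSchema(rowSchema, 1_000_000L), // stop after ~1M records
            kafkaProps);
    return env.addSource(consumer);
}
```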

On Thu, Jan 14, 2021 at 6:15 AM sagar  wrote:
>
> Thanks Yun
>
>
>
> On Thu, Jan 14, 2021 at 1:58 PM Yun Gao  wrote:
>>
>> Hi Sagar,
>>
>>   I rechecked and found that the new kafka source is not formally publish 
>> yet, and a stable method I think may be try adding the FlinkKafkaConsumer as 
>> a BOUNDED source first. Sorry for the inconvient.
>>
>> Best,
>>  Yun
>>
>> --
>> Sender:Yun Gao
>> Date:2021/01/14 15:26:54
>> Recipient:Ardhani Narasimha; 
>> sagar
>> Cc:Flink User Mail List
>> Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch 
>> mode (Flink 1.12)
>>
>> Hi Sagar,
>>
>>   I think the problem is that the legacy source implemented by extending 
>> SourceFunction are all defined as CONTINOUS_UNBOUNDED when use 
>> env.addSource(). Although there is hacky way to add the legacy sources as 
>> BOUNDED source [1], I think you may first have a try of new version of 
>> KafkaSource [2] ? The new version of KafkaSource is implemented with the new 
>> Source API [3], which provides unfied support for the streaming and batch 
>> mode.
>>
>> Best,
>>  Yun
>>
>>
>>
>>
>> [1] 
>> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
>> [2]  
>> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
>> [3] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>
>>
>>
>> --Original Mail --
>> Sender:Ardhani Narasimha 
>> Send Date:Thu Jan 14 15:11:35 2021
>> Recipients:sagar 
>> CC:Flink User Mail List 
>> Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
>> (Flink 1.12)
>>>
>>> Interesting use case.
>>>
>>> Can you please elaborate more on this.
>>> On what criteria do you want to batch? Time? Count? Or Size?
>>>
>>> On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> I am getting the following error while running DataStream API in with 
>>>> batch mode with kafka source.
>>>> I am using FlinkKafkaConsumer to consume the data.
>>>>
>>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source 
>>>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not 
>>>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
>>>> ~[flink-core-1.12.0.jar:1.12.0]
>>>>
>>>> In my batch program I wanted to work with four to five different stream in 
>>>> batch mode as data source is bounded
>>>>
>>>> I don't find any clear example of how to do it with kafka souce with Flink 
>>>> 1.12
>>>>
>>>> I don't want to use JDBC source as underlying database table may change. 
>>>> please give me some example on how to achieve the above use case.
>>>>
>>>> Also for any large bounded source are there any alternatives to achieve 
>>>> this?
>>>>
>>>>
>>>>
>>>> --
>>>> ---Regards-

Re: Flink upgrade to Flink-1.12

2021-01-27 Thread Aljoscha Krettek
I'm afraid I also don't know more than that. But I agree with Ufuk that 
it should just work.


I think the best way would be to try it in a test environment and then 
go forward with upgrading the production jobs/cluster.


Best,
Aljoscha

On 2021/01/25 18:59, Ufuk Celebi wrote:

Thanks for reaching out. Semi-asynchronous does *not* refer to incremental 
checkpoints and Savepoints are always triggered as full snapshots (not 
incremental).

Earlier versions of the RocksDb state backend supported two snapshotting modes, 
fully and semi-asynchronous snapshots. Semi-asynchronous state snapshots for 
RocksDb have been removed a long time ago by Aljoscha in 
https://github.com/apache/flink/pull/2345 (FLINK-4340). The notes you are 
referencing were added around that time and I'm afraid they might have become 
mostly obsolete.

I'm pulling in Aljoscha who should be able to give a definitive answer here.

To make a long story short, it should simply work for you to upgrade from 1.11 
to 1.12 via a Savepoint.

Cheers,

Ufuk

On Wed, Jan 20, 2021, at 3:58 AM, 耿延杰 wrote:

Hi all,

As flink doc says:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions


We do not support migration for state in RocksDB that was checkpointed using 
`semi-asynchronous` mode. In case your old job was using this mode, you can 
still change your job to use `fully-asynchronous` mode before taking the 
savepoint that is used as the basis for the migration.


So, my first question:
Is "semi-asynchronous" means "incremental checkpoint"?

And second question:
If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
checkpoint as state backend.
I should:
1. take a savepoint for old version(flink-1.11),
2. and change job to use "full asynchronous checkpoint" ,
3. restart old version(flink-1.11) job with new config (full asynchronous 
checkpoint),
4. then, take a savepoint
5. and finally, stop old version(flink-1.11) and upgrade to flink-1.12

Whether I understand correctly?

Best regards


Re: Flink upgrade to Flink-1.12

2021-01-25 Thread Ufuk Celebi
Thanks for reaching out. Semi-asynchronous does *not* refer to incremental 
checkpoints and Savepoints are always triggered as full snapshots (not 
incremental).

Earlier versions of the RocksDb state backend supported two snapshotting modes, 
fully and semi-asynchronous snapshots. Semi-asynchronous state snapshots for 
RocksDb have been removed a long time ago by Aljoscha in 
https://github.com/apache/flink/pull/2345 (FLINK-4340). The notes you are 
referencing were added around that time and I'm afraid they might have become 
mostly obsolete.

I'm pulling in Aljoscha who should be able to give a definitive answer here.

To make a long story short, it should simply work for you to upgrade from 1.11 
to 1.12 via a Savepoint.

Cheers,

Ufuk
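
For reference, a hedged sketch of that upgrade flow with the CLI (job id, savepoint paths and jar name are placeholders):

```
# on the 1.11 cluster: stop the job and take a savepoint
bin/flink stop --savepointPath s3://my-bucket/savepoints <jobId>

# upgrade the cluster / binaries to 1.12, then resume from that savepoint
bin/flink run -s s3://my-bucket/savepoints/savepoint-xxxx my-job.jar
```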

On Wed, Jan 20, 2021, at 3:58 AM, 耿延杰 wrote:
> Hi all,
> 
> As flink doc says:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions
> 
>> We do not support migration for state in RocksDB that was checkpointed using 
>> `semi-asynchronous` mode. In case your old job was using this mode, you can 
>> still change your job to use `fully-asynchronous` mode before taking the 
>> savepoint that is used as the basis for the migration.
> 
> So, my first question:
> Is "semi-asynchronous" means "incremental checkpoint"?
> 
> And second question:
> If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
> checkpoint as state backend. 
> I should: 
> 1. take a savepoint for old version(flink-1.11), 
> 2. and change job to use "full asynchronous checkpoint" ,
> 3. restart old version(flink-1.11) job with new config (full asynchronous 
> checkpoint),
> 4. then, take a savepoint
> 5. and finally, stop old version(flink-1.11) and upgrade to flink-1.12
> 
> Whether I understand correctly?
> 
> Best regards


Flink upgrade to Flink-1.12

2021-01-19 Thread 耿延杰
Hi all,


As flink doc says:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions


We do not support migration for state in RocksDB that was checkpointed 
using semi-asynchronous mode. In case your old job was using this 
mode, you can still change your job to use fully-asynchronous mode 
before taking the savepoint that is used as the basis for the migration.



So, my first question:
Is "semi-asynchronous" means "incremental checkpoint"?


And second question:
If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
checkpoint as state backend. 
I should: 
1. take a savepoint for old version(flink-1.11), 
2. and change job to use "full asynchronous checkpoint" ,
3. restart old version(flink-1.11) job with new config (full asynchronous 
checkpoint),
4. then, take a savepoint
5. and finally, stop old version(flink-1.11) and upgrade to flink-1.12


Do I understand this correctly?


Best regards

Re: Flink 1.12 Kryo Serialization Error

2021-01-18 Thread Timo Walther

Forgot to add the link:

https://github.com/twalthr/flink/tree/kryoBug_ser

Regards,
Timo


On 18.01.21 14:11, Timo Walther wrote:
I ported the code to the Flink code base. Because I had issues with SBT 
and Scala 2.12. Note it uses an older version of circe. I'm just pasting 
it here in case it helps someone.


Regards,
Timo

On 18.01.21 13:51, Timo Walther wrote:

Hi Yuval,

thanks for sharing some code with us. I scanned the code but could not 
find anything suspicious from an API perspective. By using the full 
RAW serializable string, we should actually be on the save side when 
it comes to configure the Kryo serializer.


I would suggest to further investigate in the checkpointing area if it 
only occurs when checkpointing is enabled.


Regards,
Timo

On 13.01.21 13:35, Yuval Itzchakov wrote:

Hi Timo and Piotr,

Let me try and answer all your questions:

Piotr:

1. Yes, I am using Flink 1.12.0
2. I have not tried downgrading to Flink 1.11.3, as I have features 
that are specific to 1.12 that I need (namely the ability to create 
a DataStreamScanProvider which was not available in previous versions)
3. I am using a pretty standard configuration. The only thing I've 
set was checkpointing (using the default MemoryStateBackend):


image.png
4. This is the interesting bit. When I try to create a small 
reproduction outside the codebase, using a simple source the issue 
does not reproduce, both with default Kryo serialization and with my 
own Kryo serializer.
5. No, here is the relevant bit of build.sbt (flinkVersion is set to 
1.12)

image.png
6. I am trying to come up with a reproduction, thus far with no luck. 
Here's what I have so far: 
https://github.com/YuvalItzchakov/flink-bug-repro 
. I am afraid that 
there are many more moving parts that are affecting this issue (I 
have a custom flink source and sink involved)


Timo:

I am explicitly passing a serialized string of my custom Kryo 
serializer to the UDF (see 
https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31 
). 
I can validate that both serialization and deserialization invoke the 
method defined on my custom serializer, if that's what you mean.
Otherwise, if there's a mismatch between the two serializers Flink 
blows up at runtime saying that the types don't match.


On Wed, Jan 13, 2021 at 1:19 PM Timo Walther > wrote:


    Hi Yuval,

    could you share a reproducible example with us?

    I see you are using SQL / Table API with a RAW type. I could imagine
    that the KryoSerializer is configured differently when 
serializing and
    when deserializing. This might be due to `ExecutionConfig` not 
shipped

    (or copied) through the stack correctly.

    Even though an error in the stack should be visible immediately 
and not

    after 30 seconds, I still would also investigate an error in this
    direction.

    Regards,
    Timo


    On 13.01.21 09:47, Piotr Nowojski wrote:
 > Hi Yuval,
 >
 > I have a couple of questions:
 >
 > 1. I see that you are using Flink 1.12.0, is that correct?
 > 2. Have you tried running your application with a different Flink
 > version? If you are using 1.12.0, could you check Flink 1.11.3,
    or vice
 > versa?
 > 3. What's the configuration that you are using? For example, have
    you
 > enabled unaligned checkpoints or some other feature?
 > 4. Is the problem still there if you replace Kryo with something
    else
 > (Java's serialisation?)?
 > 5. Could it be a problem with dependency convergence? Like maybe
    there
 > are different versions of Flink jars present during runtime?
 > 6. Lastly, would it be possible for you to prepare a minimal 
example

 > that could reproduce the problem?
 >
 > Piotrek
 >
 > wt., 12 sty 2021 o 17:19 Yuval Itzchakov mailto:yuva...@gmail.com>
 > >> 
napisał(a):

 >
 >     Hi Chesnay,
 >     Turns out it didn't actually work, there were one or two
 >     successful runs but the problem still persists (it's a bit 
non

 >     deterministic, and doesn't always reproduce when parallelism
    is set
 >     to 1).
 >
 >     I turned off all Kryo custom serialization and am only using
    Flink
 >     provided one's ATM, the problem still persists.
 >     There seems to be an issue with how Flink serializes these
    raw types
 >     over the wire, but I still can't put my finger as to what the
 >     problem is.
 >
 >     What I can see is that Flink tries to consume a
    HybridMemorySegment
 >     which contains one of these custom raw types I have and
    because of
 >     malformed content it receives

Re: Flink 1.12 Kryo Serialization Error

2021-01-18 Thread Timo Walther
I ported the code to the Flink code base because I had issues with SBT 
and Scala 2.12. Note it uses an older version of circe. I'm just pasting 
it here in case it helps someone.


Regards,
Timo

On 18.01.21 13:51, Timo Walther wrote:

Hi Yuval,

thanks for sharing some code with us. I scanned the code but could not 
find anything suspicious from an API perspective. By using the full RAW 
serializable string, we should actually be on the save side when it 
comes to configure the Kryo serializer.


I would suggest to further investigate in the checkpointing area if it 
only occurs when checkpointing is enabled.


Regards,
Timo

On 13.01.21 13:35, Yuval Itzchakov wrote:

Hi Timo and Piotr,

Let me try and answer all your questions:

Piotr:

1. Yes, I am using Flink 1.12.0
2. I have not tried downgrading to Flink 1.11.3, as I have features 
that are specific to 1.12 that I need (namely the ability to create 
a DataStreamScanProvider which was not available in previous versions)
3. I am using a pretty standard configuration. The only thing I've set 
was checkpointing (using the default MemoryStateBackend):


image.png
4. This is the interesting bit. When I try to create a small 
reproduction outside the codebase, using a simple source the issue 
does not reproduce, both with default Kryo serialization and with my 
own Kryo serializer.
5. No, here is the relevant bit of build.sbt (flinkVersion is set to 
1.12)

image.png
6. I am trying to come up with a reproduction, thus far with no luck. 
Here's what I have so far: 
https://github.com/YuvalItzchakov/flink-bug-repro 
. I am afraid that 
there are many more moving parts that are affecting this issue (I have 
a custom flink source and sink involved)


Timo:

I am explicitly passing a serialized string of my custom Kryo 
serializer to the UDF (see 
https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31 
). 
I can validate that both serialization and deserialization invoke the 
method defined on my custom serializer, if that's what you mean.
Otherwise, if there's a mismatch between the two serializers Flink 
blows up at runtime saying that the types don't match.


On Wed, Jan 13, 2021 at 1:19 PM Timo Walther > wrote:


    Hi Yuval,

    could you share a reproducible example with us?

    I see you are using SQL / Table API with a RAW type. I could imagine
    that the KryoSerializer is configured differently when serializing 
and
    when deserializing. This might be due to `ExecutionConfig` not 
shipped

    (or copied) through the stack correctly.

    Even though an error in the stack should be visible immediately 
and not

    after 30 seconds, I still would also investigate an error in this
    direction.

    Regards,
    Timo


    On 13.01.21 09:47, Piotr Nowojski wrote:
 > Hi Yuval,
 >
 > I have a couple of questions:
 >
 > 1. I see that you are using Flink 1.12.0, is that correct?
 > 2. Have you tried running your application with a different Flink
 > version? If you are using 1.12.0, could you check Flink 1.11.3,
    or vice
 > versa?
 > 3. What's the configuration that you are using? For example, have
    you
 > enabled unaligned checkpoints or some other feature?
 > 4. Is the problem still there if you replace Kryo with something
    else
 > (Java's serialisation?)?
 > 5. Could it be a problem with dependency convergence? Like maybe
    there
 > are different versions of Flink jars present during runtime?
 > 6. Lastly, would it be possible for you to prepare a minimal 
example

 > that could reproduce the problem?
 >
 > Piotrek
 >
 > wt., 12 sty 2021 o 17:19 Yuval Itzchakov mailto:yuva...@gmail.com>
 > >> napisał(a):
 >
 >     Hi Chesnay,
 >     Turns out it didn't actually work, there were one or two
 >     successful runs but the problem still persists (it's a bit non
 >     deterministic, and doesn't always reproduce when parallelism
    is set
 >     to 1).
 >
 >     I turned off all Kryo custom serialization and am only using
    Flink
 >     provided one's ATM, the problem still persists.
 >     There seems to be an issue with how Flink serializes these
    raw types
 >     over the wire, but I still can't put my finger as to what the
 >     problem is.
 >
 >     What I can see is that Flink tries to consume a
    HybridMemorySegment
 >     which contains one of these custom raw types I have and
    because of
 >     malformed content it receives a negative length for the byte
    array:
 >
 >     image.png
 >
 >     Content seems to be prepended with a bunch of N

Re: Flink 1.12 Kryo Serialization Error

2021-01-18 Thread Timo Walther

Hi Yuval,

thanks for sharing some code with us. I scanned the code but could not 
find anything suspicious from an API perspective. By using the full RAW 
serializable string, we should actually be on the safe side when it 
comes to configuring the Kryo serializer.


I would suggest to further investigate in the checkpointing area if it 
only occurs when checkpointing is enabled.


Regards,
Timo

On 13.01.21 13:35, Yuval Itzchakov wrote:

Hi Timo and Piotr,

Let me try and answer all your questions:

Piotr:

1. Yes, I am using Flink 1.12.0
2. I have not tried downgrading to Flink 1.11.3, as I have features that 
are specific to 1.12 that I need (namely the ability to create 
a DataStreamScanProvider which was not available in previous versions)
3. I am using a pretty standard configuration. The only thing I've set 
was checkpointing (using the default MemoryStateBackend):


image.png
4. This is the interesting bit. When I try to create a small 
reproduction outside the codebase, using a simple source the issue does 
not reproduce, both with default Kryo serialization and with my own Kryo 
serializer.

5. No, here is the relevant bit of build.sbt (flinkVersion is set to 1.12)
image.png
6. I am trying to come up with a reproduction, thus far with no luck. 
Here's what I have so far: 
https://github.com/YuvalItzchakov/flink-bug-repro 
. I am afraid that 
there are many more moving parts that are affecting this issue (I have a 
custom flink source and sink involved)


Timo:

I am explicitly passing a serialized string of my custom Kryo serializer 
to the UDF (see 
https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31 
). 
I can validate that both serialization and deserialization invoke the 
method defined on my custom serializer, if that's what you mean.
Otherwise, if there's a mismatch between the two serializers Flink blows 
up at runtime saying that the types don't match.


On Wed, Jan 13, 2021 at 1:19 PM Timo Walther > wrote:


Hi Yuval,

could you share a reproducible example with us?

I see you are using SQL / Table API with a RAW type. I could imagine
that the KryoSerializer is configured differently when serializing and
when deserializing. This might be due to `ExecutionConfig` not shipped
(or copied) through the stack correctly.

Even though an error in the stack should be visible immediately and not
after 30 seconds, I still would also investigate an error in this
direction.

Regards,
Timo


On 13.01.21 09:47, Piotr Nowojski wrote:
 > Hi Yuval,
 >
 > I have a couple of questions:
 >
 > 1. I see that you are using Flink 1.12.0, is that correct?
 > 2. Have you tried running your application with a different Flink
 > version? If you are using 1.12.0, could you check Flink 1.11.3,
or vice
 > versa?
 > 3. What's the configuration that you are using? For example, have
you
 > enabled unaligned checkpoints or some other feature?
 > 4. Is the problem still there if you replace Kryo with something
else
 > (Java's serialisation?)?
 > 5. Could it be a problem with dependency convergence? Like maybe
there
 > are different versions of Flink jars present during runtime?
 > 6. Lastly, would it be possible for you to prepare a minimal example
 > that could reproduce the problem?
 >
 > Piotrek
 >
 > wt., 12 sty 2021 o 17:19 Yuval Itzchakov mailto:yuva...@gmail.com>
 > >> napisał(a):
 >
 >     Hi Chesnay,
 >     Turns out it didn't actually work, there were one or two
 >     successful runs but the problem still persists (it's a bit non
 >     deterministic, and doesn't always reproduce when parallelism
is set
 >     to 1).
 >
 >     I turned off all Kryo custom serialization and am only using
Flink
 >     provided one's ATM, the problem still persists.
 >     There seems to be an issue with how Flink serializes these
raw types
 >     over the wire, but I still can't put my finger as to what the
 >     problem is.
 >
 >     What I can see is that Flink tries to consume a
HybridMemorySegment
 >     which contains one of these custom raw types I have and
because of
 >     malformed content it receives a negative length for the byte
array:
 >
 >     image.png
 >
 >     Content seems to be prepended with a bunch of NULL values which
 >     threw off the length calculation:
 >
 >     image.png
 >
 >     But I still don't have the entire chain of execution wrapped
 >     mentally in my head, trying to figure it out.
 >
 >    

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread sagar
Thanks Yun



On Thu, Jan 14, 2021 at 1:58 PM Yun Gao  wrote:

> Hi Sagar,
>
>   I rechecked and found that the new kafka source is not formally publish
> yet, and a stable method I think may be try adding the FlinkKafkaConsumer
> as a BOUNDED source first. Sorry for the inconvient.
>
> Best,
>  Yun
>
> --
> Sender:Yun Gao
> Date:2021/01/14 15:26:54
> Recipient:Ardhani Narasimha; sagar<
> sagarban...@gmail.com>
> Cc:Flink User Mail List
> Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch
> mode (Flink 1.12)
>
> Hi Sagar,
>
>   I think the problem is that the legacy source implemented by
> extending SourceFunction are all defined as CONTINOUS_UNBOUNDED when use
> env.addSource(). Although there is hacky way to add the legacy sources as
> BOUNDED source [1], I think you may first have a try of new version of
> KafkaSource [2] ? The new version of KafkaSource is implemented with the
> new Source API [3], which provides unfied support for the streaming and
> batch mode.
>
> Best,
>  Yun
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
> [2]
> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>
>
> --Original Mail --
> *Sender:*Ardhani Narasimha 
> *Send Date:*Thu Jan 14 15:11:35 2021
> *Recipients:*sagar 
> *CC:*Flink User Mail List 
> *Subject:*Re: Using Kafka as bounded source with DataStream API in batch
> mode (Flink 1.12)
>
>> Interesting use case.
>>
>> Can you please elaborate more on this.
>> On what criteria do you want to batch? Time? Count? Or Size?
>>
>> On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:
>>
>>> Hi Team,
>>>
>>> I am getting the following error while running DataStream API in with
>>> batch mode with kafka source.
>>> I am using FlinkKafkaConsumer to consume the data.
>>>
>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
>>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
>>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>> at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>>> ~[flink-core-1.12.0.jar:1.12.0]
>>>
>>> In my batch program I wanted to work with four to five different stream
>>> in batch mode as data source is bounded
>>>
>>> I don't find any clear example of how to do it with kafka souce with
>>> Flink 1.12
>>>
>>> I don't want to use JDBC source as underlying database table may change.
>>> please give me some example on how to achieve the above use case.
>>>
>>> Also for any large bounded source are there any alternatives to
>>> achieve this?
>>>
>>>
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>
>>
>> ---
>> *IMPORTANT*: The contents of this email and any attachments are
>> confidential and protected by applicable laws. If you have received this
>> email by mistake, please (i) notify the sender immediately; (ii) delete it
>> from your database; and (iii) do not disclose the contents to anyone or
>> make copies thereof. Razorpay accepts no liability caused due to any
>> inadvertent/ unintentional data transmitted through this email.
>>
>> ---
>>
>
>

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread Yun Gao
Hi Sagar,

  I rechecked and found that the new Kafka source is not formally published
yet; a stable method may be to try adding the FlinkKafkaConsumer as a
BOUNDED source first. Sorry for the inconvenience.

Best,
 Yun

--
Sender:Yun Gao
Date:2021/01/14 15:26:54
Recipient:Ardhani Narasimha; 
sagar
Cc:Flink User Mail List
Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Hi Sagar,

  I think the problem is that the legacy source implemented by extending 
SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource(). 
Although there is hacky way to add the legacy sources as BOUNDED source [1], I 
think you may first have a try of new version of KafkaSource [2] ? The new 
version of KafkaSource is implemented with the new Source API [3], which 
provides unfied support for the streaming and batch mode.

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running DataStream API in with batch 
mode with kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I wanted to work with four to five different stream in 
batch mode as data source is bounded

I don't find any clear example of how to do it with kafka souce with Flink 1.12

I don't want to use JDBC source as underlying database table may change. please 
give me some example on how to achieve the above use case.

Also for any large bounded source are there any alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended 
receipiant please ignore this email.
---
IMPORTANT: The contents of this email and any attachments are confidential and 
protected by applicable laws. If you have received this email by mistake, 
please (i) notify the sender immediately; (ii) delete it from your database; 
and (iii) do not disclose the contents to anyone or make copies thereof. 
Razorpay accepts no liability caused due to any inadvertent/ unintentional data 
transmitted through this email.
---


Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread sagar
Hi Ardhani,

So whenever I want to run this Flink job, I will first call the Java API to put
the data into the four different Kafka topics (what data to put into Kafka is
coded into those APIs). Once that is complete, I want to run the Flink job on
the data available in Kafka and perform the business operations on all of it.

I am not sure whether Kafka as a data source is the best fit for this use case,
but I don't want to expose my Flink job to database tables directly.



Thanks & Regards,
Sagar


On Thu, Jan 14, 2021 at 12:41 PM Ardhani Narasimha <
ardhani.narasi...@razorpay.com> wrote:

> Interesting use case.
>
> Can you please elaborate more on this.
> On what criteria do you want to batch? Time? Count? Or Size?
>
> On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:
>
>> Hi Team,
>>
>> I am getting the following error while running DataStream API in with
>> batch mode with kafka source.
>> I am using FlinkKafkaConsumer to consume the data.
>>
>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>> ~[flink-core-1.12.0.jar:1.12.0]
>>
>> In my batch program I wanted to work with four to five different stream
>> in batch mode as data source is bounded
>>
>> I don't find any clear example of how to do it with kafka souce with
>> Flink 1.12
>>
>> I don't want to use JDBC source as underlying database table may change.
>> please give me some example on how to achieve the above use case.
>>
>> Also for any large bounded source are there any alternatives to
>> achieve this?
>>
>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>
>
> ---
> *IMPORTANT*: The contents of this email and any attachments are
> confidential and protected by applicable laws. If you have received this
> email by mistake, please (i) notify the sender immediately; (ii) delete it
> from your database; and (iii) do not disclose the contents to anyone or
> make copies thereof. Razorpay accepts no liability caused due to any
> inadvertent/ unintentional data transmitted through this email.
>
> ---
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread Yun Gao
Hi Sagar,

  I think the problem is that legacy sources implemented by extending
SourceFunction are all defined as CONTINUOUS_UNBOUNDED when env.addSource() is
used. Although there is a hacky way to add the legacy sources as a BOUNDED
source [1], you may first want to try the new version of the KafkaSource [2].
The new version of the KafkaSource is implemented with the new Source API [3],
which provides unified support for the streaming and batch modes.

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
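
A hedged sketch of what the suggested new KafkaSource could look like as a bounded source on 1.12 (servers, topic, group id and offsets are placeholders, and the builder options should be checked against the 1.12 API since the source was not formally published at the time):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("my-topic")
                .setGroupId("my-batch-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // read up to the latest offsets seen at job start and then finish,
                // which makes the source BOUNDED
                .setBounded(OffsetsInitializer.latest())
                // assumption: the exact deserializer setter in 1.12.0 may differ
                // (e.g. setDeserializer(...)); verify against the KafkaSourceBuilder in use
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
        stream.print();
        env.execute();
    }
}
```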




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running DataStream API in with batch 
mode with kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I wanted to work with four to five different stream in 
batch mode as data source is bounded

I don't find any clear example of how to do it with kafka souce with Flink 1.12

I don't want to use JDBC source as underlying database table may change. please 
give me some example on how to achieve the above use case.

Also for any large bounded source are there any alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended 
receipiant please ignore this email.
---
IMPORTANT: The contents of this email and any attachments are confidential and 
protected by applicable laws. If you have received this email by mistake, 
please (i) notify the sender immediately; (ii) delete it from your database; 
and (iii) do not disclose the contents to anyone or make copies thereof. 
Razorpay accepts no liability caused due to any inadvertent/ unintentional data 
transmitted through this email.
---

Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread Ardhani Narasimha
Interesting use case.

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

> Hi Team,
>
> I am getting the following error while running DataStream API in with
> batch mode with kafka source.
> I am using FlinkKafkaConsumer to consume the data.
>
> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
> ~[flink-core-1.12.0.jar:1.12.0]
>
> In my batch program I wanted to work with four to five different stream in
> batch mode as data source is bounded
>
> I don't find any clear example of how to do it with kafka souce with Flink
> 1.12
>
> I don't want to use JDBC source as underlying database table may change.
> please give me some example on how to achieve the above use case.
>
> Also for any large bounded source are there any alternatives to
> achieve this?
>
>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>



Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-13 Thread sagar
Hi Team,

I am getting the following error while running the DataStream API in batch
mode with a Kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I want to work with four to five different streams in
batch mode, as the data sources are bounded.

I don't find any clear example of how to do this with a Kafka source in Flink
1.12.

I don't want to use a JDBC source, as the underlying database table may change.
Please give me an example of how to achieve the above use case.

Also, for any large bounded source, are there any alternatives to
achieve this?



-- 
---Regards---

  Sagar Bandal



Re: Flink 1.12 Kryo Serialization Error

2021-01-13 Thread Timo Walther

Hi Yuval,

could you share a reproducible example with us?

I see you are using SQL / Table API with a RAW type. I could imagine 
that the KryoSerializer is configured differently when serializing and 
when deserializing. This might be due to the `ExecutionConfig` not being 
shipped (or copied) through the stack correctly.


Even though such an error should be visible immediately rather than only 
after 30 seconds, I would still also investigate this direction.
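One cheap thing to try in that direction: register the custom serializer explicitly on
the `ExecutionConfig`, so that both the writing and the reading side build their Kryo
instances from the same configuration. A minimal sketch, where `CirceJsonKryoSerializer`
stands for your own com.esotericsoftware.kryo.Serializer implementation:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Registers the serializer class for exactly this type ...
env.getConfig().registerTypeWithKryoSerializer(io.circe.Json.class, CirceJsonKryoSerializer.class);
// ... or as a default for the type and its subclasses (circe's Json is a sealed hierarchy).
env.getConfig().addDefaultKryoSerializer(io.circe.Json.class, CirceJsonKryoSerializer.class);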


Regards,
Timo


On 13.01.21 09:47, Piotr Nowojski wrote:

Hi Yuval,

I have a couple of questions:

1. I see that you are using Flink 1.12.0, is that correct?
2. Have you tried running your application with a different Flink 
version? If you are using 1.12.0, could you check Flink 1.11.3, or vice 
versa?
3. What's the configuration that you are using? For example, have you 
enabled unaligned checkpoints or some other feature?
4. Is the problem still there if you replace Kryo with something else 
(Java's serialisation?)?
5. Could it be a problem with dependency convergence? Like maybe there 
are different versions of Flink jars present during runtime?
6. Lastly, would it be possible for you to prepare a minimal example 
that could reproduce the problem?


Piotrek

On Tue, 12 Jan 2021 at 17:19, Yuval Itzchakov wrote:


Hi Chesnay,
Turns out it didn't actually work; there were one or two
successful runs, but the problem still persists (it's a bit
non-deterministic and doesn't always reproduce when parallelism is set
to 1).

I turned off all custom Kryo serialization and am only using the
Flink-provided ones ATM; the problem still persists.
There seems to be an issue with how Flink serializes these raw types
over the wire, but I still can't put my finger on what the
problem is.

What I can see is that Flink tries to consume a HybridMemorySegment
which contains one of these custom raw types I have and because of
malformed content it receives a negative length for the byte array:

[image: image.png]

The content seems to be prepended with a bunch of NULL values, which
throws off the length calculation:

[image: image.png]

But I still don't have the entire chain of execution wrapped
mentally in my head, trying to figure it out.

An additional error I'm receiving, even when removing the
problematic JSON field and switching it out for a String:

java.lang.IllegalStateException: When there are multiple buffers, an
unfinished bufferConsumer can not be at the head of the buffers queue.
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
~[flink-core-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.

Flink 1.12 Kryo Serialization Error

2021-01-11 Thread Yuval Itzchakov
Hi,

I've implemented a KryoSerializer for a specific JSON type in my
application as I have a bunch of UDFs that depend on a RAW('io.circe.Json')
encoder being available. The implementation is rather simple. When I run my
Flink application with Kryo in trace logs, I see that data gets properly
serialized / deserialized using the serializer. However, after about 30
seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741,
length: 733793654, index: 69, offset: 0
at
org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.fli

Re: read a tarred + gzipped file flink 1.12

2021-01-05 Thread Arvid Heise
Hi Billy,

I suspect that it's not possible in Flink as is. The tar file acts as a
directory containing an arbitrary number of files. Afaik, Flink assumes
that all compressed files are just single files, like gz without tar. It's
like this in your case, but then the tar part doesn't make much sense.

Since you cannot control the input, you have two options:
* External process that unpacks the file and then calls Flink.
* Implement your own input format similar to [1].

[1]
https://stackoverflow.com/questions/49122170/zip-compressed-input-for-apache-flink
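For either option, the untar part itself is straightforward with Apache Commons Compress.
A standalone sketch, assuming the commons-compress dependency and the foo.tar.gz name from
the message below; inside a custom Flink input format you would wrap the stream Flink hands
you in the same way:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

public class TarGzJsonlReader {
    public static void main(String[] args) throws Exception {
        try (TarArchiveInputStream tar = new TarArchiveInputStream(
                new GZIPInputStream(new FileInputStream("foo.tar.gz")))) {
            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                // The stream is now positioned at the current entry's payload.
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(tar, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    // Each line is one JSON record; hand it to your parser here.
                }
            }
        }
    }
}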

On Mon, Dec 28, 2020 at 2:41 PM Billy Bain  wrote:

> We have an input file that is tarred and compressed to 12gb. It is about
> 50gb uncompressed.
>
> With readTextFile(), I see it uncompress the file but then flink doesn't
> seem to handle the untar portion. It's just a single file. (We don't
> control the input format)
>
> foo.tar.gz 12gb
> foo.tar  50gb
> then untar it and it is valid jsonl
>
> When reading, we get this exception:
>
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Unrecognized token 'playstore': was expecting (JSON String, Number, Array,
> Object or token 'null', 'true' or 'false')
>  at [Source: UNKNOWN; line: 1, column: 10]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>
> The process is seeing the header in the tar format and rightly complaining
> about the JSON format.
>
> Is it possible to untar this file using Flink?
>
> --
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Cannot start from savepoint using Flink 1.12 in standalone Kubernetes + Kubernetes HA

2020-12-29 Thread Yang Wang
This is a known issue. Please refer to [1] for more information. It is
already fixed on master and on the 1.12 branch.
The next minor Flink release (1.12.1) will include it. Maybe
you could help to verify that.


[1]. https://issues.apache.org/jira/browse/FLINK-20648

Best,
Yang

On Wed, 30 Dec 2020 at 09:35, ChangZhuo Chen (陳昌倬) wrote:

> Hi,
>
> We cannot start job from savepoint (created by Flink 1.12, Standalone
> Kubernetes + zookeeper HA) in Flink 1.12, Standalone Kubernetes +
> Kubernetes HA. The following is the exception that stops the job.
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesException:
> Cannot retry checkAndUpdateConfigMap with configMap
> name-51e5afd90227d537ff442403d1b279da-jobmanager-leader because it does not
> exist.
>
>
> Cluster can start new job from scratch, so we think cluster
> configuration is good.
>
> The following is HA related config:
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: gs://some/path/recovery
> kubernetes.cluster-id: cluster-name
> kubernetes.context: kubernetes-context
> kubernetes.namespace: kubernetes-namespace
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Cannot start from savepoint using Flink 1.12 in standalone Kubernetes + Kubernetes HA

2020-12-29 Thread 陳昌倬
Hi,

We cannot start a job from a savepoint (created by Flink 1.12, standalone
Kubernetes + ZooKeeper HA) in Flink 1.12, standalone Kubernetes +
Kubernetes HA. The following is the exception that stops the job.

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot 
retry checkAndUpdateConfigMap with configMap 
name-51e5afd90227d537ff442403d1b279da-jobmanager-leader because it does not 
exist.


The cluster can start a new job from scratch, so we think the cluster
configuration is good.

The following is HA related config:

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: gs://some/path/recovery
kubernetes.cluster-id: cluster-name
kubernetes.context: kubernetes-context
kubernetes.namespace: kubernetes-namespace


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




read a tarred + gzipped file flink 1.12

2020-12-28 Thread Billy Bain
We have an input file that is tarred and compressed to 12gb. It is about
50gb uncompressed.

With readTextFile(), I see it uncompress the file but then flink doesn't
seem to handle the untar portion. It's just a single file. (We don't
control the input format)

foo.tar.gz 12gb
foo.tar  50gb
then untar it and it is valid jsonl

When reading, we get this exception:

Caused by:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
Unrecognized token 'playstore': was expecting (JSON String, Number, Array,
Object or token 'null', 'true' or 'false')
 at [Source: UNKNOWN; line: 1, column: 10]
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)

The process is seeing the header in the tar format and rightly complaining
about the JSON format.

Is it possible to untar this file using Flink?

-- 
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com


Re: NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Danny Chan
Hi Yuval Itzchakov ~

The thread you pasted has a different stack trace from your case.

In the pasted thread, the JaninoRelMetadataProvider was missing because we
only set it once in a thread-local variable; when the RelMetadataQuery was
used in a different worker thread, the missing JaninoRelMetadataProvider caused an
NPE.

For your case, based on the stack trace, this line throws ~

RelMetadataQuery line 114:

super(null);

But actually this line allows an empty argument and it should not throw.

Can you give a reproducible case here so that we can debug and find more
evidence?
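In case it helps to get there faster, a skeleton of the kind of self-contained case that
would be easy to debug (Flink 1.12 with the Blink planner; the datagen table is only a
stand-in, and the single-thread executor mimics the ZIO fiber visible in your trace, so
that sqlQuery runs on a different thread than the one that created the TableEnvironment):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class SqlQueryNpeRepro {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql("CREATE TABLE src (f0 STRING) WITH ('connector' = 'datagen')");

        // Invoke sqlQuery from another thread, the way the ZIO runtime does in the trace.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Table> runQuery = () -> tEnv.sqlQuery("SELECT f0 FROM src WHERE f0 IS NOT NULL");
        Table result = pool.submit(runQuery).get();
        System.out.println(result.explain());
        pool.shutdown();
    }
}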

On Tue, 22 Dec 2020 at 01:52, Yuval Itzchakov wrote:

> Hi,
>
> While trying to execute a query via TableEnvironment.sqlQuery in Flink
> 1.12, I receive the following exception:
>
> java.lang.NullPointerException
> :114, RelMetadataQuery (org.apache.calcite.rel.metadata)
> :76, RelMetadataQuery (org.apache.calcite.rel.metadata)
> get:39, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> get:38, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
> create:108, LogicalFilter (org.apache.calcite.rel.logical)
> createFilter:344, RelFactories$FilterFactoryImpl
> (org.apache.calcite.rel.core)
> convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
> org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
> FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> toQueryOperation:823, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convertSqlQuery:795, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convert:250, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
> sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
> $anonfun$translateTemplate$2:476, Foo$ (Foo)
> apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
> evaluateNow:361, FiberContext (zio.internal)
> $anonfun$evaluateLater$1:778, FiberContext (zio.internal)
> run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
> runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
> run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
> run:748, Thread (java.lang)
>
> This seems to be coming from the FlinkRelMetadataQuery class attempting to
> initialize all handlers:
>
> [image: image.png]
>
> This seems to be coming from the calcite shaded JAR
> inside "flink-table-planner-blink-1.12"
>
> Has anyone ran into this issue? I saw a thread in the chinese user group
> but I don't understand what's been said there (
> https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
> --
> Best Regards,
> Yuval Itzchakov.
>


NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Yuval Itzchakov
Hi,

While trying to execute a query via TableEnvironment.sqlQuery in Flink
1.12, I receive the following exception:

java.lang.NullPointerException
:114, RelMetadataQuery (org.apache.calcite.rel.metadata)
:76, RelMetadataQuery (org.apache.calcite.rel.metadata)
get:39, FlinkRelOptClusterFactory$$anon$1
(org.apache.flink.table.planner.calcite)
get:38, FlinkRelOptClusterFactory$$anon$1
(org.apache.flink.table.planner.calcite)
getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
create:108, LogicalFilter (org.apache.calcite.rel.logical)
createFilter:344, RelFactories$FilterFactoryImpl
(org.apache.calcite.rel.core)
convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:823, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
convertSqlQuery:795, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
convert:250, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
$anonfun$translateTemplate$2:476, Foo$ (Foo)
apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
evaluateNow:361, FiberContext (zio.internal)
$anonfun$evaluateLater$1:778, FiberContext (zio.internal)
run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

This seems to be coming from the FlinkRelMetadataQuery class attempting to
initialize all handlers:

[image: image.png]

This seems to be coming from the calcite shaded JAR
inside "flink-table-planner-blink-1.12"

Has anyone run into this issue? I saw a thread in the Chinese user group
but I don't understand what has been said there (
https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
-- 
Best Regards,
Yuval Itzchakov.


Re: Flink 1.12

2020-12-20 Thread Yang Wang
ter-id=k8s-ha-app1 \
>>>>> -Dkubernetes.container.image=flink:k8s-ha \
>>>>> -Dkubernetes.container.image.pull-policy=Always \
>>>>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>>>>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
>>>>> -Dtaskmanager.numberOfTaskSlots=4 \
>>>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>> \
>>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>>> -Drestart-strategy=fixed-delay
>>>>> -Drestart-strategy.fixed-delay.attempts=10 \
>>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>>>>
>>>>> A couple of questions about it:
>>>>>
>>>>> ./bin/flink run-application -p 10 -t used to be ./bin/flink 
>>>>> run-application
>>>>> -t. What is -p 10?
>>>>> -Dkubernetes.container.image=flink:k8s-ha does it require a special
>>>>> container build?
>>>>>
>>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>>
>>>>> This is if I use HDFS for save pointing, right? I can instead use PVC
>>>>> - based save pointing, correct?
>>>>>
>>>>> Also I was trying to understand, how it works, and from the
>>>>> documentation it sounds like there is one active and one or
>>>>> more standby JMs. Can I control the amount of standby JMs?
>>>>>
>>>>> Finally, what is behavior on the rolling restart of JM deployment?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>> Unfortunately no; there are some discussions going on in the 
>>>>> docker-library/official-images
>>>>> PR <https://github.com/docker-library/official-images/pull/9249> that
>>>>> have to be resolved first, but currently these would require changes on 
>>>>> the
>>>>> Flink side that we cannot do (because it is already released!). We are not
>>>>> sure yet whether we can get the PR accepted and defer further changes to
>>>>> 1.12.1 .
>>>>>
>>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>>
>>>>> Thanks.
>>>>> Do you have ETA for docker images?
>>>>>
>>>>>
>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>> 2) Docker images are not yet published.
>>>>> 3) It is mentioned at the top of the Kubernetes HA Services
>>>>> documentation that it also works for the native Kubernetes integration.
>>>>>
>>>>> *Kubernetes high availability services can only be used when deploying
>>>>> to Kubernetes. Consequently, they can be configured when using 
>>>>> **standalone
>>>>> Flink on Kubernetes
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>**
>>>>> or the *
>>>>> *native Kubernetes integration
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>> *
>>>>>
>>>>> From what I understand you only need to configure the 3 listed
>>>>> options; the documentation also contains an example configuration
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>
>>>>> .
>>>>>
>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>
>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>
>>>>> 1. Is official Flink 1.12 distribution
>>>>> https://flink.apache.org/downloads.html specifies Scala versions, but
>>>>> not Java versions. Is it Java 8?
>>>>> 2. I do not see any 1.12 docker images here
>>>>> https://hub.docker.com/_/flink. Are they somewhere else?
>>>>> 3 Flink 1.12 introduces Kubernetes HA support
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html,
>>>>> but Flink native Kubernetes support
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>  has
>>>>> no mentioning of HA. Are the 2 integrated? DO you have any examples of
>>>>> starting HA cluster using Flink native Kubernetes?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Flink 1.12

2020-12-20 Thread Boris Lublinsky
luence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s>
>>>> More carefully and found the sample, I was looking for:
>>>> 
>>>> ./bin/flink run-application -p 10 -t kubernetes-application 
>>>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>>>> -Dkubernetes.container.image=flink:k8s-ha \ 
>>>> -Dkubernetes.container.image.pull-policy=Always \
>>>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>>>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 
>>>> -Dtaskmanager.numberOfTaskSlots=4 \
>>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>  \
>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>  \
>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>  \
>>>> local:///opt/flink/examples/streaming/StateMachineExample.jar <>
>>>> 
>>>> A couple of questions about it:
>>>> 
>>>> ./bin/flink run-application -p 10 -t used to be ./bin/flink 
>>>> run-application -t. What is -p 10?
>>>> -Dkubernetes.container.image=flink:k8s-ha does it require a special 
>>>> container build?
>>>> 
>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>  \
>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>  \
>>>> 
>>>> This is if I use HDFS for save pointing, right? I can instead use PVC - 
>>>> based save pointing, correct?
>>>> 
>>>> Also I was trying to understand, how it works, and from the documentation 
>>>> it sounds like there is one active and one or 
>>>> more standby JMs. Can I control the amount of standby JMs?
>>>> 
>>>> Finally, what is behavior on the rolling restart of JM deployment?
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler >>>> <mailto:ches...@apache.org>> wrote:
>>>>> 
>>>>> Unfortunately no; there are some discussions going on in the 
>>>>> docker-library/official-images PR 
>>>>> <https://github.com/docker-library/official-images/pull/9249> that have 
>>>>> to be resolved first, but currently these would require changes on the 
>>>>> Flink side that we cannot do (because it is already released!). We are 
>>>>> not sure yet whether we can get the PR accepted and defer further changes 
>>>>> to 1.12.1 .
>>>>> 
>>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>>> Thanks.
>>>>>> Do you have ETA for docker images?
>>>>>> 
>>>>>> 
>>>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >>>>>> <mailto:ches...@apache.org>> wrote:
>>>>>>> 
>>>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>>>> 2) Docker images are not yet published.
>>>>>>> 3) It is mentioned at the top of the Kubernetes HA Services 
>>>>>>> documentation that it also works for the native Kubernetes integration.
>>>>>>> Kubernetes high availability services can only be used when deploying 
>>>>>>> to Kubernetes. Consequently, they can be configured when using 
>>>>>>> standalone Flink on Kubernetes 
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>
>>>>>>>  or the native Kubernetes integration 
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>>> From what I understand you only need to configure the 3 listed options; 
>>>>>>> the documentation also contains an example configuration 
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>.
>>>>>>> 
>>>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>>>> 
>>>>>>>> 1. Is official Flink 1.12 distribution 
>>>>>>>> https://flink.apache.org/downloads.html 
>>>>>>>> <https://flink.apache.org/downloads.html> specifies Scala versions, 
>>>>>>>> but not Java versions. Is it Java 8?
>>>>>>>> 2. I do not see any 1.12 docker images here 
>>>>>>>> https://hub.docker.com/_/flink <https://hub.docker.com/_/flink>. Are 
>>>>>>>> they somewhere else?
>>>>>>>> 3 Flink 1.12 introduces Kubernetes HA support 
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>>  
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html>,
>>>>>>>>  but Flink native Kubernetes support 
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>>>>  
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>>>>  has no mentioning of HA. Are the 2 integrated? DO you have any 
>>>>>>>> examples of starting HA cluster using Flink native Kubernetes?
>>>>>>>> 
>>>>>>>>   
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 



Re: Flink 1.12

2020-12-20 Thread Yang Wang
t; based save pointing, correct?
>>>>
>>>> Also I was trying to understand, how it works, and from the
>>>> documentation it sounds like there is one active and one or
>>>> more standby JMs. Can I control the amount of standby JMs?
>>>>
>>>> Finally, what is behavior on the rolling restart of JM deployment?
>>>>
>>>>
>>>>
>>>>
>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler 
>>>> wrote:
>>>>
>>>> Unfortunately no; there are some discussions going on in the 
>>>> docker-library/official-images
>>>> PR <https://github.com/docker-library/official-images/pull/9249> that
>>>> have to be resolved first, but currently these would require changes on the
>>>> Flink side that we cannot do (because it is already released!). We are not
>>>> sure yet whether we can get the PR accepted and defer further changes to
>>>> 1.12.1 .
>>>>
>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>
>>>> Thanks.
>>>> Do you have ETA for docker images?
>>>>
>>>>
>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler 
>>>> wrote:
>>>>
>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>> 2) Docker images are not yet published.
>>>> 3) It is mentioned at the top of the Kubernetes HA Services
>>>> documentation that it also works for the native Kubernetes integration.
>>>>
>>>> *Kubernetes high availability services can only be used when deploying
>>>> to Kubernetes. Consequently, they can be configured when using **standalone
>>>> Flink on Kubernetes
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>**
>>>> or the *
>>>> *native Kubernetes integration
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>> *
>>>>
>>>> From what I understand you only need to configure the 3 listed options;
>>>> the documentation also contains an example configuration
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>
>>>> .
>>>>
>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>
>>>> It is great that Flink 1.12 is out. Several questions:
>>>>
>>>> 1. Is official Flink 1.12 distribution
>>>> https://flink.apache.org/downloads.html specifies Scala versions, but
>>>> not Java versions. Is it Java 8?
>>>> 2. I do not see any 1.12 docker images here
>>>> https://hub.docker.com/_/flink. Are they somewhere else?
>>>> 3 Flink 1.12 introduces Kubernetes HA support
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html,
>>>> but Flink native Kubernetes support
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>  has
>>>> no mentioning of HA. Are the 2 integrated? DO you have any examples of
>>>> starting HA cluster using Flink native Kubernetes?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Flink 1.12

2020-12-19 Thread Boris Lublinsky
ointing, correct?
>>> 
>>> Also I was trying to understand, how it works, and from the documentation 
>>> it sounds like there is one active and one or 
>>> more standby JMs. Can I control the amount of standby JMs?
>>> 
>>> Finally, what is behavior on the rolling restart of JM deployment?
>>> 
>>> 
>>> 
>>> 
>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler >>> <mailto:ches...@apache.org>> wrote:
>>>> 
>>>> Unfortunately no; there are some discussions going on in the 
>>>> docker-library/official-images PR 
>>>> <https://github.com/docker-library/official-images/pull/9249> that have to 
>>>> be resolved first, but currently these would require changes on the Flink 
>>>> side that we cannot do (because it is already released!). We are not sure 
>>>> yet whether we can get the PR accepted and defer further changes to 1.12.1 
>>>> .
>>>> 
>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>> Thanks.
>>>>> Do you have ETA for docker images?
>>>>> 
>>>>> 
>>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >>>>> <mailto:ches...@apache.org>> wrote:
>>>>>> 
>>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>>> 2) Docker images are not yet published.
>>>>>> 3) It is mentioned at the top of the Kubernetes HA Services 
>>>>>> documentation that it also works for the native Kubernetes integration.
>>>>>> Kubernetes high availability services can only be used when deploying to 
>>>>>> Kubernetes. Consequently, they can be configured when using standalone 
>>>>>> Flink on Kubernetes 
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>
>>>>>>  or the native Kubernetes integration 
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>> From what I understand you only need to configure the 3 listed options; 
>>>>>> the documentation also contains an example configuration 
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>.
>>>>>> 
>>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>>> 
>>>>>>> 1. Is official Flink 1.12 distribution 
>>>>>>> https://flink.apache.org/downloads.html 
>>>>>>> <https://flink.apache.org/downloads.html> specifies Scala versions, but 
>>>>>>> not Java versions. Is it Java 8?
>>>>>>> 2. I do not see any 1.12 docker images here 
>>>>>>> https://hub.docker.com/_/flink <https://hub.docker.com/_/flink>. Are 
>>>>>>> they somewhere else?
>>>>>>> 3 Flink 1.12 introduces Kubernetes HA support 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>  
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html>,
>>>>>>>  but Flink native Kubernetes support 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>>>  
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>>>  has no mentioning of HA. Are the 2 integrated? DO you have any 
>>>>>>> examples of starting HA cluster using Flink native Kubernetes?
>>>>>>> 
>>>>>>>   
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 



Re: Flink 1.12

2020-12-17 Thread Yang Wang
 `kubectl edit deploy `. It should also work.
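For example, in the native Kubernetes mode the JobManager Deployment is named after
kubernetes.cluster-id, so with the k8s-ha-app1 value from the example quoted below the
manual tweak boils down to a one-liner (treat the Deployment name as an assumption and
adjust it to your own setup):

kubectl scale deployment/k8s-ha-app1 --replicas=2

Every additional replica becomes a standby JobManager that takes part in leader election
through the Kubernetes HA services.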
>>
>> Finally, what is behavior on the rolling restart of JM deployment?
>>
>> Once a JobManager terminated, it will lose the leadership and a standby
>> one will take over. So on the rolling restart of JM deployment, you will
>> find that the leader switches multiple times and your job also restarts
>> multiple times. I am not sure why you need to roll the JobManager
>> deployment. We are using deployment for JobManager in Flink just because we
>> want the JobManager to be launched once it crashed. Another reason for
>> multiple JobManagers is to get a faster recovery.
>>
>>
>> Best,
>> Yang
>>
>>
>> Boris Lublinsky  于2020年12月16日周三 上午9:09写道:
>>
>>> Thanks Chesney for your quick response,
>>> I read documentation
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s>
>>> More carefully and found the sample, I was looking for:
>>>
>>> ./bin/flink run-application -p 10 -t kubernetes-application
>>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>>> -Dkubernetes.container.image=flink:k8s-ha \
>>> -Dkubernetes.container.image.pull-policy=Always \
>>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
>>> -Dtaskmanager.numberOfTaskSlots=4 \
>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> \
>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>> -Drestart-strategy=fixed-delay
>>> -Drestart-strategy.fixed-delay.attempts=10 \
>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>> \
>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>> \
>>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>>
>>> A couple of questions about it:
>>>
>>> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application
>>> -t. What is -p 10?
>>> -Dkubernetes.container.image=flink:k8s-ha does it require a special
>>> container build?
>>>
>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>> \
>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>> \
>>>
>>> This is if I use HDFS for save pointing, right? I can instead use PVC -
>>> based save pointing, correct?
>>>
>>> Also I was trying to understand, how it works, and from the
>>> documentation it sounds like there is one active and one or
>>> more standby JMs. Can I control the amount of standby JMs?
>>>
>>> Finally, what is behavior on the rolling restart of JM deployment?
>>>
>>>
>>>
>>>
>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler 
>>> wrote:
>>>
>>> Unfortunately no; there are some discussions going on in the 
>>> docker-library/official-images
>>> PR <https://github.com/docker-library/official-images/pull/9249> that
>>> have to be resolved first, but currently these would require changes on the
>>> Flink side that we cannot do (because it is already released!). We are not
>>> sure yet whether we can get the PR accepted and defer further changes to
>>> 1.12.1 .
>>>
>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>
>>> Thanks.
>>> Do you have ETA for docker images?
>>>
>>>
>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler 
>>> wrote:
>>>
>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>> 2) Docker images are not yet published.
>>> 3) It is mentioned at the top of the Kubernetes HA Services
>>> documentation that it also works for the native Kubernetes integration.
>>>
>>> *Kubernetes high availability services can only be used when deploying
>>> to Kubernetes. Consequently, they can be configured when using **standalone
>>> Flink on Kubernetes
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>**
>>> or the *
>>> *native Kubernetes integration
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>> *
>>>
>>> From what I understand you only need to configure the 3 listed options;
>>> the documentation also contains an example configuration
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>
>>> .
>>>
>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>
>>> It is great that Flink 1.12 is out. Several questions:
>>>
>>> 1. Is official Flink 1.12 distribution
>>> https://flink.apache.org/downloads.html specifies Scala versions, but
>>> not Java versions. Is it Java 8?
>>> 2. I do not see any 1.12 docker images here
>>> https://hub.docker.com/_/flink. Are they somewhere else?
>>> 3 Flink 1.12 introduces Kubernetes HA support
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html,
>>> but Flink native Kubernetes support
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>  has
>>> no mentioning of HA. Are the 2 integrated? DO you have any examples of
>>> starting HA cluster using Flink native Kubernetes?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Flink 1.12

2020-12-17 Thread Boris Lublinsky
uce a config option for it. But you could do it 
>> manually via `kubectl edit deploy `. It should also work.
>> 
>> Finally, what is behavior on the rolling restart of JM deployment?
>> Once a JobManager terminated, it will lose the leadership and a standby one 
>> will take over. So on the rolling restart of JM deployment, you will find 
>> that the leader switches multiple times and your job also restarts multiple 
>> times. I am not sure why you need to roll the JobManager deployment. We are 
>> using deployment for JobManager in Flink just because we want the JobManager 
>> to be launched once it crashed. Another reason for multiple JobManagers is 
>> to get a faster recovery.
>> 
>> 
>> Best,
>> Yang
>>  
>> 
>> Boris Lublinsky > <mailto:boris.lublin...@lightbend.com>> 于2020年12月16日周三 上午9:09写道:
>> Thanks Chesney for your quick response,
>> I read documentation 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>>  
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s>
>> More carefully and found the sample, I was looking for:
>> 
>> ./bin/flink run-application -p 10 -t kubernetes-application 
>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>> -Dkubernetes.container.image=flink:k8s-ha \ 
>> -Dkubernetes.container.image.pull-policy=Always \
>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 
>> -Dtaskmanager.numberOfTaskSlots=4 \
>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>  \
>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> local:///opt/flink/examples/streaming/StateMachineExample.jar <>
>> 
>> A couple of questions about it:
>> 
>> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application 
>> -t. What is -p 10?
>> -Dkubernetes.container.image=flink:k8s-ha does it require a special 
>> container build?
>> 
>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> 
>> This is if I use HDFS for save pointing, right? I can instead use PVC - 
>> based save pointing, correct?
>> 
>> Also I was trying to understand, how it works, and from the documentation it 
>> sounds like there is one active and one or 
>> more standby JMs. Can I control the amount of standby JMs?
>> 
>> Finally, what is behavior on the rolling restart of JM deployment?
>> 
>> 
>> 
>> 
>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler >> <mailto:ches...@apache.org>> wrote:
>>> 
>>> Unfortunately no; there are some discussions going on in the 
>>> docker-library/official-images PR 
>>> <https://github.com/docker-library/official-images/pull/9249> that have to 
>>> be resolved first, but currently these would require changes on the Flink 
>>> side that we cannot do (because it is already released!). We are not sure 
>>> yet whether we can get the PR accepted and defer further changes to 1.12.1 .
>>> 
>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>> Thanks.
>>>> Do you have ETA for docker images?
>>>> 
>>>> 
>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >>>> <mailto:ches...@apache.org>> wrote:
>>>>> 
>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>> 2) Docker images are not yet published.
>>>>> 3) It is mentioned at the top of the Kubernetes HA Services documentation 
>>>>> that it also works for the native Kubernetes integration.
>>>>> Kubernetes high availability services can only be used when deploying to 
>>>>> Kubernetes. Consequently, they can be configured when using standalone 
>>>>> Flink on Kubernetes 
>>>>> <https://ci.apache.org/projects/flink/flink-docs-r

Re: Flink 1.12

2020-12-17 Thread Boris Lublinsky
response,
>> I read documentation 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>>  
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s>
>> More carefully and found the sample, I was looking for:
>> 
>> ./bin/flink run-application -p 10 -t kubernetes-application 
>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>> -Dkubernetes.container.image=flink:k8s-ha \ 
>> -Dkubernetes.container.image.pull-policy=Always \
>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 
>> -Dtaskmanager.numberOfTaskSlots=4 \
>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>  \
>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> local:///opt/flink/examples/streaming/StateMachineExample.jar <>
>> 
>> A couple of questions about it:
>> 
>> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application 
>> -t. What is -p 10?
>> -Dkubernetes.container.image=flink:k8s-ha does it require a special 
>> container build?
>> 
>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>  \
>> 
>> This is if I use HDFS for save pointing, right? I can instead use PVC - 
>> based save pointing, correct?
>> 
>> Also I was trying to understand, how it works, and from the documentation it 
>> sounds like there is one active and one or 
>> more standby JMs. Can I control the amount of standby JMs?
>> 
>> Finally, what is behavior on the rolling restart of JM deployment?
>> 
>> 
>> 
>> 
>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler >> <mailto:ches...@apache.org>> wrote:
>>> 
>>> Unfortunately no; there are some discussions going on in the 
>>> docker-library/official-images PR 
>>> <https://github.com/docker-library/official-images/pull/9249> that have to 
>>> be resolved first, but currently these would require changes on the Flink 
>>> side that we cannot do (because it is already released!). We are not sure 
>>> yet whether we can get the PR accepted and defer further changes to 1.12.1 .
>>> 
>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>> Thanks.
>>>> Do you have ETA for docker images?
>>>> 
>>>> 
>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >>>> <mailto:ches...@apache.org>> wrote:
>>>>> 
>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>> 2) Docker images are not yet published.
>>>>> 3) It is mentioned at the top of the Kubernetes HA Services documentation 
>>>>> that it also works for the native Kubernetes integration.
>>>>> Kubernetes high availability services can only be used when deploying to 
>>>>> Kubernetes. Consequently, they can be configured when using standalone 
>>>>> Flink on Kubernetes 
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>
>>>>>  or the native Kubernetes integration 
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>> From what I understand you only need to configure the 3 listed options; 
>>>>> the documentation also contains an example configuration 
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>.
>>>>> 
>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>> 
>>>>>> 1. Is official Flink 1.12 distribution 
>>>>>> https://flink.apache.org/downloads.html 
>>>>>> <https://flink.apache.org/downloads.html> specifies Scala versions, but 
>>>>>> not Java versions. Is it Java 8?
>>>>>> 2. I do not see any 1.12 docker images here 
>>>>>> https://hub.docker.com/_/flink <https://hub.docker.com/_/flink>. Are 
>>>>>> they somewhere else?
>>>>>> 3 Flink 1.12 introduces Kubernetes HA support 
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>  
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html>,
>>>>>>  but Flink native Kubernetes support 
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>>  
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>>  has no mentioning of HA. Are the 2 integrated? DO you have any examples 
>>>>>> of starting HA cluster using Flink native Kubernetes?
>>>>>> 
>>>>>>   
>>>>> 
>>>> 
>>> 
>> 
> 



Re: Flink 1.12

2020-12-17 Thread Yang Wang
r=oss://flink/flink-ha \
>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10
>> \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>> \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>> \
>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>
>> A couple of questions about it:
>>
>> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application
>> -t. What is -p 10?
>> -Dkubernetes.container.image=flink:k8s-ha does it require a special
>> container build?
>>
>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>> \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>> \
>>
>> This is if I use HDFS for save pointing, right? I can instead use PVC -
>> based save pointing, correct?
>>
>> Also I was trying to understand, how it works, and from the documentation
>> it sounds like there is one active and one or
>> more standby JMs. Can I control the amount of standby JMs?
>>
>> Finally, what is behavior on the rolling restart of JM deployment?
>>
>>
>>
>>
>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler 
>> wrote:
>>
>> Unfortunately no; there are some discussions going on in the 
>> docker-library/official-images
>> PR <https://github.com/docker-library/official-images/pull/9249> that
>> have to be resolved first, but currently these would require changes on the
>> Flink side that we cannot do (because it is already released!). We are not
>> sure yet whether we can get the PR accepted and defer further changes to
>> 1.12.1 .
>>
>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>
>> Thanks.
>> Do you have ETA for docker images?
>>
>>
>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler  wrote:
>>
>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>> 2) Docker images are not yet published.
>> 3) It is mentioned at the top of the Kubernetes HA Services documentation
>> that it also works for the native Kubernetes integration.
>>
>> *Kubernetes high availability services can only be used when deploying to
>> Kubernetes. Consequently, they can be configured when using **standalone
>> Flink on Kubernetes
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>**
>> or the *
>> *native Kubernetes integration
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>> *
>>
>> From what I understand you only need to configure the 3 listed options;
>> the documentation also contains an example configuration
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>
>> .
>>
>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>
>> It is great that Flink 1.12 is out. Several questions:
>>
>> 1. Is official Flink 1.12 distribution
>> https://flink.apache.org/downloads.html specifies Scala versions, but
>> not Java versions. Is it Java 8?
>> 2. I do not see any 1.12 docker images here
>> https://hub.docker.com/_/flink. Are they somewhere else?
>> 3. Flink 1.12 introduces Kubernetes HA support
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html,
>> but Flink native Kubernetes support
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>  has
>> no mention of HA. Are the two integrated? Do you have any examples of
>> starting an HA cluster using Flink native Kubernetes?
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: Flink 1.12

2020-12-17 Thread Boris Lublinsky
> Also I was trying to understand how it works, and from the documentation it sounds like there is one active and one or 
> more standby JMs. Can I control the number of standby JMs?
> 
> Finally, what is the behavior on a rolling restart of the JM deployment?
> 
> 
> 
> 
>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler > <mailto:ches...@apache.org>> wrote:
>> 
>> Unfortunately no; there are some discussions going on in the 
>> docker-library/official-images PR 
>> <https://github.com/docker-library/official-images/pull/9249> that have to 
>> be resolved first, but currently these would require changes on the Flink 
>> side that we cannot do (because it is already released!). We are not sure 
>> yet whether we can get the PR accepted and defer further changes to 1.12.1.
>> 
>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>> Thanks.
>>> Do you have an ETA for the docker images?
>>> 
>>> 
>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >>> <mailto:ches...@apache.org>> wrote:
>>>> 
>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>> 2) Docker images are not yet published.
>>>> 3) It is mentioned at the top of the Kubernetes HA Services documentation 
>>>> that it also works for the native Kubernetes integration.
>>>> Kubernetes high availability services can only be used when deploying to 
>>>> Kubernetes. Consequently, they can be configured when using standalone 
>>>> Flink on Kubernetes 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>
>>>>  or the native Kubernetes integration 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>> From what I understand you only need to configure the 3 listed options; 
>>>> the documentation also contains an example configuration 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>.
>>>> 
>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>> 
>>>>> 1. The official Flink 1.12 distribution at 
>>>>> https://flink.apache.org/downloads.html 
>>>>> <https://flink.apache.org/downloads.html> specifies Scala versions, but 
>>>>> not Java versions. Is it Java 8?
>>>>> 2. I do not see any 1.12 docker images here 
>>>>> https://hub.docker.com/_/flink <https://hub.docker.com/_/flink>. Are they 
>>>>> somewhere else?
>>>>> 3. Flink 1.12 introduces Kubernetes HA support 
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>  
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html>,
>>>>>  but Flink native Kubernetes support 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>  
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>  has no mention of HA. Are the two integrated? Do you have any examples 
>>>>> of starting an HA cluster using Flink native Kubernetes?
>>>>> 
>>>>>   
>>>> 
>>> 
>> 
> 



Re: Flink 1.12

2020-12-16 Thread Boris Lublinsky
>> Unfortunately no; there are some discussions going on in the docker-library/official-images PR 
>> <https://github.com/docker-library/official-images/pull/9249> that have to 
>> be resolved first, but currently these would require changes on the Flink 
>> side that we cannot do (because it is already released!). We are not sure 
>> yet whether we can get the PR accepted and defer further changes to 1.12.1.
>> 
>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>> Thanks.
>>> Do you have an ETA for the docker images?
>>> 
>>> 
>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >>> <mailto:ches...@apache.org>> wrote:
>>>> 
>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>> 2) Docker images are not yet published.
>>>> 3) It is mentioned at the top of the Kubernetes HA Services documentation 
>>>> that it also works for the native Kubernetes integration.
>>>> Kubernetes high availability services can only be used when deploying to 
>>>> Kubernetes. Consequently, they can be configured when using standalone 
>>>> Flink on Kubernetes 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>
>>>>  or the native Kubernetes integration 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>> From what I understand you only need to configure the 3 listed options; 
>>>> the documentation also contains an example configuration 
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>.
>>>> 
>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>> 
>>>>> 1. The official Flink 1.12 distribution at 
>>>>> https://flink.apache.org/downloads.html 
>>>>> <https://flink.apache.org/downloads.html> specifies Scala versions, but 
>>>>> not Java versions. Is it Java 8?
>>>>> 2. I do not see any 1.12 docker images here 
>>>>> https://hub.docker.com/_/flink <https://hub.docker.com/_/flink>. Are they 
>>>>> somewhere else?
>>>>> 3. Flink 1.12 introduces Kubernetes HA support 
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>  
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html>,
>>>>>  but Flink native Kubernetes support 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>  
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>>  has no mention of HA. Are the two integrated? Do you have any examples 
>>>>> of starting an HA cluster using Flink native Kubernetes?
>>>>> 
>>>>>   
>>>> 
>>> 
>> 
> 


