Questions about java enum when convert DataStream to Table

2023-08-01 Thread haishui
I want to convert a DataStream to a Table. The type of the DataStream is a POJO,
which contains an enum field.


1. The enum field is RAW('classname', '...') in the table. When I execute `SELECT *
FROM t_test` and print the result, it throws an EOFException.
2. If I declare the field as STRING in the schema, it throws: cannot cast "TestEnum"
to "java.lang.String".


Is there any way to define the enum field as STRING in the table?


My code is as follows:
Flink 1.17.1
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<TestData> source = env.fromElements(
    new TestData("1", TestEnum.A),
    new TestData("2", TestEnum.B)
);
Schema schema = Schema
    .newBuilder()
    .column("s", DataTypes.STRING())
    .column("en", DataTypes.STRING())
    .build();
Table table = tableEnv.fromDataStream(source);
tableEnv.createTemporaryView("t_test", table);
tableEnv.executeSql("DESC t_test").print();
tableEnv.executeSql("select * from t_test").print();
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class TestData {
    private String s;
    private TestEnum en;
}

public enum TestEnum {
    A, B, C
}
+----+----+----------------+
| op |  s |             en |
+----+----+----------------+
| +I |  1 | SqlRawValue{?} |
Exception in thread "main" org.apache.flink.util.FlinkRuntimeException:
java.io.EOFException
at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66)
at GeneratedCastExecutor$1.cast(Unknown Source)
at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:74)
at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:87)
at org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:167)
at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:202)
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:96)
at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:36)
at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:505)
at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
... 7 more
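
One possible workaround (a sketch, not something from this thread) is to convert
the enum to a String in the DataStream before handing it to the Table API, so the
planner never sees a RAW type. This assumes the Lombok-generated getters on
TestData and imports of org.apache.flink.types.Row and
org.apache.flink.api.common.typeinfo.Types:

DataStream<Row> mapped = source
    .map(d -> Row.of(d.getS(), d.getEn().name()))
    .returns(Types.ROW_NAMED(new String[] {"s", "en"}, Types.STRING, Types.STRING));
Table table = tableEnv.fromDataStream(mapped);
tableEnv.createTemporaryView("t_test", table); // "en" is now a plain STRING column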

Re: Is there any way I can do aggregation on DataStream in Apache Flink and store the Aggregated value?

2023-08-01 Thread liu ron
Hi, Muazim

Flink is an incremental computing framework; in streaming mode it considers
data to be unbounded, so every piece of data that comes in triggers the
computation logic, because it doesn't know when the data will end.
Based on your description, I understand that you may have a full-data
computation requirement and only need to compute once after seeing all the
data, so you can try setting the execution mode to batch mode[1].

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
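
As a quick sketch (reusing the numbers from your mail and assuming the Flink 1.17
DataStream API), in batch mode the keyed reduce emits only the final aggregate,
so collecting yields a single value:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00)
   .keyBy(v -> "key")
   .reduce((a, b) -> a + b)        // one record per key in BATCH mode
   .executeAndCollect()
   .forEachRemaining(v -> System.out.println("total = " + v)); // prints total = 33.0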

Best
Ron


Muazim Wani wrote on Tue, Aug 1, 2023, 18:04:

> Hi Team,
> I am new to Flink. I have a use case where I have a dataStream of
> Doubles and I am trying to get the total sum of the whole DataStream.
>
> I have used ReduceFunction and AggregateFunction.
>
> Case 1: With ReduceFunction the output is a dataStream of rolling sums. To
> get the final sum I have to traverse the outputStream, and the last value
> would be my total. In my case, I don't want to iterate the whole
> dataStream to get the final sum, and I also don't want to use an extra
> DataStream just to store the final aggregated value.
>
> Case 2: I am able to access the aggregate() method only after countWindow(),
> and countWindow() requires a size. As I don't know the size of my
> dataStream (the user will be sending data to me), I can't use it.
>
> Below is my implementation of ReduceFunction
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> DataStreamSource<Double> dataStream = env.fromElements(2.00, 3.00, 4.00,
> 11.00, 13.00, 14.00);
>
> DataStream<Double> singleOutputStreamOperator = dataStream.keyBy(value ->
> "key").reduce(new ReduceFunction<Double>() {
>   @Override
>   public Double reduce(Double aDouble, Double t1) throws Exception {
>     return aDouble + t1;
>   }
> });
>
> singleOutputStreamOperator.print();
> DataStream.Collector<Double> doubleCollector = new
> DataStream.Collector<>();
> singleOutputStreamOperator.collectAsync(doubleCollector);
> singleOutputStreamOperator.executeAndCollect("Aggregation");
>
> Double result = null;
> while( doubleCollector.getOutput().hasNext() ) {
>   result = doubleCollector.getOutput().next();
>   System.out.println("result = " + result);
> }
> The output looks like: 2.0, 5.0, 9.0, 20.0, 33.0. I simply want to get
> 33.0 as my aggregated value, store it in a variable, and give it to the user.
>
> Is there any better way to solve this for my use case?
>
> Thanks and regards
>


Re: Apache Atlas - Flink Integration

2023-08-01 Thread arjun s
Hi Shammon,

Thank you for your prompt reply. Also, I'm interested to know whether there is
an available feature for integrating Apache Flink with Apache Ranger. If so,
could you kindly share the relevant documentation with me?

Thanks & Regards,
Arjun


Re: Kafka Disconnection Error

2023-08-01 Thread liu ron
Hi, Kenan

I think you may be able to get help from the Kafka community. IMO, it is just
an info-level log; does it have any real impact?


Best,
Ron

Kenan Kılıçtepe wrote on Wed, Aug 2, 2023, 06:04:

> I got a lot of these disconnection error logs. Why? My Flink and Kafka
> clusters are running in Google Cloud and I don't think there is a network
> issue. I also get this error even when my workload is very low.
>
> 2023-08-01 21:54:00,003 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-7] Node -1 disconnected.
> 2023-08-01 21:54:00,003 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-15] Node -1 disconnected.
> 2023-08-01 21:54:00,004 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-3] Node -1 disconnected.
> 2023-08-01 21:54:00,108 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-39] Node -1 disconnected.
> 2023-08-01 21:54:00,109 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-11] Node -1 disconnected.
> 2023-08-01 21:54:03,000 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-18] Node -1 disconnected.
> 2023-08-01 21:54:05,128 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Consumer clientId=cpu_46330-9,
> groupId=cpualgosgroup1] Disconnecting from node 2 due to request timeout.
> 2023-08-01 21:54:05,128 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Consumer clientId=cpu_46330-9,
> groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation
> id 3774321 due to node 2 being disconnected (elapsed time since creation:
> 38505ms, elapsed time since send: 38505ms, request timeout: 3ms)
> 2023-08-01 21:54:05,128 INFO  org.apache.kafka.clients.FetchSessionHandler
> [] - [Consumer clientId=cpu_46330-9,
> groupId=cpualgosgroup1] Error sending fetch request (sessionId=661615761,
> epoch=1) to node 2:
> org.apache.kafka.common.errors.DisconnectException: null
> 2023-08-01 21:54:09,198 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Consumer clientId=cpu_46330-7,
> groupId=cpualgosgroup1] Disconnecting from node 1 due to request timeout.
> 2023-08-01 21:54:09,198 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Consumer clientId=cpu_46330-7,
> groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation
> id 6902407 due to node 1 being disconnected (elapsed time since creation:
> 38677ms, elapsed time since send: 38677ms, request timeout: 3ms)
> 2023-08-01 21:54:09,198 INFO  org.apache.kafka.clients.FetchSessionHandler
> [] - [Consumer clientId=cpu_46330-7,
> groupId=cpualgosgroup1] Error sending fetch request (sessionId=1602736870,
> epoch=1) to node 1:
> org.apache.kafka.common.errors.DisconnectException: null
> 2023-08-01 21:54:10,425 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-12] Node -1 disconnected.
> 2023-08-01 21:54:10,427 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-45] Node -1 disconnected.
> 2023-08-01 21:54:19,148 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Consumer clientId=cpu_46330-2,
> groupId=cpualgosgroup1] Disconnecting from node 2 due to request timeout.
> 2023-08-01 21:54:19,148 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Consumer clientId=cpu_46330-2,
> groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation
> id 7250900 due to node 2 being disconnected (elapsed time since creation:
> 41074ms, elapsed time since send: 41073ms, request timeout: 3ms)
> 2023-08-01 21:54:19,148 INFO  org.apache.kafka.clients.FetchSessionHandler
> [] - [Consumer clientId=cpu_46330-2,
> groupId=cpualgosgroup1] Error sending fetch request (sessionId=151959173,
> epoch=INITIAL) to node 2:
> org.apache.kafka.common.errors.DisconnectException: null
> 2023-08-01 21:59:00,002 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-27] Node 2 disconnected.
> 2023-08-01 21:59:00,003 INFO  org.apache.kafka.clients.NetworkClient
> [] - [Producer clientId=producer-13] Node 2 disconnected.
>
>


Re: Custom connector Update flink 1.13.3 failed notsuchMethod

2023-08-01 Thread Hang Ruan
Hi, longfeng.

I think you should rebuild your connector against the new API. The return type
of the method `DynamicTableFactory$Context.getCatalogTable()` has changed
from `CatalogTable` to `ResolvedCatalogTable`[1].
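
A minimal sketch of a factory method compiled against the 1.13 API (buildSource
is a hypothetical helper, not a Flink method):

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    ResolvedCatalogTable table = context.getCatalogTable(); // was CatalogTable in 1.12
    Map<String, String> options = table.getOptions();       // connector options, unchanged
    return buildSource(options, table.getResolvedSchema()); // hypothetical helper
}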

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-21913

longfeng Xu wrote on Wed, Aug 2, 2023, 08:26:

> Flink 1.13.3
>
> The custom connector uses the Flink Kafka connector code with a little
> refactoring, and it could be loaded in Flink 1.12 when using
> StreamTableEnvironment.
>
> Now Flink is upgraded to 1.13.3, and the custom connector dependencies are
> also upgraded to 1.13.3.
>
> But it fails to load:
>
> java.lang.NoSuchMethodError: 'org.apache.flink.table.catalog.CatalogTable
> org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()
>
> I tried many ways but failed (shaded the package into a fat jar like the demo
> on the website).
>
> Thanks for your help
>


Re: Apache Atlas - Flink Integration

2023-08-01 Thread Shammon FY
Hi arjun,

As @Mate mentioned, the discussion of FLIP-314 has been completed and a
vote will be initiated soon. We would like to introduce the interfaces for
lineage in the next release of Flink after 1.18.

Best,
Shammon FY


On Tue, Aug 1, 2023 at 11:07 PM Mate Czagany  wrote:

> Hi,
>
> Unfortunately the Atlas hook you've read about is only available in the
> Cloudera Flink solution and has not been made open-source.
>
> In the future FLIP-314[1] might offer a simple solution to implement the
> Atlas integration.
>
> Best Regards,
> Mate
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>
>
arjun s wrote (on Tue, Aug 1, 2023, 16:23):
>
>> I am looking to integrate Apache Atlas with Apache Flink to capture job
>> lineage. I found some references around it from Cloudera (CDP), and they are
>> using the Atlas-Flink hook, but I am not able to find any documentation or
>> implementation.
>>
>> I have gone through the JIRA link mentioned below. But in this link the
>> feature is still open.
>>
>> https://issues.apache.org/jira/browse/FLINK-6757
>>
>> I would like to know whether this feature for Apache Atlas with Apache
>> Flink is released or not. If yes, could anyone share with me the references
>> for integrating?
>>
>> Thanks and Regards,
>> Arjun S
>>
>


Flink operator task opens threads internally

2023-08-01 Thread Kamal Mittal via user
Hello Community,

I have an operator pipeline as below; is it OK if the "source" task opens
threads by using a Java thread pool and parallelizes the work?

This is needed for accepting multiple client socket connections in a "single
custom source server socket function".

Single custom source server socket function: ServerSocket serverSocket = new
ServerSocket();
Then, using a thread pool, accept multiple connections on separate threads:
pool.submit(() -> serverSocket.accept());
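
A fuller sketch of that idea (readAndEmit is a hypothetical helper, and "running"
is an assumed volatile flag cleared in cancel(); Flink's SourceFunction API is
assumed here):

@Override
public void run(SourceContext<String> ctx) throws Exception {
    ServerSocket serverSocket = new ServerSocket(9999);      // port is bound exactly once
    ExecutorService pool = Executors.newFixedThreadPool(16); // one worker per connection
    while (running) {
        Socket client = serverSocket.accept();
        // Any ctx.collect() inside readAndEmit must synchronize on
        // ctx.getCheckpointLock() to stay consistent with checkpointing.
        pool.submit(() -> readAndEmit(client, ctx));
    }
}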


[inline image: operator pipeline diagram]

Rgds,
Kamal


Re: No. of task managers vs No. of task slots

2023-08-01 Thread Shammon FY
Hi Kamal,

For the three points
> 3. What is the difference between high no. of task managers vs high no.
of task slots (with low no. of task managers)?

I think this is mainly limited by the JVM's efficiency in managing memory.
When we use a Flink session cluster as an OLAP engine, we found that when the
memory of a single TM exceeds a certain threshold, such as 50g or 100g,
performance may fall back due to GC and other issues. We currently do not
have time to pinpoint the specific reason. So from this perspective, I
think memory limits the number of computing tasks per TM, and the number of
tasks per TM also limits the number of slots.
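
As an illustration only (placeholder values, not a recommendation), this
trade-off is usually expressed in flink-conf.yaml:

taskmanager.memory.process.size: 16g
taskmanager.numberOfTaskSlots: 4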

Best,
Shammon FY

On Tue, Aug 1, 2023 at 4:22 PM liu ron  wrote:

> Hi, Kamal
>
> > How many task managers a job manager can handle? Is there any upper
> limit also?
>
> There is no clear limit to how many TMs a JM can cover, and based on my
> past experience, it can handle TMs over 1000+, even more.
>
> > How to decide no. of task managers, is there any way?
>
> I don't think there is a specific formula here, but rather the number of
> TMs based on your actual business scenario.
>
> > What is the difference between high no. of task managers vs high no. of
> task slots (with low no. of task managers)?
>
> Flink uses the SlotSharingGroup mechanism; by default all the operators of
> the pipeline will run in one slot. This mechanism leads to all the
> operators sharing memory with each other, so there will be competition,
> and all the slots of a TM also share the memory of the TM. If
> there are too many slots on a TM, they may interfere with each other and
> affect the stability of the job. If your job requires high stability, it
> may make more sense to take the high-number-of-TMs approach, with fewer
> slots per TM. However, too many TMs may lead to too much network
> transmission overhead, so if the latency requirement of the job is tighter,
> it is more appropriate to adopt the high-number-of-slots approach.
>
> Best,
> Ron
>
>
Kamal Mittal via user wrote on Tue, Aug 1, 2023, 14:21:
>
>> Hello Community,
>>
>>
>>
>> Need info. for below –
>>
>>
>>
>>1. How many task managers can a job manager handle? Is there any
>>upper limit?
>>
>>2. How to decide the no. of task managers; is there any way?
>>
>>3. What is the difference between a high no. of task managers vs a high
>>no. of task slots (with a low no. of task managers)?
>>
>>
>>
>> Rgds,
>>
>> Kamal
>>
>


RE: Flink netty connector for TCP source

2023-08-01 Thread Kamal Mittal via user
Thanks Shammon.

The purpose of opening the server socket in the Split Enumerator was that it
has only one instance per source, and so does the server socket (port binding
can happen only once). The accepted socket connections (serversocket.accept())
will then act as splits which are further processed by the readers.

Please let me know if you see an issue in the above understanding, or if you
can suggest some other way. The issue is that a server socket can bind a port
only once on a machine/node, and that's why we thought to open it in the split
enumerator, which is called only once and per source.

Rgds,
Kamal

From: Shammon FY 
Sent: 02 August 2023 07:48 AM
To: Kamal Mittal; user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi Kamal,

It confuses me a little: what's the purpose of opening a server socket in the
SplitEnumerator? Currently there will be only one SplitEnumerator instance in
the JobManager for each source, not for each source subtask. If there's only
one source in your job, no matter how much parallelism this source has, there
will be only one SplitEnumerator instance in the JM, which can connect and
send events to the source subtasks.

Additionally, if you want to read data from a netty source, you can implement
your NettySplitEnumerator to get splits from the netty source and assign them
to a NettySourceReader, which will read data from the netty source according
to the splits.

Best,
Shammon FY


On Tue, Aug 1, 2023 at 12:02 PM Kamal Mittal via user
<user@flink.apache.org> wrote:
Hello,

I created a custom server socket source in which I opened the server socket in
the split enumerator; source parallelism is 2 and it gives an error while
running - "Address is already in use" - i.e. it tried to open two server
sockets as per the parallelism, which is not correct, as parallelism is
applicable to the source operator and not to the split enumerator?

Please correct me if above understanding is not correct.

Rgds,
Kamal

From: Hang Ruan <ruanhang1...@gmail.com>
Sent: 01 August 2023 08:55 AM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: liu ron <ron9@gmail.com>;
user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi, Kamal.

The SplitEnumerator is contained in the SourceCoordinator. They are only used 
in JM.

Best,
Hang

Kamal Mittal via user <user@flink.apache.org>
wrote on Tue, Aug 1, 2023, 10:43:
Thanks.

I looked at the link for custom data sources. One query here: how can I make
sure the split enumerator executes on the Job Manager rather than on a Task
Manager?

From: liu ron <ron9@gmail.com>
Sent: 31 July 2023 10:06 PM
To: user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi, Kamal
Currently, we don't provide a connector like a TCP source in the Flink main
repo. If you need this connector, you can try to implement it by referring to
the FLIP-27 source docs.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Best,
Ron

Shammon FY <zjur...@gmail.com> wrote on Thu, Jul 27, 2023, 11:23:
I cannot find any information about a netty source on the Flink website, and
it is not in the connector list[1], so I think it is not supported by the
Flink community.

[1]  
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/

Best,
Shammon FY

On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal
<kamal.mit...@ericsson.com> wrote:
Hello Shammon,

Yes, I am aware of the socket text stream, but I was wondering whether
something like
'https://github.com/apache/bahir-flink/tree/master/flink-connector-netty'
is also supported by Flink?

Rgds,
Kamal

From: Shammon FY <zjur...@gmail.com>
Sent: 27 July 2023 08:15 AM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi Kamal,

There's a socket text stream in `DataStream`, and you can refer to [1] for
more details.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program

Best,
Shammon FY

On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user
<user@flink.apache.org> wrote:
Hello,

Does Flink provide a netty connector for a custom TCP source?

Please share any documentation details.

Rgds,
Kamal


Re: Flink netty connector for TCP source

2023-08-01 Thread Shammon FY
Hi Kamal,

It confuses me a little: what's the purpose of opening a server socket
in the SplitEnumerator? Currently there will be only one SplitEnumerator
instance in the JobManager for each source, not for each source subtask. If
there's only one source in your job, no matter how much parallelism this
source has, there will be only one SplitEnumerator instance in the JM, which
can connect and send events to the source subtasks.

Additionally, if you want to read data from a netty source, you can
implement your NettySplitEnumerator to get splits from the netty source and
assign them to a NettySourceReader, which will read data from the netty
source according to the splits.
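
As a minimal sketch of that shape (NettySplit and NettySplitEnumerator are
hypothetical names; imports from org.apache.flink.api.connector.source and
java.util.List are assumed), the enumerator runs once on the JobManager, and a
split is serializable metadata, not a live connection:

class NettySplit implements SourceSplit {
    final String host;
    final int port;
    NettySplit(String host, int port) { this.host = host; this.port = port; }
    @Override public String splitId() { return host + ":" + port; }
}

class NettySplitEnumerator implements SplitEnumerator<NettySplit, Void> {
    private final SplitEnumeratorContext<NettySplit> context;
    NettySplitEnumerator(SplitEnumeratorContext<NettySplit> context) { this.context = context; }
    @Override public void start() {}
    @Override public void handleSplitRequest(int subtask, String hostname) {
        // Tell the requesting reader what to consume; the reader connects itself.
        context.assignSplit(new NettySplit("some-host", 9999), subtask);
    }
    @Override public void addSplitsBack(List<NettySplit> splits, int subtask) {}
    @Override public void addReader(int subtask) {}
    @Override public Void snapshotState(long checkpointId) { return null; }
    @Override public void close() {}
}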

Best,
Shammon FY


On Tue, Aug 1, 2023 at 12:02 PM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> I created a custom server socket source in which I opened the server socket
> in the split enumerator; source parallelism is 2 and it gives an error while
> running - "Address is already in use" - i.e. it tried to open two server
> sockets as per the parallelism, which is not correct, as parallelism is
> applicable to the source operator and not to the split enumerator?
>
>
>
> Please correct me if above understanding is not correct.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Hang Ruan 
> *Sent:* 01 August 2023 08:55 AM
> *To:* Kamal Mittal 
> *Cc:* liu ron ; user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal.
>
>
>
> The SplitEnumerator is contained in the SourceCoordinator. They are only
> used in JM.
>
>
>
> Best,
>
> Hang
>
>
>
> Kamal Mittal via user wrote on Tue, Aug 1, 2023, 10:43:
>
> Thanks.
>
>
>
> I looked at the link for custom data sources. One query here: how can I
> make sure the split enumerator executes on the Job Manager rather than on a
> Task Manager?
>
>
>
> *From:* liu ron 
> *Sent:* 31 July 2023 10:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal
>
> Currently, we don't provide a connector like a TCP source in the Flink main
> repo. If you need this connector, you can try to implement it by referring
> to the FLIP-27 source docs.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>
>
>
> Best,
>
> Ron
>
>
>
> Shammon FY wrote on Thu, Jul 27, 2023, 11:23:
>
> I cannot find any information about a netty source on the Flink website,
> and it is not in the connector list[1], so I think it is not supported by
> the Flink community.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal 
> wrote:
>
> Hello Shammon,
>
>
>
> Yes, I am aware of the socket text stream, but I was wondering whether
> something like
> 'https://github.com/apache/bahir-flink/tree/master/flink-connector-netty'
> is also supported by Flink?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 27 July 2023 08:15 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> There's a socket text stream in `DataStream`, and you can refer to [1] for
> more details.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Does Flink provide a netty connector for a custom TCP source?
>
>
>
> Please share any documentation details.
>
>
>
> Rgds,
>
> Kamal
>
>


Custom connector Update flink 1.13.3 failed notsuchMethod

2023-08-01 Thread longfeng Xu
Flink 1.13.3

The custom connector uses the Flink Kafka connector code with a little
refactoring, and it could be loaded in Flink 1.12 when using
StreamTableEnvironment.

Now Flink is upgraded to 1.13.3, and the custom connector dependencies are
also upgraded to 1.13.3.

But it fails to load:

java.lang.NoSuchMethodError: 'org.apache.flink.table.catalog.CatalogTable
org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()

I tried many ways but failed (shaded the package into a fat jar like the demo
on the website).

Thanks for your help

Kafka Disconnection Error

2023-08-01 Thread Kenan Kılıçtepe
I got a lot of these disconnection error logs. Why? My Flink and Kafka
clusters are running in Google Cloud and I don't think there is a network
issue. I also get this error even when my workload is very low.

2023-08-01 21:54:00,003 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-7] Node -1 disconnected.
2023-08-01 21:54:00,003 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-15] Node -1 disconnected.
2023-08-01 21:54:00,004 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-3] Node -1 disconnected.
2023-08-01 21:54:00,108 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-39] Node -1 disconnected.
2023-08-01 21:54:00,109 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-11] Node -1 disconnected.
2023-08-01 21:54:03,000 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-18] Node -1 disconnected.
2023-08-01 21:54:05,128 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=cpu_46330-9,
groupId=cpualgosgroup1] Disconnecting from node 2 due to request timeout.
2023-08-01 21:54:05,128 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=cpu_46330-9,
groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation
id 3774321 due to node 2 being disconnected (elapsed time since creation:
38505ms, elapsed time since send: 38505ms, request timeout: 3ms)
2023-08-01 21:54:05,128 INFO  org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=cpu_46330-9,
groupId=cpualgosgroup1] Error sending fetch request (sessionId=661615761,
epoch=1) to node 2:
org.apache.kafka.common.errors.DisconnectException: null
2023-08-01 21:54:09,198 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=cpu_46330-7,
groupId=cpualgosgroup1] Disconnecting from node 1 due to request timeout.
2023-08-01 21:54:09,198 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=cpu_46330-7,
groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation
id 6902407 due to node 1 being disconnected (elapsed time since creation:
38677ms, elapsed time since send: 38677ms, request timeout: 3ms)
2023-08-01 21:54:09,198 INFO  org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=cpu_46330-7,
groupId=cpualgosgroup1] Error sending fetch request (sessionId=1602736870,
epoch=1) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
2023-08-01 21:54:10,425 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-12] Node -1 disconnected.
2023-08-01 21:54:10,427 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-45] Node -1 disconnected.
2023-08-01 21:54:19,148 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=cpu_46330-2,
groupId=cpualgosgroup1] Disconnecting from node 2 due to request timeout.
2023-08-01 21:54:19,148 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=cpu_46330-2,
groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation
id 7250900 due to node 2 being disconnected (elapsed time since creation:
41074ms, elapsed time since send: 41073ms, request timeout: 3ms)
2023-08-01 21:54:19,148 INFO  org.apache.kafka.clients.FetchSessionHandler
[] - [Consumer clientId=cpu_46330-2,
groupId=cpualgosgroup1] Error sending fetch request (sessionId=151959173,
epoch=INITIAL) to node 2:
org.apache.kafka.common.errors.DisconnectException: null
2023-08-01 21:59:00,002 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-27] Node 2 disconnected.
2023-08-01 21:59:00,003 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-13] Node 2 disconnected.


Dependency injection framework for flink

2023-08-01 Thread Oscar Perez via user
Hi,
we are currently migrating some of our jobs to a hexagonal architecture, and
I have seen that we can use Spring as a dependency injection framework; see:

https://getindata.com/blog/writing-flink-jobs-using-spring-dependency-injection-framework/

Has anybody analyzed different JVM DI frameworks, e.g. Guice, Micronaut, etc.,
for feasibility and performance on Apache Flink?

Using Google I have found some issues with Dagger and Flink, while
Guice/Spring seem better suited, but I could not find a study of performance
or recommendations from the Flink community.

Thanks!
Oscar


Re: Apache Atlas - Flink Integration

2023-08-01 Thread Mate Czagany
Hi,

Unfortunately the Atlas hook you've read about is only available in the
Cloudera Flink solution and has not been made open-source.

In the future FLIP-314[1] might offer a simple solution to implement the
Atlas integration.

Best Regards,
Mate

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener


arjun s wrote (on Tue, Aug 1, 2023, 16:23):

> I am looking to integrate Apache Atlas with Apache Flink to capture job
> lineage. I found some references around it from Cloudera (CDP), and they are
> using the Atlas-Flink hook, but I am not able to find any documentation or
> implementation.
>
> I have gone through the JIRA link mentioned below. But in this link the
> feature is still open.
>
> https://issues.apache.org/jira/browse/FLINK-6757
>
> I would like to know whether this feature for Apache Atlas with Apache
> Flink is released or not. If yes, could anyone share with me the references
> for integrating?
>
> Thanks and Regards,
> Arjun S
>


Apache Atlas - Flink Integration

2023-08-01 Thread arjun s
I am looking to integrate Apache Atlas with Apache Flink to capture job
lineage. I found some references around it from Cloudera (CDP), and they are
using the Atlas-Flink hook, but I am not able to find any documentation or
implementation.

I have gone through the JIRA link mentioned below. But in this link the
feature is still open.

https://issues.apache.org/jira/browse/FLINK-6757

I would like to know whether this feature for Apache Atlas with Apache Flink
is released or not. If yes, could anyone share with me the references for
integrating?

Thanks and Regards,
Arjun S


Re: Flink Kubernetes Operator autoscaling GPU-based workload

2023-08-01 Thread Raihan Sunny via user
Hi,

I've tinkered around a bit more and found that the problem is actually with
Native mode vs Standalone mode. In standalone mode, the pod definition
doesn't get a resource request for nvidia/gpu, whereas in Native mode it
does. I'll open another question since this isn't related to the autoscaler
at all. Thanks.


Regards,
Sunny



On Tue, Aug 1, 2023 at 3:34 PM Gyula Fóra  wrote:

> The autoscaler only works for FlinkDeployments in Native mode. You should
> turn off the reactive scheduler mode as well because that's something
> completely different.
> After that you can check the autoscaler logs for more info.
>
> Gyula
>
> On Tue, Aug 1, 2023 at 10:33 AM Raihan Sunny via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> I have a workload that depends on the GPU. I have only 1 GPU card. As per
>> the documentation I have added the necessary configurations and can run the
>> GPU workload in standalone REACTIVE mode with as many taskmanager instances
>> as required.
>>
>> I have set the number of task slots to 1 so that a raise in parallelism
>> causes a new pod to be created. I can scale up the job just fine in this
>> mode, however when I add autoscaling configurations to the FlinkDeployment
>> manifest, scaling up doesn't work. This is because with the autoscaling
>> manifest, resource requests and limits seem to be automatically set on the
>> pods for the GPU. This is not the case with the standalone mode, which is
>> why I guess scaling up doesn't cause any issues there.
>>
>> So, what can I do to get the autoscaler working? I'm using Flink version
>> 1.17.1 with PyFlink and Flink Kubernetes Operator version 1.5.0.
>>
>>
>> Regards,
>> Sunny
>>
>>
>



Is there any way I can do aggregation on DataStream in Apache Flink and store the Aggregated value?

2023-08-01 Thread Muazim Wani
Hi Team,
I am new to Flink. I have a use case where I have a dataStream of
Doubles and I am trying to get the total sum of the whole DataStream.

I have used ReduceFunction and AggregateFunction.

Case 1: With ReduceFunction the output is a dataStream of rolling sums. To get
the final sum I have to traverse the outputStream, and the last value would
be my total. In my case, I don't want to iterate the whole dataStream to
get the final sum, and I also don't want to use an extra DataStream just to
store the final aggregated value.

Case 2: I am able to access the aggregate() method only after countWindow(),
and countWindow() requires a size. As I don't know the size of my
dataStream (the user will be sending data to me), I can't use it.

Below is my implementation of ReduceFunction

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Double> dataStream = env.fromElements(2.00, 3.00, 4.00,
11.00, 13.00, 14.00);

DataStream<Double> singleOutputStreamOperator = dataStream.keyBy(value ->
"key").reduce(new ReduceFunction<Double>() {
  @Override
  public Double reduce(Double aDouble, Double t1) throws Exception {
    return aDouble + t1;
  }
});

singleOutputStreamOperator.print();
DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
singleOutputStreamOperator.collectAsync(doubleCollector);
singleOutputStreamOperator.executeAndCollect("Aggregation");

Double result = null;
while( doubleCollector.getOutput().hasNext() ) {
  result = doubleCollector.getOutput().next();
  System.out.println("result = " + result);
}
The output looks like: 2.0, 5.0, 9.0, 20.0, 33.0. I simply want to get
33.0 as my aggregated value, store it in a variable, and give it to the user.

Is there any better way to solve this for my use case?

Thanks and regards


Re: Flink Kubernetes Operator autoscaling GPU-based workload

2023-08-01 Thread Gyula Fóra
The autoscaler only works for FlinkDeployments in Native mode. You should
turn off the reactive scheduler mode as well because that's something
completely different.
After that you can check the autoscaler logs for more info.

Gyula

On Tue, Aug 1, 2023 at 10:33 AM Raihan Sunny via user 
wrote:

> Hi,
>
> I have a workload that depends on the GPU. I have only 1 GPU card. As per
> the documentation I have added the necessary configurations and can run the
> GPU workload in standalone REACTIVE mode with as many taskmanager instances
> as required.
>
> I have set the number of task slots to 1 so that a raise in parallelism
> causes a new pod to be created. I can scale up the job just fine in this
> mode, however when I add autoscaling configurations to the FlinkDeployment
> manifest, scaling up doesn't work. This is because with the autoscaling
> manifest, there seems to be resource requests and limits are being
> automatically set to the pods for the gpu. This is not the case with the
> standalone mode which is why I guess scaling up doesn't cause any issues.
>
> So, what can I do to get the autoscaler working? I'm using Flink version
> 1.17.1 with PyFlink and Flink Kubernetes Operator version 1.5.0.
>
>
> Regards,
> Sunny
>
>


Flink Kubernetes Operator autoscaling GPU-based workload

2023-08-01 Thread Raihan Sunny via user
Hi,

I have a workload that depends on the GPU. I have only 1 GPU card. As per
the documentation I have added the necessary configurations and can run the
GPU workload in standalone REACTIVE mode with as many taskmanager instances
as required.

I have set the number of task slots to 1 so that a raise in parallelism
causes a new pod to be created. I can scale up the job just fine in this
mode, however when I add autoscaling configurations to the FlinkDeployment
manifest, scaling up doesn't work. This is because with the autoscaling
manifest, resource requests and limits seem to be automatically set on the
pods for the GPU. This is not the case with the standalone mode, which is
why I guess scaling up doesn't cause any issues there.

So, what can I do to get the autoscaler working? I'm using Flink version
1.17.1 with PyFlink and Flink Kubernetes Operator version 1.5.0.


Regards,
Sunny



Re: No. of task managers vs No. of task slots

2023-08-01 Thread liu ron
Hi, Kamal

> How many task managers a job manager can handle? Is there any upper limit
also?

There is no clear limit to how many TMs a JM can cover, and based on my
past experience, it can handle TMs over 1000+, even more.

> How to decide no. of task managers, is there any way?

I don't think there is a specific formula here, but rather the number of
TMs based on your actual business scenario.

> What is the difference between high no. of task managers vs high no. of
task slots (with low no. of task managers)?

Flink uses the SlotSharingGroup mechanism; by default all the operators of
the pipeline will run in one slot. This mechanism leads to all the
operators sharing memory with each other, so there will be competition,
and all the slots of a TM also share the memory of the TM. If
there are too many slots on a TM, they may interfere with each other and
affect the stability of the job. If your job requires high stability, it
may make more sense to take the high-number-of-TMs approach, with fewer
slots per TM. However, too many TMs may lead to too much network
transmission overhead, so if the latency requirement of the job is tighter,
it is more appropriate to adopt the high-number-of-slots approach.

Best,
Ron


Kamal Mittal via user wrote on Tue, Aug 1, 2023, 14:21:

> Hello Community,
>
>
>
> Need info. for below –
>
>
>
>1. How many task managers can a job manager handle? Is there any upper
>limit?
>
>2. How to decide the no. of task managers; is there any way?
>
>3. What is the difference between a high no. of task managers vs a high
>no. of task slots (with a low no. of task managers)?
>
>
>
> Rgds,
>
> Kamal
>


Re: [EXTERNAL] Re: Query on flink-operator autoscale support

2023-08-01 Thread Gyula Fóra
The autoscaler scales jobs based on incoming data and processing
throughput. It's completely different from the reactive mode: if the
throughput/processing rate doesn't change, it will not scale up even if you
have more resources available.

Also, in native mode you cannot add pods to the cluster yourself; Flink
manages the pod requests etc. based on the resource needs.

I suggest you read the docs and try out the example; that should help you get
started:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
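
A minimal FlinkDeployment fragment enabling it might look like the following
(key names as used by operator 1.5.x; please verify against the linked docs):

spec:
  flinkConfiguration:
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "5m"
    kubernetes.operator.job.autoscaler.target.utilization: "0.7"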

Gyula

On Sun, Jul 30, 2023 at 6:33 AM Xiao Ma  wrote:

> Hi Gyula,
>
> Could I ask whether the autoscaler will support the native session cluster
> in the 1.18 Flink operator? Also, please correct me if I'm wrong: the
> autoscaler will work similarly to elastic scaling in reactive mode, right?
> For example, in the K8s cluster, if one pod is added to the session cluster,
> the jobs running on it will be rebalanced to the new one, is that correct?
>
> Thank you very much.
> Xiao Ma
>
> On Wed, Feb 1, 2023 at 10:56 AM Gyula Fóra  wrote:
>
>> As I mentioned in the previous email, standalone mode is not on the
>> Autoscaler roadmap because the scheduling/resource model is different.
>> This applies to both standalone app and session clusters.
>>
>> Thanks
>> Gyula
>>
>> On Wed, Feb 1, 2023 at 4:48 PM Swathi Chandrashekar <
>> cswa...@microsoft.com> wrote:
>>
>>> Sure, thanks Gyula.
>>> Is there a roadmap to support standalone session clusters to scale based
>>> on the jobs added/deleted and changes in parallelism?
>>>
>>> Regards,
>>> Swathi C
>>>
>>> --
>>> *From:* Gyula Fóra 
>>> *Sent:* Wednesday, February 1, 2023 8:54 PM
>>> *To:* Swathi Chandrashekar 
>>> *Cc:* user@flink.apache.org 
>>> *Subject:* [EXTERNAL] Re: Query on flink-operator autoscale support
>>>
>>> The autoscaler currently only works with Native App clusters.
>>> Native session clusters may be supported in the future but standalone is
>>> not on our roadmap due to a different resource/scheduling model used.
>>>
>>> Gyula
>>>
>>> On Wed, Feb 1, 2023 at 4:22 PM Swathi Chandrashekar <
>>> cswa...@microsoft.com> wrote:
>>>
>>> Hi,
>>>
>>> I was testing the OSS Flink operator with Flink 1.17 for the autoscale
>>> feature. I was able to scale the cluster based on load in an application
>>> cluster in native mode, but the same did not work in standalone mode, as
>>> the operator gave the error below [both for app and session mode].
>>>
>>> Is autoscale supported for the following:
>>>
>>>1. Session cluster in standalone
>>>2. Session cluster in native
>>>3. App cluster in standalone
>>>
>>>
>>> Exception in thread "pool-4-thread-3" java.lang.NoSuchMethodError:
>>> org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient.create(Lorg/apache/flink/configuration/Configuration;Ljava/util/concurrent/ExecutorService;)Lorg/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient;
>>> at
>>> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.createNamespacedKubeClient(StandaloneFlinkService.java:105)
>>> at
>>> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.submitClusterInternal(StandaloneFlinkService.java:110)
>>> at
>>> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.deployApplicationCluster(StandaloneFlinkService.java:69)
>>> at
>>> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:180)
>>> at
>>> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
>>> at
>>> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:58)
>>> at
>>> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
>>> at
>>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:130)
>>> at
>>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>>> at
>>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
>>> at
>>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
>>> at
>>> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>>> at
>>> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
>>> at
>>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
>>> at
>>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
>>> at
>>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(Recon

Re: Flink Table API + Jacoco Plugin

2023-08-01 Thread Brendan Cortez
up

On Tue, 18 Jul 2023 at 09:27, Brendan Cortez 
wrote:

> Hi all!
>
> I'm trying to use the jacoco-maven-plugin and run unit tests for the Flink
> Table API, but they fail with an error (see error_log_flink_17.txt in the
> attachment for the full stack trace):
> java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
> ...
>
> I'm using:
> - Flink:
>   - flink-table-api-java-bridge 1.17.1
>   - flink-test-utils 1.17.1
>   - flink-table-test-utils 1.17.1
> - jacoco-maven-plugin 0.8.10
> - maven-surefire-plugin 3.0.0-M7
>
> Flink Table API tests run correctly if:
> - jacoco-maven-plugin is not used
> - goal prepare-agent of jacoco-maven-plugin is not used
> - Flink 1.16.1 version is used
>
> Best regards,
> Brendan Cortez
>
>