Auditing sink using table api

2023-11-03 Thread Bo
Hello community,


I am looking for a way to perform auditing of the various sinks (mostly 
JdbcDynamicTableSink) using the Table API.
By "auditing", I mean logging the details of every row that arrives at the 
sink, as well as any anomalies when the sink writes to the external system.


Does Flink have some kind of auditing mechanism in place? The only way I 
can see right now is to build a custom sink that supports detailed logging 
to external systems, along the lines of the sketch below.
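
For concreteness, here is a rough sketch of the kind of wrapper I have in
mind (the AuditingSink name and the delegation are made up by me; a Rich
delegate would also need its open()/close() lifecycle forwarded):

```
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs every row and every write failure, then delegates to the real sink.
public class AuditingSink extends RichSinkFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(AuditingSink.class);

    private final SinkFunction<Row> delegate; // e.g. the actual JDBC sink

    public AuditingSink(SinkFunction<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        LOG.info("audit: row in -> {}", value); // detail log of every row
        try {
            delegate.invoke(value, context);
        } catch (Exception e) {
            LOG.error("audit: write failed for row {}", value, e); // anomaly log
            throw e;
        }
    }
}
```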


Any thoughts/suggestions?


Regards,


Bo

Re: Inquiry about ActiveResourceManager and StandaloneResourceManager in Flink

2023-11-03 Thread Yu Chen
Hi Steven,

As stated in the `StandaloneResourceManager` comments, that manager does not
acquire new resources; the user has to start TaskManagers manually (e.g.
with `bin/taskmanager.sh start`).
`ActiveResourceManager`, in contrast, requests and releases resources on
demand (that is what "active" means) on top of a resource framework such as
YARN or Kubernetes.

As we know, different users have different production environments, and not
all of them want to run on YARN or Kubernetes (especially for local
debugging, a standalone cluster is very convenient).
Therefore, Flink provides these two resource managers to cover the
different usage scenarios.

Please feel free to correct me if there are any misunderstandings.

Best regards,
Yu Chen

Steven Chen wrote on Fri, Nov 3, 2023 at 13:28:

> Dear Flink Community,
>
>
> I am currently using Flink for my project and have a question regarding
> ActiveResourceManager and StandaloneResourceManager.
>
> What does "active" mean in ActiveResourceManager and why is
> StandaloneResourceManager not considered an active resource manager?
>
>
> Thank you for your time and assistance.
>
>
> Best regards,
> Steven Chen
>


Re: Flink custom parallel data source

2023-11-03 Thread David Anderson
> You suggested a message broker below; how would that be feasible in this
case?

To my mind, the idea would be to use something like a socket source
connector for Kafka Connect. This would give you a simple, reliable way to
get the data into a replayable data store. You'd then be able to start,
stop, and redeploy the Flink app without worrying about data loss, because
data reception and storage would be decoupled from data processing.
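
On the Flink side, consuming that replayable topic would look roughly like
this (just a sketch; the topic name and broker address are made up):

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TcpFeedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka (fed by the connector) is the replayable store; Flink can
        // restart from committed offsets without losing data.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")   // made-up address
                .setTopics("tcp-feed")               // made-up topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "tcp-feed")
                .map(frame -> frame.trim()) // stand-in for the proprietary decoding
                .print();

        env.execute("tcp-feed-job");
    }
}
```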

David

On Tue, Oct 31, 2023 at 7:50 PM Kamal Mittal via user 
wrote:
>
> Thanks for sharing views.
>
>
>
> Our client supports only TCP stream-based traffic, which is in a
> proprietary format that we need to decode. The system accepting this
> traffic is Flink-based, and that's why we tried all of this with a custom
> data source.
>
>
>
> You suggested a message broker below; how would that be feasible in this
> case?
>
>
>
> From: Alexander Fedulov 
> Sent: 01 November 2023 01:54 AM
> To: Kamal Mittal 
> Cc: user@flink.apache.org
> Subject: Re: Flink custom parallel data source
>
>
>
> Flink natively supports a pull-based model for sources, where the source
operators request data from the external system when they are ready to
process it.  Implementing a TCP server socket operator essentially creates
a push-based source, which could lead to backpressure problems if the data
ingestion rate exceeds the processing rate. You also lose any delivery
guarantees because Flink's fault tolerance model relies on having
replayable sources.
>
> Is using a message broker not feasible in your case?
>
> Best,
>
> Alexander Fedulov
>
>
>
> On Tue, 31 Oct 2023 at 13:08, Kamal Mittal 
wrote:
>
> Hello,
>
>
>
> We are writing a custom TCP server socket source function in which a TCP
> server socket listener accepts connections and reads data.
>
> A single custom server socket source function: ServerSocket serversocket =
> new ServerSocket();
>
> Then, using a thread pool, we accept multiple connections in separate
> threads: new Runnable() -> serversocket.accept();
>
> So each client socket is accepted and handed to a separate thread that
> reads data from the TCP stream.
>
> Rgds,
>
> Kamal
>
> From: Alexander Fedulov 
> Sent: 31 October 2023 04:03 PM
> To: Kamal Mittal 
> Cc: user@flink.apache.org
> Subject: Re: Flink custom parallel data source
>
>
>
> Please note that SourceFunction API is deprecated and is due to be
removed, possibly in the next major version of Flink.
>
> Ideally you should not be manually spawning threads in your Flink
applications. Typically you would only perform data fetching in the sources
and do processing in the subsequent operators which you can scale
independently from the source parallelism. Can you describe what you are
trying to achieve?
>
>
>
> Best,
>
> Alexander Fedulov
>
>
>
> On Tue, 31 Oct 2023 at 07:22, Kamal Mittal via user 
wrote:
>
> Hello Community,
>
>
>
> I need to have a custom parallel data source (Flink
ParallelSourceFunction) for fetching data based on some custom logic. In
this source function, we open multiple threads via a Java thread pool to
distribute the work further.
>
>
>
> These threads share the Flink-provided ‘SourceContext’ and collect records
via the source_context.collect() method.
>
>
>
> Is it OK to share the source context across separate threads and emit data this way?
>
>
>
> Is there any issue for downstream operators with the above design?
>
>
>
> Rgds,
>
> Kamal

>
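
P.S. If you do stay on the (deprecated) SourceFunction API for now, note
that SourceContext is not thread-safe, so the multi-threaded pattern
described above would at least need every emission to hold the checkpoint
lock. A rough sketch, with the proprietary decoding left out as a
placeholder:

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class TcpServerSource implements ParallelSourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try (ServerSocket server = new ServerSocket(9999)) { // made-up port
            while (running) {
                Socket client = server.accept(); // blocking accept
                pool.submit(() -> {
                    try (BufferedReader in = new BufferedReader(
                            new InputStreamReader(client.getInputStream()))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // SourceContext is not thread-safe: emissions from
                            // worker threads must hold the checkpoint lock.
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(line); // decoding would happen before this
                            }
                        }
                    } catch (Exception ignored) {
                        // connection dropped; note nothing here is replayable
                    }
                });
            }
        } finally {
            pool.shutdownNow();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```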


How to tell if job is being restarted in log?

2023-11-03 Thread John Smith
Hi, I'm getting metaspace issues, and I understand that certain libraries,
like JDBC drivers, don't unload properly and need to be put on Flink's
global classpath.

But technically my jobs should not be restarting, so what can I look for in
the logs to see when they restart?


Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+ FlinkDeployments concurrently

2023-11-03 Thread Nicolas Fraison via user
Hi,

We have faced a similar issue with the Flink Kubernetes operator: multiple
operators running as leader at the same time.
On our side the issue was in the java-operator-sdk, which did not properly
shut down an operator that had lost leadership (
https://github.com/operator-framework/java-operator-sdk/issues/2056).
The issue has been solved in java-operator-sdk 4.4.4, and the version has
been bumped in the Flink Kubernetes operator (
https://issues.apache.org/jira/browse/FLINK-33125,
https://github.com/apache/flink-kubernetes-operator/pull/680).
But this patch will probably only be provided in flink-kubernetes-operator
1.7, so I would recommend either not relying on multiple operators or
patching locally.
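
If you keep a single operator instance, the leader-election settings in
question look roughly like this (a sketch only; please verify the exact
keys and defaults against the operator's configuration documentation):

```
kubernetes.operator.leader-election.enabled: true
kubernetes.operator.leader-election.lease-name: flink-operator-lease
# lease tuning knobs (these map to the lease-duration question):
kubernetes.operator.leader-election.lease-duration: 15 s
kubernetes.operator.leader-election.renew-deadline: 10 s
kubernetes.operator.leader-election.retry-period: 2 s
```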

Nicolas

On Fri, Nov 3, 2023 at 9:57 AM Evgeniy Lyutikov  wrote:

> Hello!
> I constantly get a similar error when the operator (running as a single
> instance) receives deployment statuses.
> Details are described in this message:
> https://lists.apache.org/thread/0odcc9pvlpz1x9y2nop9dlmcnp9v1696
> I tried changing versions and allocated resources, as well as the number
> of reconcile threads, but nothing helped.
>
> --
> *From:* Tony Chen 
> *Sent:* November 3, 2023, 9:13:51
> *To:* user@flink.apache.org
> *Cc:* Nathan Moderwell
> *Subject:* Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+
> FlinkDeployments concurrently
>
> One of the operator pods logged the following exception before the
> container restarted:
>
> 2023-11-01 14:24:21,260 o.a.f.s.n.i.n.c.AbstractChannel
> [WARN ] Force-closing a channel whose registration task was not
> accepted by an event loop: [id: 0x1a7718c1]
> java.util.concurrent.RejectedExecutionException: event executor terminated
>
> I did notice that all of our 3 operator pods were reconciling
> FlinkDeployments, and this definitely is an issue. After I churned 2 of the
> pods, there was only 1 pod that was the leader, and this operator pod was
> able to reconcile SPECCHANGES of FlinkDeployments again.
>
> Are there any recommendations on how I can enforce only 1 pod to be the
> leader? For example, would increasing the lease-duration help?
>
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
> 
>
> On Wed, Nov 1, 2023 at 11:16 PM Tony Chen 
> wrote:
>
>> Hi Flink Community,
>>
>> I am currently running flink-kubernetes-operator 1.6-patched (
>> https://github.com/apache/flink-kubernetes-operator/commit/3f0dc2ee5534084bc162e6deaded36e93bb5e384
>> ),
>> and I have 3 flink-kubernetes-operator pods running. Recently, I deployed
>> around 110 new FlinkDeployments, and I had no issues with this initial
>> deployment. However, when I applied changes to all of these 110 new
>> FlinkDeployments concurrently to update their container image, the
>> flink-kubernetes-operator pods seemed to be in conflict with each other
>> constantly.
>>
>> For example, before the SPECCHANGE, FlinkDeployment rh-flinkdeployment-01
>> would be RUNNING (status.jobStatus.state) and STABLE
>> (status.lifecycleState). After the FlinkDeployment spec is updated,
>> rh-flinkdeployment-01 goes through FINISHED (status.jobStatus.state) and
>> UPGRADING (status.lifecycleState), and then RECONCILING
>> (status.jobStatus.state) and DEPLOYED (status.lifecycleState). It reaches
>> RUNNING and STABLE again, but then for some reason it goes back to FINISHED
>> and UPGRADING again, and I do notice that the newly created jobmanager pod
>> gets deleted and then recreated. rh-flinkdeployment-01 basically becomes
>> stuck in this loop where it becomes stable and then gets re-deployed by the
>> operator.
>>
>> This doesn't happen to all 110 FlinkDeployments, but it happens to around
>> 30 of them concurrently.
>>
>> I have pasted some logs from one of the operator pods on one of the
>> FlinkDeployments. I have also highlighted messages that seem suspicious to
>> me. I will try to gather more logs and send them tomorrow.
>>
>> For now, to mitigate this, I had to delete all of these FlinkDeployments
>> and run them with the deprecated GoogleCloudPlatform operator.

Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+ FlinkDeployments concurrently

2023-11-03 Thread Evgeniy Lyutikov
Hello!

I constantly get a similar error when the operator (running as a single
instance) receives deployment statuses.
Details are described in this message:
https://lists.apache.org/thread/0odcc9pvlpz1x9y2nop9dlmcnp9v1696
I tried changing versions and allocated resources, as well as the number of
reconcile threads, but nothing helped.



From: Tony Chen 
Sent: November 3, 2023, 9:13:51
To: user@flink.apache.org
Cc: Nathan Moderwell
Subject: Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+
FlinkDeployments concurrently

One of the operator pods logged the following exception before the container 
restarted:

2023-11-01 14:24:21,260 o.a.f.s.n.i.n.c.AbstractChannel
[WARN ] Force-closing a channel whose registration task was not accepted
by an event loop: [id: 0x1a7718c1]
java.util.concurrent.RejectedExecutionException: event executor terminated

I did notice that all of our 3 operator pods were reconciling FlinkDeployments, 
and this definitely is an issue. After I churned 2 of the pods, there was only 
1 pod that was the leader, and this operator pod was able to reconcile 
SPECCHANGES of FlinkDeployments again.

Are there any recommendations on how I can enforce only 1 pod to be the leader? 
For example, would increasing the lease-duration help?

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/

On Wed, Nov 1, 2023 at 11:16 PM Tony Chen wrote:
Hi Flink Community,

I am currently running flink-kubernetes-operator 1.6-patched 
(https://github.com/apache/flink-kubernetes-operator/commit/3f0dc2ee5534084bc162e6deaded36e93bb5e384),
 and I have 3 flink-kubernetes-operator pods running. Recently, I deployed 
around 110 new FlinkDeployments, and I had no issues with this initial 
deployment. However, when I applied changes to all of these 110 new 
FlinkDeployments concurrently to update their container image, the 
flink-kubernetes-operator pods seemed to be in conflict with each other 
constantly.

For example, before the SPECCHANGE, FlinkDeployment rh-flinkdeployment-01 would 
be RUNNING (status.jobStatus.state) and STABLE (status.lifecycleState). After 
the FlinkDeployment spec is updated, rh-flinkdeployment-01 goes through 
FINISHED (status.jobStatus.state) and UPGRADING (status.lifecycleState), and
then RECONCILING (status.jobStatus.state) and DEPLOYED
(status.lifecycleState). It reaches RUNNING and STABLE again, but then for
some reason it goes back to FINISHED and UPGRADING again, and I do notice that 
the newly created jobmanager pod gets deleted and then recreated. 
rh-flinkdeployment-01 basically becomes stuck in this loop where it becomes 
stable and then gets re-deployed by the operator.

This doesn't happen to all 110 FlinkDeployments, but it happens to around 30 of 
them concurrently.

I have pasted some logs from one of the operator pods on one of the 
FlinkDeployments. I have also highlighted messages that seem suspicious to me. 
I will try to gather more logs and send them tomorrow.

For now, to mitigate this, I had to delete all of these FlinkDeployments and 
run them with the deprecated GoogleCloudPlatform operator. I'm hoping to 
resolve this soon so that I don't have to run anything on the 
GoogleCloudPlatform operator anymore.

Thanks!
Tony


2023-11-02 05:26:02,132 i.j.o.p.e.ReconciliationDispatcher
[ERROR][/] Error during event processing
ExecutionScope{ resource id: ResourceID{name='
...
2023-11-02 05:27:25,945 o.a.f.k.o.o.d.ApplicationObserver [WARN 
][/] Running deployment generation -1 doesn't match 
upgrade target generation 2.
2023-11-02 05:27:25,946 o.a.f.c.Configuration  [WARN 
][/] Config uses deprecated configuration key 
'high-availability' instead of proper key 'high-availability.type'
2023-11-02 05:27:26,034 o.a.f.k.o.l.AuditUtils [INFO 
][/] >>> Status | Info| UPGRADING   | The 
resource is being upgraded
2023-11-02 05:27:26,057 o.a.f.k.o.l.AuditUtils [INFO 

Re: Suspected bug: data is processed multiple times when aggregating with reduce() in a sliding window

2023-11-03 Thread Xuyang
Hi,
   I verified this: the problem seems to be in the reduce function, which reuses the wordCount1 object. I tried creating a new WordCount as the output, and that should fix it.
My guess is that this is related to the heap-based state backend: the heap state of multiple windows may point directly at the same object.


```
.reduce(
        (wordCount1, wordCount2) -> {
            WordCount newWC =
                    new WordCount(
                            wordCount1.word, wordCount1.count + wordCount2.count);
            System.err.println(newWC);
            return newWC;
        })
```

--

Best!
Xuyang





On 2023-11-03 10:53:37, "tao zhang" wrote:
>The state of the reduce() method is not isolated between windows; multiple windows use the same object when aggregating, so a single incoming record is accumulated repeatedly.
>Is this a characteristic of reduce? Or is there a problem with the isolation between windows in reduce? Hoping for a reply.
>
>Test input:
>1001,/home,1000
>1002,/home,2000
>
>Output:
>input> test.Event(user=1001, page=/home, ts=1000)
>input> test.Event(user=1002, page=/home, ts=2000)
>test.WordCount(word=/home, count=2)
>test.WordCount(word=/home, count=3)
>
>The code:
>
>import lombok.AllArgsConstructor;
>import lombok.Data;
>import lombok.NoArgsConstructor;
>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.time.Time;
>import java.io.Serializable;
>import java.time.Duration;
>
>public class test {
>    public static void main(String[] args) {
>        // set up the environment
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>
>        // read data from a socket port
>        SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 5).map(
>                value -> {
>                    String[] strings = value.split(",");
>                    return new Event(strings[0].trim(), strings[1].trim(), Long.valueOf(strings[2].trim()));
>                }
>        ).assignTimestampsAndWatermarks(
>                // add the watermark strategy
>                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
>                        .withTimestampAssigner((event, l) -> event.getTs())
>        );
>        // inspect the input stream
>        ds1.print("input");
>
>        ds1.map(event -> new WordCount(event.getPage(), 1)
>        ).keyBy(WordCount::getWord
>                // group by key
>        ).window(
>                //TumblingEventTimeWindows.of(Time.seconds(10))
>                SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
>                // sliding window with size 10s and slide 5s
>        ).reduce(
>                // incremental aggregation: combine multiple records into one intermediate result
>                (wordCount1, wordCount2) -> {
>                    Integer count = wordCount1.getCount();
>                    wordCount1.setCount(count + 1);
>                    System.out.println(wordCount1);
>                    return wordCount1;
>                }
>        );
>
>        try {
>            env.execute();
>        } catch (Exception e) {
>            throw new RuntimeException(e);
>        }
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class Event {
>        private String user;
>        private String page;
>        private Long ts;
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class WordCount implements Serializable {
>        private String word;
>        private Integer count;
>    }
>}