Re: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-16 Thread Surendra Singh Lilhore
Hi DEROCCO,

Flink uses shaded jars for the Hadoop Azure storage plugin, so to fix the
ClassNotFoundException you need to adjust the configuration to reference the
shaded class name. Please configure the MsiTokenProvider as shown below.

fs.azure.account.oauth.provider.type:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
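Combining this with the spec.flinkConfiguration keys quoted later in this thread, a minimal sketch might look as follows. Note this is only an illustration: `<storage-account>` and `<tenant-id>` are placeholders, not values from this thread, and must be replaced with your own.

```yaml
# Sketch of the operator's spec.flinkConfiguration with the shaded provider
# class; <storage-account> and <tenant-id> are placeholders (assumptions).
flinkConfiguration:
  fs.azure.createRemoteFileSystemDuringInitialization: "true"
  fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net: OAuth
  fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
  fs.azure.account.oauth2.msi.tenant.<storage-account>.dfs.core.windows.net: <tenant-id>
```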


Thanks
Surendra


On Wed, May 17, 2023 at 5:32 AM Ivan Webber via user wrote:

> When you create your cluster you probably need to ensure the following
> settings are set. I briefly looked into MSI but ended up using Azure Key
> Vault with CSI-storage driver for initial prototype (
> https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/aks/csi-secrets-store-driver.md#upgrade-an-existing-aks-cluster-with-azure-key-vault-provider-for-secrets-store-csi-driver-support
> ).
>
>
>
> For me it helped to think about it as Hadoop configuration.
>
>
>
> If you do get MSI working I would be interested in hearing what made it
> work for you, so be sure to update the docs or put it on this thread.
>
>
>
> To create from scratch
>
> Create an AKS cluster with the required settings.
>
> ```bash
>
> # create an AKS cluster with pod-managed identity and Azure CNI
>
> az aks create --resource-group $RESOURCE_GROUP --name $CLUSTER
> --enable-managed-identity --network-plugin azure --enable-pod-identity
>
> ```
>
>
>
> I hope that is somehow helpful.
>
>
>
> Best of luck,
>
>
>
> Ivan
>
>
>
> *From: *DEROCCO, CHRISTOPHER 
> *Sent: *Monday, May 8, 2023 3:40 PM
> *To: *Shammon FY 
> *Cc: *user@flink.apache.org
> *Subject: *[EXTERNAL] RE: MSI Auth to Azure Storage Account with Flink
> Apache Operator not working
>
>
>
>
> Shammon,
>
>
>
> I’m still having trouble setting the package in my cluster environment. I 
> have these lines added to my dockerfile
>
> mkdir ./plugins/azure-fs-hadoop
>
> cp ./opt/flink-azure-fs-hadoop-1.16.0.jar ./plugins/azure-fs-hadoop/
>
>
>
> according to the flink docs here (
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/
> )
>
> This should enable the flink-azure-fs-hadoop jar in the environment which
> has the classes to enable the adls2 MSI authentication.
>
> I also have the following dependency in my pom to add it to the FAT Jar.
>
>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-azure-fs-hadoop</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
>
>
> However, I still get the class-not-found error and the Flink job is not
> able to authenticate to the Azure storage account to store its checkpoints.
> I’m not sure what other configuration pieces I’m missing. Has anyone had
> success writing checkpoints to Azure ADLS Gen2 storage with managed
> service identity (MSI) authentication?
>
>
>
>
>
>
>
> *From:* Shammon FY 
> *Sent:* Friday, May 5, 2023 8:38 PM
> *To:* DEROCCO, CHRISTOPHER 
> *Cc:* user@flink.apache.org
> *Subject:* Re: MSI Auth to Azure Storage Account with Flink Apache
> Operator not working
>
>
>
> Hi DEROCCO,
>
>
>
> I think you can check the startup command of the job on k8s to see if the
> jar file is in the classpath.
>
>
>
> If your job is DataStream, you need to add hadoop azure dependency in your
> project, and if it is an SQL job, you need to include this jar file in your
> Flink release package. Or you can also add this package in your cluster
> environment.
>
>
>
> Best,
>
> Shammon FY
>
>
>
>
>
> On Fri, May 5, 2023 at 10:21 PM DEROCCO, CHRISTOPHER 
> wrote:
>
> How can I add the package to the flink job or check if it is there?
>
>
>
> *From:* Shammon FY 
> *Sent:* Thursday, May 4, 2023 9:59 PM
> *To:* DEROCCO, CHRISTOPHER 
> *Cc:* user@flink.apache.org
> *Subject:* Re: MSI Auth to Azure Storage Account with Flink Apache
> Operator not working
>
>
>
> Hi DEROCCO,
>
>
>
> I think you need to check whether there is a hadoop-azure jar file in the
> classpath of your flink job. From an error message '*Caused by:
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.*', your
> flink job may be missing this package.
>
>
>
> Best,
>
> Shammon FY
>
>
>
>
>
> On Fri, May 5, 2023 at 4:40 AM DEROCCO, CHRISTOPHER 
> wrote:
>
>
>
> I receive the error:  *Caused by: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.*
>
> I’m using flink 1.16 running in Azure Kubernetes using the Flink Apache
> Kubernetes Operator.
>
> I have the following specified in the spec.flinkConfiguration: as per the
> Apache Kubernetes operator documentation.
>
>
>
> fs.azure.createRemoteFileSystemDuringInitialization: "true"
>
> fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth

Table API rowtime definition not taking effect

2023-05-16 Thread 小昌同学
Hi all, here is my code:

Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
        $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
        $("eventTime").rowtime());
tableEnv.createTemporaryView("midTable1", midTable);
Table resulTable = tableEnv.sqlQuery(
        "SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
        "FROM TABLE(CUMULATE(\n" +
        "  TABLE midTable1" + //" TABLE "+ midTable +
        "  , DESCRIPTOR(eventTime)\n" +
        "  , INTERVAL '60' SECOND\n" +
        "  , INTERVAL '1' DAY))\n" +
        "GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");

When converting the stream to a table, I defined the eventTime field as
rowtime, but when executing this sqlQuery I still get the error: Rowtime
timestamp is not defined. Please make sure that a proper TimestampAssigner is
defined and the stream environment uses the EventTime time characteristic.
How can I resolve this?

小昌同学
ccc0606fight...@163.com

Re: Re: Error reported as a bug

2023-05-16 Thread 小昌同学
Great, thanks everyone!

小昌同学
ccc0606fight...@163.com

---- Original message ----
From: Shammon FY
Date: 2023-05-16 08:46
To:
Subject: Re: Error reported as a bug
Hi,

From the error, it looks like a string field in your job is being treated as a timestamp, which caused code generation for the job to fail. Your job logic is fairly complex, so check the time-related fields and verify that the field types are handled correctly, for example the eventTime field.

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

Hi, from the error this looks like a type-incompatibility issue:
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
You can try adjusting the table schema, or use the appropriate functions to convert the field types.

At 2023-05-15 18:29:15, "小昌同学"  wrote:
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Use side-output streams
        OutputTag<String> requestStream = new OutputTag<String>("requestStream") {
        };
        OutputTag<String> answerStream = new OutputTag<String>("answerStream") {
        };

        // 1. Connect to the Kafka data in the test environment
        String servers =
                FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName =
                FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(new
                FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
        // Sample record:
        //{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

        // 2. Process the source data into BaseInfo records
        SingleOutputStreamOperator<BaseInfo> baseInfoStream =
                sourceStream.map(new MapFunction<String, BaseInfo>() {
            @Override
            public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // Extract the server IP
                String serverIp = jsonObject.getString("ip");
                // Extract the data payload
                String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
                // Store the time, used to match request and answer times for the same number
                String time = splits[0].substring(7, 19);
                // Store subData, used to tell whether this is a request or an answer
                String subData = datas.substring(0, 10);
                for (int i = 0; i < splits.length; i++) {
                    if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
                        if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
                return new BaseInfo(dataMap.get("action"), serverIp,
                        DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });

        // 3. Split baseInfoStream using a process function
        SingleOutputStreamOperator<BaseInfo> tagStream =
                baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

        // 4. Obtain the different output streams by tag
        DataStream<BaseInfo> requestDataStream =
                tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream =
                tagStream.getSideOutput(answerStream);

        requestDataStream.print("requestDataStream");

Re: Query on RestartPipelinedRegionFailoverStrategy

2023-05-16 Thread Shammon FY
Hi Prabhu,

Whether the tasks are in the same region depends on the DistributionPattern
between upstream and downstream operators. For example, if the
DistributionPattern from A to B is ALL_TO_ALL, all subtasks of A and B
will be in the same region. Otherwise, if the DistributionPattern is
POINTWISE, the JobManager creates independent connections between subtasks
and puts only the related subtasks into one region.
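As a toy illustration of the rule above (not Flink code — the function, the union-find bookkeeping, and the two-operator A -> B topology are invented for the example), grouping connected subtasks into regions can be sketched like this:

```python
# Sketch: pipelined connections merge subtasks into failover regions.
# ALL_TO_ALL connects every producer subtask to every consumer subtask,
# so everything collapses into one region; POINTWISE connects subtask i
# only to subtask i, leaving one region per parallel pipeline.

def regions(parallelism, pattern):
    parent = list(range(2 * parallelism))  # subtasks of A, then subtasks of B

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    for i in range(parallelism):
        if pattern == "ALL_TO_ALL":
            for j in range(parallelism):
                union(i, parallelism + j)
        else:  # POINTWISE: A's subtask i feeds only B's subtask i
            union(i, parallelism + i)

    return len({find(x) for x in range(2 * parallelism)})

print(regions(2, "ALL_TO_ALL"))  # 1 — a failure restarts everything
print(regions(2, "POINTWISE"))   # 2 — each pipeline can restart alone
```

This also mirrors Prabhu's observation below: as soon as any edge in the chain is ALL_TO_ALL, the whole job collapses into a single region.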

Best,
Shammon FY

On Tue, May 16, 2023 at 6:42 PM Prabhu Joseph 
wrote:

> Yes, I expected the same. But all the tasks go into one region, and
> RestartPipelinedRegionFailoverStrategy restarts all of them. I see this
> strategy does not make any difference from RestartAllFailoverStrategy in
> stream execution mode. It could only help in batch execution mode, where
> the blocking result type is used, which forms multiple different regions.
>
> I will write (or modify) a strategy that returns two different regions and
> validate whether it works without any impact.
>
>
> On Tue, 16 May, 2023, 8:09 am weijie guo, 
> wrote:
>
>> Hi Prabhu,
>>
>> If the edges between a -> b -> c -> d -> e are all point-wise, in theory
>> it should form two regions.
>>
>> Best regards,
>>
>> Weijie
>>
>>
>> Prabhu Joseph wrote on Monday, May 15, 2023 at 09:58:
>>
>>> Hi, I am testing the Flink Fine-Grained Recovery
>>> 
>>> from Task Failures on Flink 1.17 and am facing some issues where I need
>>> some advice. Have a jobgraph below with 5 operators, and all connections
>>> between operators are pipelined and the job's parallelism.default is set to
>>> 2. Have configured RestartPipelinedRegionFailoverStrategy with Exponential
>>> Delay Restart Strategy.
>>>
>>> A -> B -> C -> D -> E
>>>
>>> There are a total of 10 tasks running. The first pipeline  (a1 to e1)
>>> runs on a TaskManager (say TM1), and the second pipeline (a2 to e2) runs on
>>> another TaskManager (say TM2).
>>>
>>> a1 -> b1 -> c1 -> d1 -> e1
>>> a2 -> b2 -> c2 -> d2 -> e2
>>>
>>> When TM1 failed, I expected only 5 tasks (a1 to e1) would fail and they
>>> alone would be restarted, but all 10 tasks are getting restarted. There is
>>> only one pipeline region, which consists of all 10 execution vertices, and
>>> RestartPipelinedRegionFailoverStrategy#getTasksNeedingRestart returns all
>>> 10 tasks. Is it the right behaviour, or could there be any issue? Is it
>>> possible to restart only the pipeline of the failed task (a1 to e1) without
>>> restarting other parallel pipelines.
>>>
>>> Thanks,
>>> Prabhu Joseph


RE: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-16 Thread Ivan Webber via user
When you create your cluster you probably need to ensure the following settings 
are set. I briefly looked into MSI but ended up using Azure Key Vault with 
CSI-storage driver for initial prototype 
(https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/aks/csi-secrets-store-driver.md#upgrade-an-existing-aks-cluster-with-azure-key-vault-provider-for-secrets-store-csi-driver-support).

For me it helped to think about it as Hadoop configuration.

If you do get MSI working I would be interested in hearing what made it work 
for you, so be sure to update the docs or put it on this thread.

To create from scratch
Create an AKS cluster with the required settings.
```bash
# create an AKS cluster with pod-managed identity and Azure CNI
az aks create --resource-group $RESOURCE_GROUP --name $CLUSTER 
--enable-managed-identity --network-plugin azure --enable-pod-identity
```

I hope that is somehow helpful.

Best of luck,

Ivan

From: DEROCCO, CHRISTOPHER
Sent: Monday, May 8, 2023 3:40 PM
To: Shammon FY
Cc: user@flink.apache.org
Subject: [EXTERNAL] RE: MSI Auth to Azure Storage Account with Flink Apache 
Operator not working


Shammon,



I’m still having trouble setting the package in my cluster environment. I have 
these lines added to my dockerfile

mkdir ./plugins/azure-fs-hadoop

cp ./opt/flink-azure-fs-hadoop-1.16.0.jar ./plugins/azure-fs-hadoop/

according to the flink docs here 
(https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/)
This should enable the flink-azure-fs-hadoop jar in the environment which has 
the classes to enable the adls2 MSI authentication.
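Collected in one place, the plugin-installation steps described above might look like the following Dockerfile fragment. This is only a sketch: the base image tag is an assumption based on the Flink 1.16.0 version used in this thread.

```dockerfile
# Sketch: install the Azure filesystem plugin into Flink's plugins
# directory, per the Flink filesystem docs. Base image tag is assumed.
FROM flink:1.16.0

RUN mkdir -p ./plugins/azure-fs-hadoop && \
    cp ./opt/flink-azure-fs-hadoop-1.16.0.jar ./plugins/azure-fs-hadoop/
```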
I also have the following dependency in my pom to add it to the FAT Jar.


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-azure-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>


However, I still get the class-not-found error and the Flink job is not able to
authenticate to the Azure storage account to store its checkpoints. I’m not
sure what other configuration pieces I’m missing. Has anyone had success
writing checkpoints to Azure ADLS Gen2 storage with managed service
identity (MSI) authentication?



From: Shammon FY 
Sent: Friday, May 5, 2023 8:38 PM
To: DEROCCO, CHRISTOPHER 
Cc: user@flink.apache.org
Subject: Re: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

Hi DEROCCO,

I think you can check the startup command of the job on k8s to see if the jar 
file is in the classpath.

If your job is DataStream, you need to add hadoop azure dependency in your 
project, and if it is an SQL job, you need to include this jar file in your 
Flink release package. Or you can also add this package in your cluster 
environment.

Best,
Shammon FY


On Fri, May 5, 2023 at 10:21 PM DEROCCO, CHRISTOPHER wrote:
How can I add the package to the flink job or check if it is there?

From: Shammon FY
Sent: Thursday, May 4, 2023 9:59 PM
To: DEROCCO, CHRISTOPHER
Cc: user@flink.apache.org
Subject: Re: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

Hi DEROCCO,

I think you need to check whether there is a hadoop-azure jar file in the 
classpath of your flink job. From an error message 'Caused by: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.', your flink 
job may be missing this package.

Best,
Shammon FY


On Fri, May 5, 2023 at 4:40 AM DEROCCO, CHRISTOPHER wrote:

I receive the error:  Caused by: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.
I’m using flink 1.16 running in Azure Kubernetes using the Flink Apache 
Kubernetes Operator.
I have the following specified in the spec.flinkConfiguration: as per the 
Apache Kubernetes operator documentation.

fs.azure.createRemoteFileSystemDuringInitialization: "true"

fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth

fs.azure.account.oauth.provider.type..dfs.core.windows.net: org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider

fs.azure.account.oauth2.msi.tenant..dfs.core.windows.net:


Re: [DISCUSS] Status of Statefun Project

2023-05-16 Thread Galen Warren via user
Hi Martijn, since you opened this discussion thread, I'm curious what your
thoughts are in light of the responses? Thanks.

On Wed, Apr 19, 2023 at 1:21 PM Galen Warren 
wrote:

> I use Apache Flink for stream processing, and StateFun as a hand-off point
>> for the rest of the application.
>> It serves well as a bridge between a Flink Streaming job and
>> micro-services.
>
>
> This is essentially how I use it as well, and I would also be sad to see
> it sunsetted. It works well; I don't know that there is a lot of new
> development required, but if there are no new Statefun releases, then
> Statefun can only be used with older Flink versions.
>
> On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
> mvillalo...@kineteque.com> wrote:
>
>> I am currently using Stateful Functions in my application.
>>
>> I use Apache Flink for stream processing, and StateFun as a hand-off
>> point for the rest of the application.
>> It serves well as a bridge between a Flink Streaming job and
>> micro-services.
>>
>> I would be disappointed if StateFun was sunsetted. It's a good idea.
>>
>> If there is anything I can do to help, as a contributor perhaps, please
>> let me know.
>>
>> > On Apr 3, 2023, at 2:02 AM, Martijn Visser 
>> wrote:
>> >
>> > Hi everyone,
>> >
>> > I want to open a discussion on the status of the Statefun Project [1]
>> in Apache Flink. As you might have noticed, there hasn't been much
>> development over the past months in the Statefun repository [2]. There is
>> currently a lack of active contributors and committers who are able to help
>> with the maintenance of the project.
>> >
>> > In order to improve the situation, we need to solve the lack of
>> committers and the lack of contributors.
>> >
>> > On the lack of committers:
>> >
>> > 1. Ideally, there are some of the current Flink committers who have the
>> bandwidth and can help with reviewing PRs and merging them.
>> > 2. If that's not an option, it could be a consideration that current
>> committers only approve and review PRs, that are approved by those who are
>> willing to contribute to Statefun and if the CI passes
>> >
>> > On the lack of contributors:
>> >
>> > 3. Next to having this discussion on the Dev and User mailing list, we
>> can also create a blog with a call for new contributors on the Flink
>> project website, send out some tweets on the Flink / Statefun twitter
>> accounts, post messages on Slack etc. In that message, we would inform how
>> those that are interested in contributing can start and where they could
>> reach out for more information.
>> >
>> > There's also option 4. where a group of interested people would split
>> Statefun from the Flink project and make it a separate top level project
>> under the Apache Flink umbrella (similar as recently has happened with
>> Flink Table Store, which has become Apache Paimon).
>> >
>> > If we see no improvements in the coming period, we should consider
>> sunsetting Statefun and communicate that clearly to the users.
>> >
>> > I'm looking forward to your thoughts.
>> >
>> > Best regards,
>> >
>> > Martijn
>> >
>> > [1] https://nightlies.apache.org/flink/flink-statefun-docs-master/
>> > [2] https://github.com/apache/flink-statefun
>>
>


Dynamic Windowing with PyFlink

2023-05-16 Thread Nawaz Nayeem via user
Hey, I’ve been trying to emulate the behavior of a dynamic window, as Flink
does not support dynamic window sizes. My operator inherits from
KeyedProcessFunction, and I’m only using keyed state to manage the
window size. I clear the keyed state when my bucket (window) is
complete, to reset the bucket size.

My concern is: since Flink does not support dynamic windows, does this
approach go against the Flink architecture? For example, will it break the
checkpointing mechanism in distributed systems? Note that I’m only using
keyed state to maintain and implement the dynamic window.
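A plain-Python sketch of the bucketing idea described above (invented names; not a runnable PyFlink job — the dict here stands in for Flink keyed state inside a KeyedProcessFunction):

```python
# Sketch: per key, keep a count and a target window size in "state".
# When the bucket fills, emit it and clear the state; that is also the
# point where the next window size can be chosen dynamically.

class DynamicBucketer:
    def __init__(self, next_size):
        self.next_size = next_size  # callable choosing the next bucket size
        self.state = {}             # stand-in for Flink keyed state

    def process(self, key, value):
        bucket = self.state.setdefault(
            key, {"size": self.next_size(key), "items": []})
        bucket["items"].append(value)
        if len(bucket["items"]) >= bucket["size"]:
            emitted = bucket["items"]
            del self.state[key]     # clearing state also resets the size
            return emitted          # bucket complete -> emit downstream
        return None                 # bucket still filling

b = DynamicBucketer(next_size=lambda key: 2)
print(b.process("k", 1))  # None — bucket not yet full
print(b.process("k", 2))  # [1, 2] — bucket emitted, state cleared
```

Since all the window bookkeeping lives in keyed state, Flink snapshots it with normal checkpoints like any other state; as far as I can tell, this pattern does not conflict with the checkpointing mechanism.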


Any feedback would be appreciated.
Thank you.

-- 









SELISE Group
Zürich: The Circle 37, 8058 Zürich-Airport, 
Switzerland
Munich: Tal 44, 80331 München, Germany
Dubai: Building 3, 3rd 
Floor, Dubai Design District, Dubai, United Arab Emirates
Dhaka: Midas 
Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
Thimphu: Bhutan 
Innovation Tech Center, Babesa, P.O. Box 633, Thimphu, Bhutan

Visit us: 
www.selisegroup.com 




-- 




*Important Note: This e-mail and any attachment are confidential and 
may contain trade secrets and may well also be legally privileged or 
otherwise protected from disclosure. If you have received it in error, you 
are on notice of its status. Please notify us immediately by reply e-mail 
and then delete this e-mail and any attachment from your system. If you are 
not the intended recipient please understand that you must not copy this 
e-mail or any attachment or disclose the contents to any other person. 
Thank you for your cooperation.*


(no subject)

2023-05-16 Thread 湘晗刚
Hi all
On Linux, when rngd.service leaves entropy_avail low, it causes high
backpressure in our Flink task. How can we solve this other than restarting
rngd.service every time?


Thanks in advance
Tian

Re: Query on RestartPipelinedRegionFailoverStrategy

2023-05-16 Thread Prabhu Joseph
Yes, I expected the same. But all the tasks go into one region, and
RestartPipelinedRegionFailoverStrategy restarts all of them. I see this
strategy does not make any difference from RestartAllFailoverStrategy in
stream execution mode. It could only help in batch execution mode, where
the blocking result type is used, which forms multiple different regions.

I will write (or modify) a strategy that returns two different regions and
validate whether it works without any impact.


On Tue, 16 May 2023, 8:09 am weijie guo wrote:

> Hi Prabhu,
>
> If the edges between a -> b -> c -> d -> e are all point-wise, in theory
> it should form two regions.
>
> Best regards,
>
> Weijie
>
>
> Prabhu Joseph wrote on Monday, May 15, 2023 at 09:58:
>
>> Hi, I am testing the Flink Fine-Grained Recovery
>> 
>> from Task Failures on Flink 1.17 and am facing some issues where I need
>> some advice. Have a jobgraph below with 5 operators, and all connections
>> between operators are pipelined and the job's parallelism.default is set to
>> 2. Have configured RestartPipelinedRegionFailoverStrategy with Exponential
>> Delay Restart Strategy.
>>
>> A -> B -> C -> D -> E
>>
>> There are a total of 10 tasks running. The first pipeline  (a1 to e1)
>> runs on a TaskManager (say TM1), and the second pipeline (a2 to e2) runs on
>> another TaskManager (say TM2).
>>
>> a1 -> b1 -> c1 -> d1 -> e1
>> a2 -> b2 -> c2 -> d2 -> e2
>>
>> When TM1 failed, I expected only 5 tasks (a1 to e1) would fail and they
>> alone would be restarted, but all 10 tasks are getting restarted. There is
>> only one pipeline region, which consists of all 10 execution vertices, and
>> RestartPipelinedRegionFailoverStrategy#getTasksNeedingRestart returns all
>> 10 tasks. Is it the right behaviour, or could there be any issue? Is it
>> possible to restart only the pipeline of the failed task (a1 to e1) without
>> restarting other parallel pipelines.
>>
>> Thanks,
>> Prabhu Joseph


Best practices for handling exceptions in Apache Flink 1.16.0 and global exception handling

2023-05-16 Thread Sharif Khan via user
I'm currently working on a Flink job using Apache Flink 1.16.0, and I would
like to know the best practices for handling exceptions in my
application. I'm interested in recommended approaches for handling
exceptions in Flink, and in how to ensure the robustness and reliability
of my job.

In particular, I'm looking for information on best practices for
handling exceptions in Apache Flink.

In addition, I would also like to know if there is a way to handle
exceptions globally in Flink.
I understand that operator-specific exception handlers are
recommended, but I'm curious if there's a way to handle exceptions
globally to provide a catch-all for unhandled exceptions.
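One common operator-level pattern (a sketch with invented names, not an official Flink API) is to wrap the per-record logic in a try/except and divert failures instead of failing the job. In a real PyFlink job the same idea applies inside a MapFunction or ProcessFunction, with failures sent to a side output or dead-letter sink; here plain lists stand in for those streams:

```python
# Sketch: wrap per-record logic so one bad record is diverted to a
# dead-letter channel instead of failing the whole job.

def safe_map(fn, records, dead_letter):
    results = []
    for record in records:
        try:
            results.append(fn(record))
        except Exception as exc:                     # per-record catch-all
            dead_letter.append((record, repr(exc)))  # keep record + reason
    return results

dead = []
out = safe_map(lambda s: int(s), ["1", "2", "oops", "4"], dead)
print(out)   # [1, 2, 4]
print(dead)  # one entry recording the bad record and the error
```

As far as I know, Flink does not expose a user-level global exception hook; job-level failures are governed by the configured restart strategy, so per-operator handling like the above remains the usual catch-all.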

I'm using the Python version of the Flink API and my current version
of the Flink cluster is 1.16.0.

Any guidance or resources on these topics would be greatly
appreciated. Thank you in advance for your help.



[no subject]

2023-05-16 Thread Sharif Khan via user
sharif.k...@selise.ch
