Re: Reply: Error reported as a bug

2023-05-15 Post by Shammon FY
Hi,

From the error it looks like a String field in your job is being treated as a timestamp, which makes the job's code generation fail. Your job logic is fairly complex, so go through the time-related fields and check whether their types are handled correctly, for example the eventTime field.
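For illustration only, a minimal sketch of what that kind of fix can look like when the stream is turned into a table; midStream and eventTime are placeholders for whatever DataStream and String time field the job actually uses:

    // Sketch: declare the event-time attribute explicitly as TIMESTAMP(3),
    // computed from the String field, instead of letting the planner treat
    // the String itself as a time attribute (which ends in the failed
    // String -> LocalDateTime cast during codegen).
    Table t = tEnv.fromDataStream(
            midStream,
            Schema.newBuilder()
                    .columnByExpression("rowtime", "TO_TIMESTAMP(eventTime)")
                    .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
                    .build());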

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

> Hi, judging from the error this is caused by incompatible types.
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> You can try adjusting the table schema, or use a conversion function to convert the field type.
>
> At 2023-05-15 18:29:15, "小昌同学"  wrote:
> >package job;
> >import bean.BaseInfo;
> >import bean.MidInfo;
> >import bean.OutInfo;
> >import bean.ResultInfo;
> >import com.alibaba.fastjson.JSON;
> >import com.alibaba.fastjson.JSONObject;
> >import config.FlinkConfig;
> >import function.MyProcessFunction;
> >import org.apache.flink.api.common.functions.MapFunction;
> >import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >import org.apache.flink.api.java.tuple.Tuple2;
> >import org.apache.flink.streaming.api.TimeCharacteristic;
> >import org.apache.flink.streaming.api.datastream.DataStream;
> >import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >import org.apache.flink.table.api.DataTypes;
> >import org.apache.flink.table.api.Schema;
> >import org.apache.flink.table.api.Table;
> >import org.apache.flink.table.api.TableSchema;
> >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >import org.apache.flink.table.types.DataType;
> >import org.apache.flink.util.OutputTag;
> >import sink.Sink2Mysql;
> >import utils.DateUtil;
> >import utils.DateUtils;
> >import utils.JdbcUtil;
> >
> >import java.sql.Connection;
> >import java.sql.PreparedStatement;
> >import java.sql.ResultSet;
> >import java.time.*;
> >import java.util.Date;
> >import java.util.HashMap;
> >import java.util.Properties;
> >
> >public class RytLogAnly4 {
> >    public static void main(String[] args) throws Exception {
> >        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >        // Use side-output streams (element type BaseInfo assumed)
> >        OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
> >        };
> >        OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
> >        };
> >
> >        // 1. Connect to the Kafka data in the test environment
> >        String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
> >        String topicName = FlinkConfig.config.getProperty("dev_topicName");
> >        String groupId = FlinkConfig.config.getProperty("dev_groupId");
> >        String devMode = FlinkConfig.config.getProperty("dev_mode");
> >        Properties prop = new Properties();
> >        prop.setProperty("bootstrap.servers", servers);
> >        prop.setProperty("group.id", groupId);
> >        prop.setProperty("auto.offset.reset", devMode);
> >        DataStreamSource<String> sourceStream = env.addSource(
> >                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
> >        // Sample record: {"ip":"10.125.8.141","data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
> >
> >        // 2. Process the raw records into BaseInfo objects
> >        SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
> >            @Override
> >            public BaseInfo map(String value) throws Exception {
> >                JSONObject jsonObject = JSON.parseObject(value);
> >                // the server IP
> >                String serverIp = jsonObject.getString("ip");
> >                // the "data" payload
> >                String datas = jsonObject.getString("data");
> >
> >                String[] splits = datas.split("\n");
> >                HashMap<String, String> dataMap = new HashMap<>();
> >                // time goes into the custom type; used to match request and response times for the same num
> >                String time = splits[0].substring(7, 19);
> >                // subData goes into the custom type; used to tell a request from an answer
> >                String subData = datas.substring(0, 10);
> >                for (int i = 0; i < splits.length; i++) {
> >                    if (splits[i].contains("=")) {
> >                        splits[i] = splits[i].replaceFirst("=", "&");
> >                        String[] temp = splits[i].split("&");
> >                        if (temp.length > 1) {
> >                            dataMap.put(temp[0].toLowerCase(), temp[1]);
> >                        }
> >                    }
> >

Re: Re: Can Flink skip uploading the job jar when submitting a job?

2023-05-15 Post by casel.chen
Application Mode does not have this problem; it is Session Mode job submission that runs into it.
./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar




Can the trailing job jar, TopSpeedWindowing.jar, be specified with an hdfs/oss path? If it is a path on a distributed filesystem, does that mean the job jar no longer needs to be uploaded to the jobManager, and the jobManager downloads it by itself instead?
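For reference, a hedged sketch of what an Application Mode submission that avoids shipping the jar can look like on native Kubernetes, assuming the jar is already baked into the image at the path shown (cluster id, image name and jar path are placeholders; option names as of Flink 1.16):

    ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-flinksql-cluster \
        -Dkubernetes.container.image=registry.example.com/flinksql:latest \
        local:///opt/flink/usrlib/flinksql.jar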





At 2023-05-15 19:27:21, "shimin huang"  wrote:
>You could consider launching the job with the KubernetesClusterDescriptor from the flink-kubernetes dependency; see https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java for an example
> 
>
>> On May 15, 2023 at 19:21, casel.chen  wrote:
>> 
>> We built a real-time computing platform that submits Flink SQL jobs to run on k8s, and every submission has to upload the platform's SQL job jar, flinksql.jar. Because that jar bundles all the connectors and formats the platform uses, this fat jar is several hundred MB. On top of that, the k8s cluster the platform runs on is different from the k8s cluster the jobs actually run on, and for some clusters the cross-cluster network transfer (cross-region or even cross-cloud-vendor) is expensive. We have already put flinksql.jar into the k8s image, so once the image is loaded the jobManager can find the jar locally. So: can Flink skip the "upload job jar" step when submitting a job? Is there a parameter that makes it load the job jar directly from the local filesystem? That would both reduce network transfer overhead and speed up Flink SQL job submission.
>


Re: Reply: Error reported as a bug

2023-05-15 Post by lxk
Hi, judging from the error this is caused by incompatible types.
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: 
Cannot cast "java.lang.String" to "java.time.LocalDateTime"  
You can try adjusting the table schema, or use a conversion function to convert the field type.
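As an illustration only (the column names eventTime, action and handleSerialNo are placeholders, and inputTable stands for whatever Table the job builds), an explicit conversion in the Table API might look like this:

    // Sketch (Table API): convert the String column explicitly instead of
    // relying on an implicit String -> LocalDateTime cast.
    // import static org.apache.flink.table.api.Expressions.$;
    Table converted = inputTable.select(
            $("eventTime").toTimestamp().as("rowtime"),   // parses "yyyy-MM-dd HH:mm:ss"
            $("action"),
            $("handleSerialNo"));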

At 2023-05-15 18:29:15, "小昌同学"  wrote:
>package job;
>import bean.BaseInfo;
>import bean.MidInfo;
>import bean.OutInfo;
>import bean.ResultInfo;
>import com.alibaba.fastjson.JSON;
>import com.alibaba.fastjson.JSONObject;
>import config.FlinkConfig;
>import function.MyProcessFunction;
>import org.apache.flink.api.common.functions.MapFunction;
>import org.apache.flink.api.common.serialization.SimpleStringSchema;
>import org.apache.flink.api.java.tuple.Tuple2;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import org.apache.flink.streaming.api.datastream.DataStream;
>import org.apache.flink.streaming.api.datastream.DataStreamSource;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>import org.apache.flink.table.api.DataTypes;
>import org.apache.flink.table.api.Schema;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.TableSchema;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.types.DataType;
>import org.apache.flink.util.OutputTag;
>import sink.Sink2Mysql;
>import utils.DateUtil;
>import utils.DateUtils;
>import utils.JdbcUtil;
>
>import java.sql.Connection;
>import java.sql.PreparedStatement;
>import java.sql.ResultSet;
>import java.time.*;
>import java.util.Date;
>import java.util.HashMap;
>import java.util.Properties;
>
>public class RytLogAnly4 {
>    public static void main(String[] args) throws Exception {
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>        // Use side-output streams (element type BaseInfo assumed)
>        OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
>        };
>        OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
>        };
>
>        // 1. Connect to the Kafka data in the test environment
>        String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
>        String topicName = FlinkConfig.config.getProperty("dev_topicName");
>        String groupId = FlinkConfig.config.getProperty("dev_groupId");
>        String devMode = FlinkConfig.config.getProperty("dev_mode");
>        Properties prop = new Properties();
>        prop.setProperty("bootstrap.servers", servers);
>        prop.setProperty("group.id", groupId);
>        prop.setProperty("auto.offset.reset", devMode);
>        DataStreamSource<String> sourceStream = env.addSource(
>                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
>        // Sample record: {"ip":"10.125.8.141","data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
>
>        // 2. Process the raw records into BaseInfo objects
>        SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
>            @Override
>            public BaseInfo map(String value) throws Exception {
>                JSONObject jsonObject = JSON.parseObject(value);
>                // the server IP
>                String serverIp = jsonObject.getString("ip");
>                // the "data" payload
>                String datas = jsonObject.getString("data");
>
>                String[] splits = datas.split("\n");
>                HashMap<String, String> dataMap = new HashMap<>();
>                // time goes into the custom type; used to match request and response times for the same num
>                String time = splits[0].substring(7, 19);
>                // subData goes into the custom type; used to tell a request from an answer
>                String subData = datas.substring(0, 10);
>                for (int i = 0; i < splits.length; i++) {
>                    if (splits[i].contains("=")) {
>                        splits[i] = splits[i].replaceFirst("=", "&");
>                        String[] temp = splits[i].split("&");
>                        if (temp.length > 1) {
>                            dataMap.put(temp[0].toLowerCase(), temp[1]);
>                        }
>                    }
>                }
>                return new BaseInfo(dataMap.get("action"), serverIp,
>                        DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
>            }
>        });
>
>        // 3. Split baseInfoStream with a process function
>        SingleOutputStreamOperator<BaseInfo> tagStream =
>                baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
>
>        // 4. Set up the output streams according to their tags

Re: Can Flink skip uploading the job jar when submitting a job?

2023-05-15 Post by shimin huang
You could consider launching the job with the KubernetesClusterDescriptor from the flink-kubernetes dependency; see https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java for an example
 

> On May 15, 2023 at 19:21, casel.chen  wrote:
> 
> We built a real-time computing platform that submits Flink SQL jobs to run on k8s, and every submission has to upload the platform's SQL job jar, flinksql.jar. Because that jar bundles all the connectors and formats the platform uses, this fat jar is several hundred MB. On top of that, the k8s cluster the platform runs on is different from the k8s cluster the jobs actually run on, and for some clusters the cross-cluster network transfer (cross-region or even cross-cloud-vendor) is expensive. We have already put flinksql.jar into the k8s image, so once the image is loaded the jobManager can find the jar locally. So: can Flink skip the "upload job jar" step when submitting a job? Is there a parameter that makes it load the job jar directly from the local filesystem? That would both reduce network transfer overhead and speed up Flink SQL job submission.



Can Flink skip uploading the job jar when submitting a job?

2023-05-15 Post by casel.chen
We built a real-time computing platform that submits Flink SQL jobs to run on k8s, and every submission has to upload the platform's SQL job jar, flinksql.jar. Because that jar bundles all the connectors and formats the platform uses, this fat jar is several hundred MB. On top of that, the k8s cluster the platform runs on is different from the k8s cluster the jobs actually run on, and for some clusters the cross-cluster network transfer (cross-region or even cross-cloud-vendor) is expensive. We have already put flinksql.jar into the k8s image, so once the image is loaded the jobManager can find the jar locally. So: can Flink skip the "upload job jar" step when submitting a job? Is there a parameter that makes it load the job jar directly from the local filesystem? That would both reduce network transfer overhead and speed up Flink SQL job submission.

StreamTable Environment initialized failed -- "Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath"

2023-05-15 Post by krislee

Hi ALL,

OS:   CentOS 7.9

Flink version:  1.16.0


It looks like I'm hitting a notorious exception that has been
reported since earlier Flink versions. The issue is triggered
when the Java code below executes:

   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

A more detailed trace is below:

Exception in thread "main" org.apache.flink.table.api.TableException: Could not 
instantiate the executor. Make sure a planner module is on the classpath
at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109)
at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101)
at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
at 
com.sugon.cloud.paas.flink.cdc.FlinkCDC_mysql2doris_example.main(FlinkCDC_mysql2doris_example.java:63)

Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in 
the classpath.
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:533)
at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:106)
... 4 more



What I've done:
1) Added the missing dependencies in "pom.xml", for example:



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-uber</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

2) Tried two ways to run the application; both fail with the same error (see above):
  
   mvn exec:java -Dexec.mainClass="xxx"


   java -jar target/xxx.jar


   I'm confused by the error because all the necessary jar files do exist in
Maven's local repository or in FLINK_HOME's lib dir.
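One thing that may be worth double-checking, offered as a guess rather than a confirmed diagnosis: when the job is started with "java -jar", dependencies marked <scope>provided</scope> (here flink-table-planner and flink-table-api-java-uber) are not packaged into the shaded jar, and the shade plugin does not merge META-INF/services files from multiple jars unless a ServicesResourceTransformer is configured -- and those service files are exactly what the ExecutorFactory lookup in FactoryUtil.discoverFactory relies on. A sketch of the transformer entry, placed next to the ManifestResourceTransformer in the shade plugin configuration:

    <!-- Sketch only: merge META-INF/services entries so that
         org.apache.flink.table.factories.Factory implementations
         (including the planner's ExecutorFactory) stay discoverable
         inside the fat jar. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>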


The complete "pom.xml" is attached.


Thanks,
Leo




<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mycompany.cloud.bigdata.flink</groupId>
  <artifactId>flink-cdc-doris-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.16.0</flink.version>
    <flink.connector.version>2.3.0</flink.connector.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>${flink.connector.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-uber</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.mycompany.cloud.bigdata.flink.cdc.FlinkCDC_mysql2doris_example</mainClass>
                </transformer>
              </transformers>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Reply: Error reported as a bug

2023-05-15 Post by 小昌同学
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Use side-output streams (element type BaseInfo assumed)
        OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
        };
        OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
        };

        // 1. Connect to the Kafka data in the test environment
        String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
        // Sample record: {"ip":"10.125.8.141","data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

        // 2. Process the raw records into BaseInfo objects
        SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
            @Override
            public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // the server IP
                String serverIp = jsonObject.getString("ip");
                // the "data" payload
                String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
                // time goes into the custom type; used to match request and response times for the same num
                String time = splits[0].substring(7, 19);
                // subData goes into the custom type; used to tell a request from an answer
                String subData = datas.substring(0, 10);
                for (int i = 0; i < splits.length; i++) {
                    if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
                        if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
                return new BaseInfo(dataMap.get("action"), serverIp,
                        DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });

        // 3. Split baseInfoStream with a process function
        SingleOutputStreamOperator<BaseInfo> tagStream =
                baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

        // 4. Set up the output streams according to their tags
        DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);

        requestDataStream.print("requestDataStream");
        answerDataStream.print("answerDataStream");

        // 5. The streams above only carry the action code, not the corresponding Chinese description of the action, so they need to be joined with the table in MySQL
        // 5.1 Handle the request stream first

Re: Error reported as a bug

2023-05-15 Post by lxk
Hi, could you paste the relevant code so that everyone can take a look? If you are using SQL, please also paste the execution plan.
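For reference, a hedged sketch of how the execution plan can be obtained from the Table API (assuming tEnv is the job's StreamTableEnvironment and the SQL text is filled in):

    // Prints the abstract syntax tree and the optimized physical plan.
    System.out.println(tEnv.explainSql("SELECT ... FROM ..."));
    // For a Table object built with the Table API:
    // System.out.println(someTable.explain());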

At 2023-05-15 17:11:42, "小昌同学"  wrote:
>Hi everyone, when I program with the Table API I get the error "Caused by:
>org.apache.flink.api.common.InvalidProgramException: Table program cannot be
>compiled. This is a bug. Please file an issue."
>The Flink version is 1.14. Is there someone from the community who could follow up on this, or how should I proceed?
>
>
>| |
>小昌同学
>|
>|
>ccc0606fight...@163.com
>|


Error reported as a bug

2023-05-15 Post by 小昌同学
Hi everyone, when I program with the Table API I get the error "Caused by:
org.apache.flink.api.common.InvalidProgramException: Table program cannot be
compiled. This is a bug. Please file an issue."
The Flink version is 1.14. Is there someone from the community who could follow up on this, or how should I proceed?


| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re: Re: Re: Flink broadcast-stream state cleanup policy not taking effect

2023-05-15 Post by lxk

OK, thanks.

At 2023-05-15 15:49:12, "Hangxiang Yu"  wrote:
>Hi, you can take a look at this ticket, which discussed adding TTL for Broadcast State; the discussion did not go much further at the time:
>https://issues.apache.org/jira/browse/FLINK-13721
>If convenient, could you also share your use case and what you observed under the ticket? You can also vote for the issue there.
>I will help take a look at it too.
>
>On Mon, May 15, 2023 at 1:41 PM lxk  wrote:
>
>> Seen that way, broadcast streams don't seem suitable for production use, since the state will grow without bound. Is there an official plan to add a TTL feature here?
>> Or, when using a broadcast stream, is there any way to clean up the state manually?
>>
>> At 2023-05-15 11:28:54, "Hangxiang Yu"  wrote:
>> >Hi, Operator State such as Broadcast State currently does not support a TTL setting; see the State TTL description at
>> ><https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl>
>> >
>> >On Mon, May 15, 2023 at 11:05 AM lxk  wrote:
>> >
>> >> Flink version: 1.14
>> >> A stream with a fairly small amount of data is broadcast, and the main stream is matched against this broadcast stream.
>> >> In the main program I configured a state expiration policy:
>> >> SingleOutputStreamOperator<AdvertiseClick> baiduStream =
>> >>         env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
>> >>                 AdvertiseClick.class)).name("BaiDuAdClick");
>> >> MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
>> >>         MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
>> >> StateTtlConfig ttlConfig = StateTtlConfig
>> >>         .newBuilder(Time.days(7))
>> >>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> >>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> >>         .cleanupFullSnapshot()
>> >>         .cleanupIncrementally(200, true)
>> >>         .build();
>> >> baiduInfoMap.enableTimeToLive(ttlConfig);
>> >> In the BroadcastProcessFunction I also configured the state cleanup policy:
>> >> public void open(Configuration parameters) throws Exception {
>> >>     jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
>> >>     baiduInfoDesc = new MapStateDescriptor<String, AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
>> >>     StateTtlConfig ttlConfig = StateTtlConfig
>> >>             .newBuilder(Time.days(7))
>> >>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> >>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> >>             .cleanupFullSnapshot()
>> >>             .cleanupIncrementally(200, true)
>> >>             .build();
>> >>     baiduInfoDesc.enableTimeToLive(ttlConfig);
>> >>
>> >> }
>> >> However, judging from the checkpoint sizes, the state cleanup policy does not seem to take effect: the job has been running for 14 days and the overall checkpoint size keeps growing.
>> >>
>> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
>> >>
>> >> The expiration policies I use on other state all take effect, so I don't know why it doesn't seem to work for the broadcast stream, or whether I'm using it the wrong way. Hope someone can help take a look.
>> >
>> >
>> >
>> >--
>> >Best,
>> >Hangxiang.
>>
>
>
>-- 
>Best,
>Hangxiang.


Re: Re: Flink broadcast-stream state cleanup policy not taking effect

2023-05-15 Post by Hangxiang Yu
Hi, you can take a look at this ticket, which discussed adding TTL for Broadcast State; the discussion did not go much further at the time:
https://issues.apache.org/jira/browse/FLINK-13721
If convenient, could you also share your use case and what you observed under the ticket? You can also vote for the issue there.
I will help take a look at it too.
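On the manual-cleanup question quoted below: since TTL is not available for broadcast state, one common workaround is to prune it by hand inside processBroadcastElement, which has read-write access to the state. A rough sketch against the quoted descriptor (imports omitted), assuming AdvertiseClick carries a usable event time via a getClickTime() getter and a key via getAdId() (both placeholders), with OUT standing in for the function's real output type:

    // Inside the BroadcastProcessFunction from the quoted code:
    @Override
    public void processBroadcastElement(AdvertiseClick value, Context ctx,
                                        Collector<OUT> out) throws Exception {
        BroadcastState<String, AdvertiseClick> state = ctx.getBroadcastState(baiduInfoDesc);
        state.put(value.getAdId(), value);

        // Manually drop entries older than 7 days, mimicking the intended TTL.
        long cutoff = System.currentTimeMillis() - java.time.Duration.ofDays(7).toMillis();
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, AdvertiseClick> e : state.entries()) {
            if (e.getValue().getClickTime() < cutoff) {
                expired.add(e.getKey());
            }
        }
        for (String key : expired) {
            state.remove(key);
        }
    }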

On Mon, May 15, 2023 at 1:41 PM lxk  wrote:

> Seen that way, broadcast streams don't seem suitable for production use, since the state will grow without bound. Is there an official plan to add a TTL feature here?
> Or, when using a broadcast stream, is there any way to clean up the state manually?
>
> At 2023-05-15 11:28:54, "Hangxiang Yu"  wrote:
> >Hi, Operator State such as Broadcast State currently does not support a TTL setting; see the State TTL description at
> ><https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl>
> >
> >On Mon, May 15, 2023 at 11:05 AM lxk  wrote:
> >
> >> Flink version: 1.14
> >> A stream with a fairly small amount of data is broadcast, and the main stream is matched against this broadcast stream.
> >> In the main program I configured a state expiration policy:
> >> SingleOutputStreamOperator<AdvertiseClick> baiduStream =
> >>         env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
> >>                 AdvertiseClick.class)).name("BaiDuAdClick");
> >> MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
> >>         MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
> >> StateTtlConfig ttlConfig = StateTtlConfig
> >>         .newBuilder(Time.days(7))
> >>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >>         .cleanupFullSnapshot()
> >>         .cleanupIncrementally(200, true)
> >>         .build();
> >> baiduInfoMap.enableTimeToLive(ttlConfig);
> >> In the BroadcastProcessFunction I also configured the state cleanup policy:
> >> public void open(Configuration parameters) throws Exception {
> >>     jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
> >>     baiduInfoDesc = new MapStateDescriptor<String, AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
> >>     StateTtlConfig ttlConfig = StateTtlConfig
> >>             .newBuilder(Time.days(7))
> >>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >>             .cleanupFullSnapshot()
> >>             .cleanupIncrementally(200, true)
> >>             .build();
> >>     baiduInfoDesc.enableTimeToLive(ttlConfig);
> >>
> >> }
> >> However, judging from the checkpoint sizes, the state cleanup policy does not seem to take effect: the job has been running for 14 days and the overall checkpoint size keeps growing.
> >>
> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
> >>
> >> The expiration policies I use on other state all take effect, so I don't know why it doesn't seem to work for the broadcast stream, or whether I'm using it the wrong way. Hope someone can help take a look.
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>


-- 
Best,
Hangxiang.