Exception when using flink mysql-cdc through the SQL Client

2020-09-12 Post by 陈帅
The Flink version is 1.11.1. I downloaded
flink-connector-debezium-1.1.0.jar,
flink-connector-mysql-cdc-1.1.0.jar,
flink-sql-connector-kafka_2.12-1.11.1.jar,
flink-sql-connector-mysql-cdc-1.1.0.jar, and
flink-format-changelog-json-1.1.0.jar
and placed them under $FLINK_HOME/lib, then started the Flink SQL Client in
embedded mode. I created a table in MySQL, then created the corresponding table in the SQL Client with the following statement:

Flink SQL> CREATE TABLE order_info (
  id BIGINT,
  user_id BIGINT,
  create_time TIMESTAMP(0),
  operate_time TIMESTAMP(0),
  province_id INT,
  order_status STRING,
  total_amount DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'x',
  'database-name' = 'test',
  'table-name' = 'order_info'
);

Finally, I ran a query in the SQL Client:
Flink SQL>  select * from order_info;

[ERROR] Could not execute SQL statement. Reason:

java.lang.ClassNotFoundException:
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction


The query fails with the exception above. I checked and this class belongs to flink-connector-debezium-1.1.0.jar, which I have already placed under $FLINK_HOME/lib; unzipping that jar shows the full path of the class reported as missing. Why does this happen, and how do I fix it?
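One way to narrow down a ClassNotFoundException like this is to probe, from code running in the JVM in question, which classes its classloader can actually resolve. A minimal generic sketch (not Flink-specific; the Flink class name below is simply taken from the error message, and whether it resolves depends entirely on which jars are visible at runtime):

```java
// Minimal probe for class visibility on the current classpath.
public class ClassProbe {

    // Returns true if the given class can be loaded by this JVM's classloader.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A class that is always present in the JDK:
        System.out.println(isClassPresent("java.lang.String"));
        // The class the stack trace reports as missing:
        System.out.println(isClassPresent(
                "com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction"));
    }
}
```

Running such a probe from a simple user function submitted through the SQL Client would show whether the TaskManager's user-code classloader (rather than the client's) can see the jar; note that jars under $FLINK_HOME/lib are picked up when the cluster processes start, so the cluster typically needs a restart after jars are added there.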

The SQL Client log is as follows:

ClassLoader info: URL ClassLoader:
file: '/var/folders/7n/pfzv54s94w9d9jl578txzx20gn/T/blobStore-ae8ce496-872b-4934-a212-405e34ecfd6f/job_a2634d29a69aa47bfdb0e65b522ff2e8/blob_p-949ecf57aaab045c3136da62fe2e2ab39f502c30-723503d095b849a5d7f3375ef6ddc85f' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_231]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_231]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_231]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_231]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_231]
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_231]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_231]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_231]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_231]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at ...

Re: Question about time windows and the reduce operator

2020-09-12 Post by allanqinjy
Hi,
Without a window, reduce processes records one at a time: keyBy groups the stream by key, and with no window it runs as an unbounded stream. Once you add a window, you can think of the windowed data as a batch: after keyBy, reduce produces a single result per key per window. If your reduce also applies a rule, for example comparing records by time and keeping only the most recent duplicate, then the final output is just that latest record. You can build a small demo yourself; below is my local test, which you can try too. I hope it helps, and please forgive anything I got wrong!
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReduceTest {

    private static final Logger logger = LoggerFactory.getLogger(ReduceTest.class);

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> source =
                env.readTextFile("/Users/allanqin/myprojects/spend-report/demo/src/main/resources/reduce.txt");

        source
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String s) throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        ReduceEntity reduceEntity = mapper.readValue(s, ReduceEntity.class);
                        return reduceEntity.getName();
                    }
                }, TypeInformation.of(new TypeHint<String>() {
                }))
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String v1, String v2) throws Exception {
                        // To keep the record with the larger age instead of the latest one:
                        // ObjectMapper mapper = new ObjectMapper();
                        // ReduceEntity reduceEntity = mapper.readValue(v1, ReduceEntity.class);
                        // ReduceEntity reduceEntity2 = mapper.readValue(v2, ReduceEntity.class);
                        // if (reduceEntity.getAge() > reduceEntity2.getAge()) {
                        //     return v1;
                        // }
                        return v2;
                    }
                })
                .print();

        env.execute("test");
    }
}
Contents of the txt file:
{ "name" : "allanqinjy", "age" : 4 }
{ "name" : "allanqinjy", "age" : 45 }
{ "name" : "allanqinjy", "age" : 6 }
{ "name" : "allanqinjy", "age" : 9 }
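The window-reduce semantics described above can also be simulated in plain Java, with no Flink dependency: the reduce function is applied pairwise as each record arrives, but only the final accumulated value is emitted when the window fires. A minimal sketch (class and method names here are made up for illustration) using the same four records:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class WindowReduceSketch {

    // Applies a reduce function pairwise over all records of one "window"
    // and returns the single value the window would emit when it fires.
    static String reduceWindow(List<String> window, BinaryOperator<String> reduce) {
        String acc = window.get(0);
        for (int i = 1; i < window.size(); i++) {
            acc = reduce.apply(acc, window.get(i)); // incremental, per arriving record
        }
        return acc; // emitted once, at the end of the window
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList(
                "{ \"name\" : \"allanqinjy\", \"age\" : 4 }",
                "{ \"name\" : \"allanqinjy\", \"age\" : 45 }",
                "{ \"name\" : \"allanqinjy\", \"age\" : 6 }",
                "{ \"name\" : \"allanqinjy\", \"age\" : 9 }");

        // Same rule as the demo above, (v1, v2) -> v2: always keep the later
        // record, so the window emits the last record it saw (age 9).
        System.out.println(reduceWindow(window, (v1, v2) -> v2));
    }
}
```

The same shape would apply with the age-comparison rule commented out in the demo: only the winning record of the whole window reaches downstream, not one record per pair.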










On 2020-09-12 17:57:01, "ゞ野蠻遊戲χ" wrote:

Hi all,

After a window operator, does reduce compute over all the elements of the current window and emit only a single record downstream, or does it run on each successive pair of elements as they enter the window, emitting one record downstream after every computation?

Thanks
嘉治


A question about Flink's integration with mainstream data lakes

2020-09-12 Post by dixingxing85
Hi all:
I'd like to ask a question. Some companies have already started adopting data lake technology; what is the current state of Flink's integration with iceberg, hudi, and delta lake, respectively? Is the community actively driving integration with any particular data lake, and when can we expect relatively complete sources and sinks? Thanks.

Sent from my iPhone


Sent from my iPhone