xingyuan cheng created FLINK-17304:
--
Summary: Kafka two streams cannot use Flink SQL to query inner join
Key: FLINK-17304
URL: https://issues.apache.org/jira/browse/FLINK-17304
Project: Flink
Issue Type: Bug
Components: API / DataStream, Table SQL / API
Affects Versions: 1.9.0
Environment: flink.version=1.9.0
scala.binary.version=2.11
Reporter: xingyuan cheng
In my work, I found that when subscribing to DataStreams from two different
Kafka topics, the operator operations on each stream execute correctly on
their own, but the Flink SQL inner-join query between the two streams never
produces the results I expected. What do I need to do to make it work?
TestStreamSQL.java
```
public class TestStreamSQL {
private static Logger log = LoggerFactory.getLogger(BinlogStreamSQL.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStateBackend(new
FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
StreamQueryConfig queryConfig = new StreamQueryConfig();
queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
properties.setProperty("group.id", "flink");
String topic_1 = "bps-16-r3p3";
String topic_2 = "bps-16-r3p4";
DataStreamSource topic1 = env.addSource(new
FlinkKafkaConsumer010(topic_1, new SimpleStringSchema(), properties));
SingleOutputStreamOperator> kafkaSource1 =
topic1.filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
try {
BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
if ("app_case".equals(binLogBean.getTableName())){
return true;
}else {
return false;
}
}catch (Exception e){
log.error("JSON转换失败,str={}", value, e);
return false;
}
}
}).map(new MapFunction>() {
@Override
public Tuple3 map(String s) throws Exception {
BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
String approve_result = BinLogUtil.getValueByField(binLogBean,
"approve_result");
return new Tuple3(case_id, close_time, approve_result);
}
});
tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time,
approve_result");
DataStreamSource topic2 = env.addSource(new
FlinkKafkaConsumer010(topic_2, new SimpleStringSchema(), properties));
SingleOutputStreamOperator> kafkaSource2 =
topic2.filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
try {
BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
if ("cm_customer".equals(binLogBean.getTableName())){
return true;
}else {
return false;
}
}catch (Exception e){
log.error("JSON转换失败,str={}", value, e);
return false;
}
}
}).map(new MapFunction>() {
@Override
public Tuple2 map(String s) throws Exception {
BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
return new Tuple2(case_id, idtfno);
}
});
tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno");
Table result = tEnv.sqlQuery("select a.*,b.idtfno " +
"from app_case a left join cm_customer b on a.case_id = b.case_id " +
"where a.close_time not in('')");
tEnv.toRetractStream(result, Row.class, queryConfig).filter(new
FilterFunction>() {
@Override
public boolean filter(Tuple2 booleanRowTuple2) throws Exception {
return booleanRowTuple2.f0;
}
}).print();
env.execute();
}
}
```
BinLogBean.java
```
public class BinLogBean implements Serializable{
private String instance;
private int version;
private Long serverId;
private String executeTime;
private String logfileName;
private Long logfileOffset;
/**
* database name
*/
private String schemaName;
private String tableName;
private String eventType;
private List columnFieldsList;
public String getInstance() {
return instance;
}
public void setInstance(String instance) {
this.instance = instance;
}
public List