[ https://issues.apache.org/jira/browse/FLINK-17304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-17304. --------------------------- Resolution: Invalid Closed since there was no response. > Kafka two streams cannot use Flink SQL to query inner join > ---------------------------------------------------------- > > Key: FLINK-17304 > URL: https://issues.apache.org/jira/browse/FLINK-17304 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Table SQL / API > Affects Versions: 1.9.0 > Environment: flink.version=1.9.0 > scala.binary.version=2.11 > Reporter: xingyuan cheng > Priority: Major > > In my work, I found that when subscribing to data streams from two different > Kafka topics, the operators on each of the two streams execute correctly on > their own, but in the end the inner join queried through Flink SQL does not > produce the expected result. What do I need to do to make it work? > TestStreamSQL.java > ``` > public class TestStreamSQL { > private static Logger log = LoggerFactory.getLogger(BinlogStreamSQL.class); > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(1000); > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); > env.getCheckpointConfig().setCheckpointTimeout(60000); > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > // env.setStateBackend(new > FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint")); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > StreamQueryConfig queryConfig = new StreamQueryConfig(); > queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30)); > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", > "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092"); > 
properties.setProperty("group.id", "flink"); > String topic_1 = "bps-16-r3p3"; > String topic_2 = "bps-16-r3p4"; > DataStreamSource<String> topic1 = env.addSource(new > FlinkKafkaConsumer010<String>(topic_1, new SimpleStringSchema(), properties)); > SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = > topic1.filter(new FilterFunction<String>() { > @Override > public boolean filter(String value) throws Exception { > try { > BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class); > if ("app_case".equals(binLogBean.getTableName())){ > return true; > }else { > return false; > } > }catch (Exception e){ > log.error("JSON转换失败,str={}", value, e); > return false; > } > } > }).map(new MapFunction<String, Tuple3<String, String, String>>() { > @Override > public Tuple3<String, String, String> map(String s) throws Exception { > BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class); > String case_id = BinLogUtil.getValueByField(binLogBean, "case_id"); > String close_time = BinLogUtil.getValueByField(binLogBean, "close_time"); > String approve_result = BinLogUtil.getValueByField(binLogBean, > "approve_result"); > return new Tuple3<String, String, String>(case_id, close_time, > approve_result); > } > }); > tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, > approve_result"); > DataStreamSource<String> topic2 = env.addSource(new > FlinkKafkaConsumer010<String>(topic_2, new SimpleStringSchema(), properties)); > SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = > topic2.filter(new FilterFunction<String>() { > @Override > public boolean filter(String value) throws Exception { > try { > BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class); > if ("cm_customer".equals(binLogBean.getTableName())){ > return true; > }else { > return false; > } > }catch (Exception e){ > log.error("JSON转换失败,str={}", value, e); > return false; > } > } > }).map(new MapFunction<String, Tuple2<String, 
String>>() { > @Override > public Tuple2<String, String> map(String s) throws Exception { > BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class); > String case_id = BinLogUtil.getValueByField(binLogBean, "case_id"); > String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno"); > return new Tuple2<String, String>(case_id, idtfno); > } > }); > tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno"); > Table result = tEnv.sqlQuery("select a.*,b.idtfno " + > "from app_case a left join cm_customer b on a.case_id = b.case_id " + > "where a.close_time not in('')"); > tEnv.toRetractStream(result, Row.class, queryConfig).filter(new > FilterFunction<Tuple2<Boolean, Row>>() { > @Override > public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws > Exception { > return booleanRowTuple2.f0; > } > }).print(); > env.execute(); > } > } > ``` > > BinLogBean.java > ``` > public class BinLogBean implements Serializable{ > private String instance; > private int version; > private Long serverId; > private String executeTime; > private String logfileName; > private Long logfileOffset; > /** > * database name > */ > private String schemaName; > private String tableName; > private String eventType; > private List<ColumnField> columnFieldsList; > public String getInstance() { > return instance; > } > public void setInstance(String instance) { > this.instance = instance; > } > public List<ColumnField> getColumnFieldsList() { > return columnFieldsList; > } > public void setColumnFieldsList(List<ColumnField> columnFieldsList) { > this.columnFieldsList = columnFieldsList; > } > public int getVersion() { > return version; > } > public void setVersion(int version) { > this.version = version; > } > public Long getServerId() { > return serverId; > } > public void setServerId(Long serverId) { > this.serverId = serverId; > } > public String getExecuteTime() { > return executeTime; > } > public void setExecuteTime(String executeTime) { > 
this.executeTime = executeTime; > } > public String getLogfileName() { > return logfileName; > } > public void setLogfileName(String logfileName) { > this.logfileName = logfileName; > } > public Long getLogfileOffset() { > return logfileOffset; > } > public void setLogfileOffset(Long logfileOffset) { > this.logfileOffset = logfileOffset; > } > public String getSchemaName() { > return schemaName; > } > public void setSchemaName(String schemaName) { > this.schemaName = schemaName; > } > public String getTableName() { > return tableName; > } > public void setTableName(String tableName) { > this.tableName = tableName; > } > public String getEventType() { > return eventType; > } > public void setEventType(String eventType) { > this.eventType = eventType; > } > } > ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)