[jira] [Created] (FLINK-31820) Support data source sub-database and sub-table

2023-04-17 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-31820:
--

 Summary: Support data source sub-database and sub-table
 Key: FLINK-31820
 URL: https://issues.apache.org/jira/browse/FLINK-31820
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: xingyuan cheng


At present, apache/flink-connector-jdbc does not support sharded data sources (sub-database and 
sub-table). This improvement adds sub-database and sub-table support for three commonly used 
databases: MySQL, Postgres and Oracle.

 

Taking Oracle as an example, users only need to use the following configuration format:

 
{code:java}
create table oracle_source (
    EMPLOYEE_ID BIGINT,
    START_DATE TIMESTAMP,
    END_DATE TIMESTAMP,
    JOB_ID VARCHAR,
    DEPARTMENT_ID VARCHAR
) with (
    type = 'oracle',
    url = 'jdbc:oracle:thin:@//localhost:3306/order_([0-9]{1,}),jdbc:oracle:thin:@//localhost:3306/order_([0-9]{1,})',
    userName = 'userName',
    password = 'password',
    dbName = 'hr',
    tableName = 'job_history',
    timeField = 'START_DATE',
    startTime = '2007-1-1 00:00:00'
); {code}
In the above configuration, the dbName attribute corresponds to the schema-name attribute in 
Oracle and Postgres; for MySQL, the dbName must be specified manually.
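
For illustration only, the sub-table part of such a pattern could be resolved by matching candidate 
table names against the configured regex (e.g. order_([0-9]{1,})). A minimal sketch with a 
hypothetical helper and table names, none of which come from the connector itself:
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ShardTableMatcher {
    // Hypothetical helper: keep only the physical tables whose names match
    // the configured sharding regex, e.g. "order_([0-9]{1,})".
    public static List<String> matchShardTables(List<String> allTables, String tableRegex) {
        Pattern pattern = Pattern.compile(tableRegex);
        return allTables.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("order_0", "order_1", "job_history");
        // Prints [order_0, order_1]
        System.out.println(matchShardTables(tables, "order_([0-9]{1,})"));
    }
}
{code}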

 

At the same time, I am also developing CDAS (whole-database synchronization) syntax for my 
company, and sub-database and sub-table support in the data source is part of that work. I will 
add unit tests; for now, please keep the corresponding PR in draft status.





[jira] [Created] (FLINK-31530) MySQL and Postgres catalogs are already implemented for commonly used databases; implement a catalog based on Oracle

2023-03-20 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-31530:
--

 Summary: MySQL and Postgres catalogs are already implemented for commonly 
used databases; implement a catalog based on Oracle
 Key: FLINK-31530
 URL: https://issues.apache.org/jira/browse/FLINK-31530
 Project: Flink
  Issue Type: Improvement
Reporter: xingyuan cheng








[jira] [Created] (FLINK-28009) Optimize data split

2022-06-11 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-28009:
--

 Summary: Optimize data split
 Key: FLINK-28009
 URL: https://issues.apache.org/jira/browse/FLINK-28009
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.1.0
Reporter: xingyuan cheng


Optimize the data-split logic for large data volumes by using parallel streams.
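
As a rough illustration of the idea, generating splits from a large list of data files could be 
parallelized with Java parallel streams; a minimal sketch using a hypothetical Split placeholder, 
not the actual Table Store classes:
{code:java}
import java.util.List;
import java.util.stream.Collectors;

public class ParallelSplitGenerator {

    // Hypothetical placeholder type, not an actual Table Store class.
    static class Split {
        final String filePath;
        Split(String filePath) { this.filePath = filePath; }
    }

    // Build one split per data file; with a large number of files,
    // parallelStream() spreads the per-file work across the common fork-join pool.
    static List<Split> generateSplits(List<String> dataFilePaths) {
        return dataFilePaths.parallelStream()
                .map(Split::new)
                .collect(Collectors.toList());
    }
}
{code}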





[jira] [Created] (FLINK-22817) About flink1.13 hive integration Kafka SQL

2021-06-01 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-22817:
--

 Summary: About flink1.13 hive integration Kafka SQL 
 Key: FLINK-22817
 URL: https://issues.apache.org/jira/browse/FLINK-22817
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Affects Versions: 1.13.1
 Environment: flink: 1.13.1

flink calcite: 1.26.1

 

kafka-eagle: 2.0.5

kafka-eagle calcite 1.21.0
Reporter: xingyuan cheng
 Attachments: hive-2021-06-01-2.png, hive-2021-06-01-3.png, 
hive-2021-06-01.png

Hello, I have been looking at the community proposal
FLIP-152: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility
From it we can see that:



```
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/data/hive/conf/'
);

USE CATALOG myhive;

SET table.sql-dialect=hive;
```

When the SQL dialect is specified as hive, the parser is selected through SPI from the 
configuration file: the org.apache.flink.table.factories.TableFactory service file under 
flink-connector-hive resolves to HiveParserFactory, and HiveParser is the corresponding grammar 
parser. The actual grammar analysis is performed in HiveParserFactory#create.
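
For reference, the same dialect switch can also be made programmatically through the Table API; a 
minimal sketch, assuming Flink 1.13 with flink-connector-hive and a registered Hive catalog on the 
classpath:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveDialectSwitch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Switching the dialect makes the planner load the Hive parser
        // (HiveParserFactory / HiveParser) via SPI from flink-connector-hive
        // instead of the default Flink SQL parser.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Statements submitted from here on are parsed with Hive syntax.
    }
}
```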



While investigating kafka-eagle, I found that its KSQL is also based on Calcite for grammar 
analysis and can support DDL and DML on Kafka tables.

The test-related classes are in KSqlParser#TestKSqlParser, and the parsing of the corresponding 
grammar is completed in KsqlParser#parseQueryKSql.

Does the community have any good suggestions for this proposal?





[jira] [Created] (FLINK-17304) Kafka two streams cannot use Flink SQL to query inner join

2020-04-21 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-17304:
--

 Summary: Kafka two streams cannot use Flink SQL to query inner join
 Key: FLINK-17304
 URL: https://issues.apache.org/jira/browse/FLINK-17304
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Table SQL / API
Affects Versions: 1.9.0
 Environment: flink.version=1.9.0

scala.binary.version=2.11
Reporter: xingyuan cheng


In my work, I found that when subscribing to data streams from two different Kafka topics, the 
operators on each of the two streams execute correctly, but in the end the join I query through 
Flink SQL does not return the expected result. What do I need to do to make it work?

TestStreamSQL.java

```
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

// BinLogBean and BinLogUtil are the user's own classes (BinLogBean is shown below).
public class TestStreamSQL {
    private static Logger log = LoggerFactory.getLogger(TestStreamSQL.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(6);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint"));

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        StreamQueryConfig queryConfig = new StreamQueryConfig();
        queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30));

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092");
        properties.setProperty("group.id", "flink");
        String topic_1 = "bps-16-r3p3";
        String topic_2 = "bps-16-r3p4";

        DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<>(topic_1, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        try {
                            BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
                            return "app_case".equals(binLogBean.getTableName());
                        } catch (Exception e) {
                            log.error("JSON conversion failed, str={}", value, e);
                            return false;
                        }
                    }
                })
                .map(new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String s) throws Exception {
                        BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
                        String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
                        String close_time = BinLogUtil.getValueByField(binLogBean, "close_time");
                        String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result");
                        return new Tuple3<>(case_id, close_time, approve_result);
                    }
                });
        tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result");

        DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<>(topic_2, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        try {
                            BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class);
                            return "cm_customer".equals(binLogBean.getTableName());
                        } catch (Exception e) {
                            log.error("JSON conversion failed, str={}", value, e);
                            return false;
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class);
                        String case_id = BinLogUtil.getValueByField(binLogBean, "case_id");
                        String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno");
                        return new Tuple2<>(case_id, idtfno);
                    }
                });
        tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno");

        Table result = tEnv.sqlQuery("select a.*,b.idtfno " +
                "from app_case a left join cm_customer b on a.case_id = b.case_id " +
                "where a.close_time not in('')");

        tEnv.toRetractStream(result, Row.class, queryConfig)
                .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
                    @Override
                    public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
                        return booleanRowTuple2.f0;
                    }
                })
                .print();

        env.execute();
    }
}
```

 

BinLogBean.java

```

public class BinLogBean implements Serializable{
 private String instance;
 private int version;
 private Long serverId;

 private String executeTime;
 private String logfileName;
 private Long logfileOffset;
 /**
 * database name
 */
 private String schemaName;
 private String tableName;
 private String eventType;
 private List columnFieldsList;


 public String getInstance() {
 return instance;
 }

 public void setInstance(String instance) {
 this.instance = instance;
 }

 public List