Re: Re: Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

2022-11-28 Thread
The CDC connector is version 2.3, built from source against Flink 1.14. Two issues remain. First, change data is read, but with a delay: for example, a row written to MySQL at 11:30 only shows up in Flink CDC 5-7 minutes later; newly written or changed data always arrives with that lag. Second, the change data still cannot be inserted into the other MySQL table.

On 2022-11-07 10:11:56, "Shengkai Fang" wrote:
>Which version of the CDC connector are you using? The incremental (binlog) part of the data can only be consumed after the full snapshot has been read.
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> wrote on Friday, November 4, 2022 at 17:58:
>
>>
>> Removing .print() does not help either, and the problem is not related to
>> running two DML statements on one tableEnv. There is also another symptom: after
>> I change data in the MySQL table, Flink CDC takes a long time before it reads the
>> changed data. What could cause this?
>>
>> On 2022-11-04 16:52:08, "yinghua...@163.com" wrote:
>>
>> >You are executing two DML statements on one tableEnv. Use a StatementSet to hold the two DML statements and call execute() on the StatementSet, like this:
>> >StatementSet statementSet = tenv.createStatementSet();
>> >statementSet.addInsertSql(sql1);
>> >statementSet.addInsertSql(sql2);
>> >TableResult result = statementSet.execute();
>> >result.getJobClient().get().getJobID().toString();
>> >
>> >
>> >Alternatively, remove the job that prints the query result and check whether the data can then be inserted into the target MySQL table.
>> >// Query
>> >tenv.executeSql("select * from flink_t_stu").print();
>>  <-- remove this job
>> >
>> >
>> >
>> >yinghua...@163.com
>> >
>> >From: 左岩
>> >Sent: 2022-11-04 14:34
>> >To: user-zh
>> >Subject: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
>> >Flink CDC can read MySQL change data, but the data cannot be inserted into the new MySQL table. Both tables have primary keys and identical schemas.
>> >The code is below; the console output is in the attachment.
>> >public static void main(String[] args) throws Exception {
>> >Configuration conf = new Configuration();
>> >conf.setInteger("rest.port", 10041);
>> >StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> >
>> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>> >
>> >env.setParallelism(1);
>> >// Create the CDC source table
>> >tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>> >"  userid INT, " +
>> >"  username string, " +
>> >"  age string, " +
>> >"  `partition` INT, " +
>> >" PRIMARY KEY(userid) NOT ENFORCED " +
>> >" ) WITH ( " +
>> >" 'connector' = 'mysql-cdc', " +
>> >" 'server-id' = '5401-5404', " +
>> >" 'scan.startup.mode' = 'latest-offset', " +
>> >//" 'scan.startup.mode' = 'earliest-offset', " +
>> >" 'hostname' = '192.168.0.220', " +
>> >" 'port' = '3306', " +
>> >" 'username' = 'root', " +
>> >" 'password' = 'root', " +
>> >" 'database-name' = 'zy', " +
>> >" 'table-name' = 't_stu' " +
>> >")");
>> >
>> >// Query
>> >tenv.executeSql("select * from flink_t_stu").print();
>> >
>> >
>> >// Create a target table to hold the query results
>> >tenv.executeSql(
>> >"CREATE TABLE flink_t_stu2 ( " +
>> >"  userid INT, " +
>> >"  username string, " +
>> >"  age string, " +
>> >"  `partition` INT, " +
>> >" PRIMARY KEY(userid) NOT ENFORCED " +
>> >" ) WITH ( " +
>> >"  'connector' = 'jdbc', " +
>> >"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>> >"  'table-name' = 't_stu2', " +
>> >"  'username' = 'root', " +
>> >"  'password' = 'root'  " +
>> >")"
>> >);
>> >
>> >tenv.executeSql("INSERT INTO flink_t_stu2 " +
>> >"SELECT * FROM flink_t_stu");
>> >env.execute();
>> >
>> >}
>>
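
Pulling the advice in this thread together: tenv.executeSql("select * from flink_t_stu").print() blocks on an unbounded query, so the INSERT statement after it is never submitted, and env.execute() is not needed because executeSql()/StatementSet.execute() start their own Table API job. Below is a minimal corrected sketch; it assumes the same MySQL host, credentials and table layout as in the original post, and the class name is only illustrative. The shorter checkpoint interval is a guess at the 5-7 minute delay reported above: with the original 5-minute interval, parts of the pipeline only make progress once per completed checkpoint.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcToJdbcJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10041);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Shorter checkpoint interval than the original 5 minutes (see note above).
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // CDC source table (same definition as in the original post).
        tenv.executeSql("CREATE TABLE flink_t_stu ( " +
                "  userid INT, username STRING, age STRING, `partition` INT, " +
                "  PRIMARY KEY (userid) NOT ENFORCED " +
                ") WITH ( " +
                "  'connector' = 'mysql-cdc', 'server-id' = '5401-5404', " +
                "  'scan.startup.mode' = 'latest-offset', " +
                "  'hostname' = '192.168.0.220', 'port' = '3306', " +
                "  'username' = 'root', 'password' = 'root', " +
                "  'database-name' = 'zy', 'table-name' = 't_stu' )");

        // JDBC sink table (same definition as in the original post).
        tenv.executeSql("CREATE TABLE flink_t_stu2 ( " +
                "  userid INT, username STRING, age STRING, `partition` INT, " +
                "  PRIMARY KEY (userid) NOT ENFORCED " +
                ") WITH ( " +
                "  'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
                "  'table-name' = 't_stu2', 'username' = 'root', 'password' = 'root' )");

        // No print() job here: print() on an unbounded query never returns, so the
        // INSERT below would never be submitted if print() came first.

        // Collect the INSERT(s) in a StatementSet and submit them as one job.
        StatementSet statementSet = tenv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO flink_t_stu2 SELECT * FROM flink_t_stu");
        statementSet.execute();
        // No env.execute(): there are no DataStream operators in this program, and the
        // Table API job above has already been submitted.
    }
}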


Re: Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

2022-11-04 Thread

Removing .print() does not help either, and the problem is not related to running two DML statements on one tableEnv. There is also another symptom: after I change data in the MySQL table, Flink CDC takes a long time before it reads the changed data. What could cause this?

On 2022-11-04 16:52:08, "yinghua...@163.com" wrote:
>You are executing two DML statements on one tableEnv. Use a StatementSet to hold the two DML statements and call execute() on the StatementSet, like this:
>StatementSet statementSet = tenv.createStatementSet();
>statementSet.addInsertSql(sql1);
>statementSet.addInsertSql(sql2);
>TableResult result = statementSet.execute();
>result.getJobClient().get().getJobID().toString();
>
>
>Alternatively, remove the job that prints the query result and check whether the data can then be inserted into the target MySQL table.
>// Query
>tenv.executeSql("select * from flink_t_stu").print();   
>-------- remove this job
>
>
>
>yinghua...@163.com
> 
>From: 左岩
>Sent: 2022-11-04 14:34
>To: user-zh
>Subject: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
>Flink CDC can read MySQL change data, but the data cannot be inserted into the new MySQL table. Both tables have primary keys and identical schemas.
>The code is below; the console output is in the attachment.
>public static void main(String[] args) throws Exception {
>Configuration conf = new Configuration();
>conf.setInteger("rest.port", 10041);
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment(conf);
>StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
>env.setParallelism(1);
>// Create the CDC source table
>tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>"  userid INT, " +
>"  username string, " +
>"  age string, " +
>"  `partition` INT, " +
>" PRIMARY KEY(userid) NOT ENFORCED " +
>" ) WITH ( " +
>" 'connector' = 'mysql-cdc', " +
>" 'server-id' = '5401-5404', " +
>" 'scan.startup.mode' = 'latest-offset', " +
>//" 'scan.startup.mode' = 'earliest-offset', " +
>" 'hostname' = '192.168.0.220', " +
>" 'port' = '3306', " +
>" 'username' = 'root', " +
>" 'password' = 'root', " +
>" 'database-name' = 'zy', " +
>" 'table-name' = 't_stu' " +
>")");
> 
>// Query
>tenv.executeSql("select * from flink_t_stu").print();
> 
> 
>// Create a target table to hold the query results
>tenv.executeSql(
>"CREATE TABLE flink_t_stu2 ( " +
>"  userid INT, " +
>"  username string, " +
>"  age string, " +
>"  `partition` INT, " +
>" PRIMARY KEY(userid) NOT ENFORCED " +
>" ) WITH ( " +
>"  'connector' = 'jdbc', " +
>"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>"  'table-name' = 't_stu2', " +
>"  'username' = 'root', " +
>"  'password' = 'root'  " +
>")"
>);
> 
>tenv.executeSql("INSERT INTO flink_t_stu2 " +
>"SELECT * FROM flink_t_stu");
>env.execute();
> 
>}


FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

2022-11-04 Thread
Flink CDC can read MySQL change data, but the data cannot be inserted into the new MySQL table. Both tables have primary keys and identical schemas.
The code is below; the console output is in the attachment.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

env.setParallelism(1);
// Create the CDC source table
tenv.executeSql("CREATE TABLE flink_t_stu ( " +
"  userid INT, " +
"  username string, " +
"  age string, " +
"  `partition` INT, " +
" PRIMARY KEY(userid) NOT ENFORCED " +
" ) WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'server-id' = '5401-5404', " +
" 'scan.startup.mode' = 'latest-offset', " +
//" 'scan.startup.mode' = 'earliest-offset', " +
" 'hostname' = '192.168.0.220', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'database-name' = 'zy', " +
" 'table-name' = 't_stu' " +
")");

// Query
tenv.executeSql("select * from flink_t_stu").print();


// Create a target table to hold the query results
tenv.executeSql(
"CREATE TABLE flink_t_stu2 ( " +
"  userid INT, " +
"  username string, " +
"  age string, " +
"  `partition` INT, " +
" PRIMARY KEY(userid) NOT ENFORCED " +
" ) WITH ( " +
"  'connector' = 'jdbc', " +
"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
"  'table-name' = 't_stu2', " +
"  'username' = 'root', " +
"  'password' = 'root'  " +
")"
);

tenv.executeSql("INSERT INTO flink_t_stu2 " +
"SELECT * FROM flink_t_stu");
env.execute();

}

Flink CDC cannot read data from MySQL

2022-11-02 Thread
I am using Flink 1.14. Since there is no officially matching release, I built Flink CDC myself. The MySQL binlog is enabled and no errors are reported, but no MySQL data is read; the IDEA console shows neither errors nor output. What could be the reason? (The run log is attached.)
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

//StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.setParallelism(4);

// Create the CDC source table
tenv.executeSql("CREATE TABLE flink_t_stu ( " +
"  userid INT, " +
"  username string, " +
"  age string, " +
"  `partition` INT, " +
" PRIMARY KEY(userid) NOT ENFORCED " +
" ) WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'server-id' = '5401-5404', " +
" 'hostname' = '192.168.0.220', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'database-name' = 'zy', " +
" 'table-name' = 't_stu' " +
")");

// Query
tenv.executeSql("select * from flink_t_stu").print();

env.execute();

}

2022-11-03 11:00:07   INFO 
(com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils:lambda$listTables$1)
 -   including 'zy.t_stu' for further processing
2022-11-03 11:00:07   INFO 
(io.debezium.jdbc.JdbcConnection:lambda$doClose$3) - Connection gracefully 
closed
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.jobmaster.JobMaster:startSchedulingInternal) - 
Starting scheduling with scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Job 
collect (eb0f7496f91a379a11275df436c9126e) switched from state CREATED to 
RUNNING.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - 
Source: TableSourceScan(table=[[default_catalog, default_database, 
flink_t_stu]], fields=[userid, username, age, partition]) -> 
NotNullEnforcer(fields=[userid]) (1/4) (7efdc2342589f6956c0535432b64ff63) 
switched from CREATED to SCHEDULED.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - 
Source: TableSourceScan(table=[[default_catalog, default_database, 
flink_t_stu]], fields=[userid, username, age, partition]) -> 
NotNullEnforcer(fields=[userid]) (2/4) (1908624c2dcdcd14e177c97e7cdd2ebd) 
switched from CREATED to SCHEDULED.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - 
Source: TableSourceScan(table=[[default_catalog, default_database, 
flink_t_stu]], fields=[userid, username, age, partition]) -> 
NotNullEnforcer(fields=[userid]) (3/4) (3e53cda97f421ec89364880238602f02) 
switched from CREATED to SCHEDULED.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - 
Source: TableSourceScan(table=[[default_catalog, default_database, 
flink_t_stu]], fields=[userid, username, age, partition]) -> 
NotNullEnforcer(fields=[userid]) (4/4) (7f41479cd1ae4863689ab9cbea9d78b6) 
switched from CREATED to SCHEDULED.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - 
Sink: Collect table sink (1/1) (4ed520c7f7b76b4206674043f820df1e) switched from 
CREATED to SCHEDULED.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.jobmaster.JobMaster:connectToResourceManager) - 
Connecting to ResourceManager 
akka://flink/user/rpc/resourcemanager_2(a0ede642fd5cff6cbf01b9b62abb4ae7)
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.jobmaster.JobMaster:lambda$startRegistration$0) - 
Resolved ResourceManager address, beginning registration
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:registerJobManager)
 - Registering job manager 
80572ffe1494b6a58e3117fbc66c4d96@akka://flink/user/rpc/jobmanager_3 for job 
eb0f7496f91a379a11275df436c9126e.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:registerJobMasterInternal)
 - Registered job manager 
80572ffe1494b6a58e3117fbc66c4d96@akka://flink/user/rpc/jobmanager_3 for job 
eb0f7496f91a379a11275df436c9126e.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.jobmaster.JobMaster:establishResourceManagerConnection)
 - JobManager successfully registered at ResourceManager, leader id: 
a0ede642fd5cff6cbf01b9b62abb4ae7.
2022-11-03 11:00:07   INFO 
(org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager:processResourceRequirements)
 - Received resource requirements from job eb0f7496f91a379a11275df436c9126e: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=4}]
2022-11-03 11:00:07   
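
One way to narrow this down, independent of the SQL layer, is to read the same table with the DataStream API MySqlSource and print the raw Debezium-style JSON: if change events show up there, binlog access works and the problem sits in the SQL job setup. A minimal sketch, assuming the flink-connector-mysql-cdc 2.x DataStream API and the same connection settings as above; the class name is only illustrative.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Raw CDC source: every change event is emitted as a Debezium-style JSON string.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("192.168.0.220")
                .port(3306)
                .databaseList("zy")
                .tableList("zy.t_stu")
                .username("root")
                .password("root")
                .serverId("5401-5404")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The incremental-snapshot source relies on checkpoints, so keep them enabled.
        env.enableCheckpointing(3000);
        env.setParallelism(1);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("mysql-cdc smoke test");
    }
}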

Re: Re: Re: upsert-kafka as a source does not consume any data from Kafka

2022-10-30 Thread

Still nothing is consumed; please see the screenshot in the attachment.

On 2022-10-31 10:03:05, "guozhi mang" wrote:
>I think your table definition format is wrong.
>Here is a revised version:
>tenv.executeSql(
>" create table t_upsert_kafka( "
>+ "userid int ,"
>+ "username string,  "
>+ "age int, "
>+ "`partition` int ,"
>+ "  PRIMARY KEY (userid) NOT ENFORCED "
>+ " ) with ("
>+ "  'connector' = 'upsert-kafka',  "
>+ "  'topic' = 'test02',"
>+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
>+ "  'key.format' = 'json', "
>+ "  'value.format' = 'json'"
>+ " )  "
>);
>
>*Below is the official example:*
>
>CREATE TABLE pageviews_per_region (
>  user_region STRING,
>  pv BIGINT,
>  uv BIGINT,
>  PRIMARY KEY (user_region) NOT ENFORCED) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'pageviews_per_region',
>  'properties.bootstrap.servers' = '...',
>  'key.format' = 'avro',
>  'value.format' = 'avro');
>
>
>左岩 <13520871...@163.com> wrote on Monday, October 31, 2022 at 09:57:
>
>>
>>
>>
>>
>> public static void main(String[] args) throws Exception {
>> Configuration conf = new Configuration();
>> conf.setInteger("rest.port", 10041);
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> //env.setParallelism(1);
>>
>> env.enableCheckpointing(3000);
>> env.setStateBackend(new HashMapStateBackend());
>> env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
>>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>>
>> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> // Create the target Kafka mapping table
>> tenv.executeSql(
>> " create table t_upsert_kafka( "
>> + "userid int primary key not enforced,"
>> + "username string,  "
>> + "age int, "
>> + "`partition` int "
>> + " ) with ("
>> + "  'connector' = 'upsert-kafka',  "
>> + "  'topic' = 'test02',"
>> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
>> + "  'key.format' = 'json', "
>> + "  'value.format' = 'json'"
>> + " )  "
>> );
>>
>> tenv.executeSql("select * from t_upsert_kafka").print();
>>
>> tenv.executeSql(
>> " CREATE TABLE t_kafka_connector (   "
>> + "userid int ,"
>> + "username string,  "
>> + "age int, "
>> + "`partition` int "
>> + " ) WITH (   "
>> + "  'connector' = 'kafka',"
>> + "  'topic' = 'test02', "
>> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
>> + "  'properties.group.id' = 'testGroup1',  "
>> + "  'scan.startup.mode' = 'earliest-offset',   "
>> + "  'format'='json'   "
>> + " )   "
>>
>> );
>>
>> tenv.executeSql("select * from t_kafka_connector").print();
>>
>> env.execute();
>>
>>
>>
>>
>>
>> t_upsert_kafka consumes nothing, while t_kafka_connector can consume the data.
>>
>> On 2022-10-31 09:43:49, "Shengkai Fang" wrote:
>> >hi,
>> >
>> >The image does not come through. Could you paste the text directly or use an image-hosting tool?
>> >
>> >Best,
>> >Shengkai
>> >
>> >左岩 <13520871...@163.com> wrote on Friday, October 28, 2022 at 18:34:
>> >
>> >> When upsert-kafka is used as a source, no data is consumed from Kafka.
>> >> I defined an upsert-kafka connector table with Flink SQL to read a topic, but it reads no data. A plain kafka
>> >> connector table consuming the same topic can read the data; both read the same topic. The code is below.
>> >>
>>
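
Beyond the DDL style, one thing worth checking is the data in the topic itself: an upsert-kafka source keys every row on the Kafka record key, so if test02 was produced with value-only JSON records (no message key), the upsert-kafka table has nothing to upsert on, while the plain kafka table still reads the values. This is only a guess, but it matches the symptom that t_kafka_connector works and t_upsert_kafka stays empty. A small sketch to test it, meant to be dropped into the main method above; the topic name test02_keyed is hypothetical.

// Hypothetical keyed topic: re-publish the value-only records through an upsert-kafka
// sink so that every record gets a JSON key derived from the primary key column.
tenv.executeSql(
        " create table t_upsert_kafka_keyed( "
        + "   userid int, "
        + "   username string, "
        + "   age int, "
        + "   `partition` int, "
        + "   PRIMARY KEY (userid) NOT ENFORCED "
        + " ) with ( "
        + "   'connector' = 'upsert-kafka', "
        + "   'topic' = 'test02_keyed', "
        + "   'properties.bootstrap.servers' = '192.168.0.82:9092', "
        + "   'key.format' = 'json', "
        + "   'value.format' = 'json' "
        + " ) "
);

// Copy the records read by the plain kafka table into the keyed topic ...
tenv.executeSql("INSERT INTO t_upsert_kafka_keyed SELECT * FROM t_kafka_connector");

// ... and this query should now return rows, because the records carry proper keys.
tenv.executeSql("select * from t_upsert_kafka_keyed").print();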


Re: Re: upsert-kafka as a source does not consume any data from Kafka

2022-10-30 Thread




public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//env.setParallelism(1);

env.enableCheckpointing(3000);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Create the target Kafka mapping table
tenv.executeSql(
" create table t_upsert_kafka( "
+ "userid int primary key not enforced,"
+ "username string,  "
+ "age int, "
+ "`partition` int "
+ " ) with ("
+ "  'connector' = 'upsert-kafka',  "
+ "  'topic' = 'test02',"
+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
+ "  'key.format' = 'json', "
+ "  'value.format' = 'json'"
+ " )  "
);

tenv.executeSql("select * from t_upsert_kafka").print();

tenv.executeSql(
" CREATE TABLE t_kafka_connector (   "
+ "userid int ,"
+ "username string,  "
+ "age int, "
+ "`partition` int "
+ " ) WITH (   "
+ "  'connector' = 'kafka',"
+ "  'topic' = 'test02', "
+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
+ "  'properties.group.id' = 'testGroup1',  "
+ "  'scan.startup.mode' = 'earliest-offset',   "
+ "  'format'='json'   "
+ " )       "

);

tenv.executeSql("select * from t_kafka_connector").print();

env.execute();





t_upsert_kafka consumes nothing, while t_kafka_connector can consume the data.

On 2022-10-31 09:43:49, "Shengkai Fang" wrote:
>hi,
>
>The image does not come through. Could you paste the text directly or use an image-hosting tool?
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> wrote on Friday, October 28, 2022 at 18:34:
>
>> When upsert-kafka is used as a source, no data is consumed from Kafka.
>> I defined an upsert-kafka connector table with Flink SQL to read a topic, but it reads no data. A plain kafka
>> connector table consuming the same topic can read the data; both read the same topic. The code is below.
>>


upsert-kafka as a source does not consume any data from Kafka

2022-10-28 Thread
When upsert-kafka is used as a source, no data is consumed from Kafka.
I defined an upsert-kafka connector table with Flink SQL to read a topic, but it reads no data. A plain kafka
connector table consuming the same topic can read the data; both read the same topic. The code is below.

Re: Flink 1.15 FileSink: compacted compressed small files are unreadable

2022-08-04 Thread
Hello, I tried your scenario and the compacted files are indeed unreadable. The exact cause may depend on HDFS support (whether HDFS accepts the Gzip files that Flink writes out still needs to be confirmed). On my side, files written with LZO compression are readable from HDFS; you can refer to the following:

//  create the stream with kafka source, test_topic must return Student!
val kafkaStream: DataStream[Student] = env
  .addSource(kafkaConsumer)

// Build the StreamingFileSink with the base path and the serialization writer
val sink: StreamingFileSink[Student] = StreamingFileSink
  .forBulkFormat(outputBasePath,
    ParquetAvroCompressionWriters.forReflectRecord(classOf[Student], CompressionCodecName.LZO))
  .withBucketAssigner(new EventDateTimeBucketAssigner("MMdd"))
  .build()

// Just add the sink to the input DataStream
kafkaStream.addSink(sink)

// execute program
env.execute("Kafka to Parquet")

On 2022-08-04 10:48:02, "Wenhao Xiao" wrote:
>Hi all, has anyone used the small-file compaction feature of the FileSink DataStream API in 1.15? I write files compressed as gz and find that the compacted files cannot be read.
>The format type is FileSink.forBulkFormat; the BulkWriter.Factory is CompressWriterFactory with gz compression; the Compactor used for merging files is the built-in ConcatFileCompactor.
>
>
>The relevant FileSink code is below:
>CompressWriterFactory writer =
>   CompressWriters.forExtractor(new DefaultExtractor())
>   .withHadoopCompression("Gzip", new Configuration());
>
>FileSink hdfsSink = FileSink.forBulkFormat(new Path("hdfs://"), writer)
>   .withBucketCheckInterval(6)
>   .enableCompact(
>   FileCompactStrategy.Builder.newBuilder()
>   .setNumCompactThreads(1024)
>   .enableCompactionOnCheckpoint(3)
>   .build(),
>   new ConcatFileCompactor())
>   .build();
>
>
>After the compressed small files are compacted, the compacted file turns out to be unreadable:
>Running hdfs dfs -text
>//2022-08-03--13/compacted-part-00857a94-bf23-4ae7-a55b-d39c2fdca565-0.gz
>fails with: text: invalid block type
>
>
>Does the 1.15 FileSink DataStream API simply not support compaction of compressed files yet, or have I used something incorrectly or missed something?
>
>
>Thanks everyone for taking the time to answer!
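
If the root cause really is the combination of gzip output and the concatenating compactor, a quick cross-check is to run the same job with compaction left out and confirm that the individual .gz part files are readable with hdfs dfs -text. A sketch of that variant, reusing the writer from the snippet above; the output path is illustrative.

// Same gzip writer as above, but without enableCompact(...): each part file is then a
// single, self-contained gzip stream written by one subtask.
CompressWriterFactory writer =
        CompressWriters.forExtractor(new DefaultExtractor())
                .withHadoopCompression("Gzip", new Configuration());

FileSink hdfsSink = FileSink
        .forBulkFormat(new Path("hdfs:///tmp/filesink-gzip-check"), writer)  // illustrative path
        .withBucketCheckInterval(6)
        .build();

Comparing this baseline against the compacted output should at least show whether the compaction step is what breaks readability.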


Re: Hive catalog does not support JDK 11

2022-07-26 Thread
JDK 11 is currently not supported; you have to downgrade to JDK 8. With JDK 11, even after manually adding the required lib packages, the connection fails once it reaches the Hive client. Downgrade the JDK on all three servers to JDK 8, and make sure the settings in the configuration files (/etc/profile, hadoop-env.sh, yarn-env.sh) are identical on all of them.

On 2022-07-27 09:58:16, "Jeff" wrote:
>Flink 1.15.1 works fine with the Hive catalog (3.1.3) on JDK 8, but after upgrading to JDK 11 there are version problems. From the error message and some searching it appears that Hive does not support JDK 11 yet. Is there a solution for this?