Re: Re: Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
The CDC connector is 2.3, which I compiled myself against Flink 1.14. There is another issue besides that: the change data can be read, but only after a delay of several minutes. For example, if data is written to MySQL at 11:30, Flink CDC only reads the newly written or changed rows 5-7 minutes later. And the second problem remains: the change data cannot be inserted into the other MySQL table.

On 2022-11-07 10:11:56, "Shengkai Fang" wrote:
>Which version of the CDC connector are you using? The incremental part can only start after the full snapshot has been read.
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> wrote on Fri, Nov 4, 2022 at 17:58:
>
>> Removing .print() did not help either,
>> and it has nothing to do with that (one tableEnv executing two DML statements). There is also a new symptom: after I change data in the MySQL table, Flink CDC takes a long time to read the changed data. What could be going on?
>>
>> On 2022-11-04 16:52:08, "yinghua...@163.com" wrote:
>>
>> >You are executing two DML statements on one tableEnv. You should collect the two DML statements in a StatementSet and call execute on the StatementSet, like this:
>> >StatementSet statementSet = tenv.createStatementSet();
>> >statementSet.addInsertSql(sql1);
>> >statementSet.addInsertSql(sql2);
>> >TableResult result = statementSet.execute();
>> >result.getJobClient().get().getJobID().toString();
>> >
>> >Or remove the print job and check whether the data can be inserted into the target MySQL table:
>> >// Query
>> >tenv.executeSql("select * from flink_t_stu").print();
>> >-------- remove this job
>> >
>> >yinghua...@163.com
>> >
>> >From: 左岩
>> >Sent: 2022-11-04 14:34
>> >To: user-zh
>> >Subject: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
>> >With Flink CDC I can read MySQL change data, but I cannot insert it into a new MySQL table. Both tables have primary keys and the same schema.
>> >The code is below; see the attachment for the console output.
>> >public static void main(String[] args) throws Exception {
>> >    Configuration conf = new Configuration();
>> >    conf.setInteger("rest.port", 10041);
>> >    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> >    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> >
>> >    env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>> >    env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>> >
>> >    env.setParallelism(1);
>> >    // Create the CDC source table
>> >    tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>> >            " userid INT, " +
>> >            " username string, " +
>> >            " age string, " +
>> >            " `partition` INT, " +
>> >            " PRIMARY KEY(userid) NOT ENFORCED " +
>> >            " ) WITH ( " +
>> >            " 'connector' = 'mysql-cdc', " +
>> >            " 'server-id' = '5401-5404', " +
>> >            " 'scan.startup.mode' = 'latest-offset', " +
>> >            //" 'scan.startup.mode' = 'earliest-offset', " +
>> >            " 'hostname' = '192.168.0.220', " +
>> >            " 'port' = '3306', " +
>> >            " 'username' = 'root', " +
>> >            " 'password' = 'root', " +
>> >            " 'database-name' = 'zy', " +
>> >            " 'table-name' = 't_stu' " +
>> >            ")");
>> >
>> >    // Query
>> >    tenv.executeSql("select * from flink_t_stu").print();
>> >
>> >    // Create a target table to hold the query results
>> >    tenv.executeSql(
>> >            "CREATE TABLE flink_t_stu2 ( " +
>> >            " userid INT, " +
>> >            " username string, " +
>> >            " age string, " +
>> >            " `partition` INT, " +
>> >            " PRIMARY KEY(userid) NOT ENFORCED " +
>> >            " ) WITH ( " +
>> >            " 'connector' = 'jdbc', " +
>> >            " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>> >            " 'table-name' = 't_stu2', " +
>> >            " 'username' = 'root', " +
>> >            " 'password' = 'root' " +
>> >            ")"
>> >    );
>> >
>> >    tenv.executeSql("INSERT INTO flink_t_stu2 " +
>> >            "SELECT * FROM flink_t_stu");
>> >    env.execute();
>> >}
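For reference, a minimal sketch of the StatementSet approach suggested above, applied to the tables from this thread (flink_t_stu, flink_t_stu2). It only illustrates the API and is not a confirmed fix for the problem reported here; it assumes the two CREATE TABLE statements from the quoted code have already been executed on tenv.

// Additional imports needed by this fragment:
// import org.apache.flink.table.api.StatementSet;
// import org.apache.flink.table.api.TableResult;

// Collect all INSERT statements in one StatementSet so they are submitted as a single job.
StatementSet statementSet = tenv.createStatementSet();
statementSet.addInsertSql("INSERT INTO flink_t_stu2 SELECT * FROM flink_t_stu");
// Further addInsertSql(...) calls could be chained here if there were a second INSERT.
TableResult result = statementSet.execute();
// execute() submits its own job; env.execute() is not needed for a pure Table API / SQL pipeline.
System.out.println(result.getJobClient().get().getJobID());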
Re: Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
Removing .print() did not help either,
and it has nothing to do with that (one tableEnv executing two DML statements). There is also a new symptom: after I change data in the MySQL table, Flink CDC takes a long time to read the changed data. What could be going on?

On 2022-11-04 16:52:08, "yinghua...@163.com" wrote:
>You are executing two DML statements on one tableEnv. You should collect the two DML statements in a StatementSet and call execute on the StatementSet, like this:
>StatementSet statementSet = tenv.createStatementSet();
>statementSet.addInsertSql(sql1);
>statementSet.addInsertSql(sql2);
>TableResult result = statementSet.execute();
>result.getJobClient().get().getJobID().toString();
>
>Or remove the print job and check whether the data can be inserted into the target MySQL table:
>// Query
>tenv.executeSql("select * from flink_t_stu").print();
>-------- remove this job
>
>yinghua...@163.com
>
>From: 左岩
>Sent: 2022-11-04 14:34
>To: user-zh
>Subject: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
>With Flink CDC I can read MySQL change data, but I cannot insert it into a new MySQL table. Both tables have primary keys and the same schema.
>The code is below; see the attachment for the console output.
>public static void main(String[] args) throws Exception {
>    Configuration conf = new Configuration();
>    conf.setInteger("rest.port", 10041);
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>
>    env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>    env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>
>    env.setParallelism(1);
>    // Create the CDC source table
>    tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>            " userid INT, " +
>            " username string, " +
>            " age string, " +
>            " `partition` INT, " +
>            " PRIMARY KEY(userid) NOT ENFORCED " +
>            " ) WITH ( " +
>            " 'connector' = 'mysql-cdc', " +
>            " 'server-id' = '5401-5404', " +
>            " 'scan.startup.mode' = 'latest-offset', " +
>            //" 'scan.startup.mode' = 'earliest-offset', " +
>            " 'hostname' = '192.168.0.220', " +
>            " 'port' = '3306', " +
>            " 'username' = 'root', " +
>            " 'password' = 'root', " +
>            " 'database-name' = 'zy', " +
>            " 'table-name' = 't_stu' " +
>            ")");
>
>    // Query
>    tenv.executeSql("select * from flink_t_stu").print();
>
>    // Create a target table to hold the query results
>    tenv.executeSql(
>            "CREATE TABLE flink_t_stu2 ( " +
>            " userid INT, " +
>            " username string, " +
>            " age string, " +
>            " `partition` INT, " +
>            " PRIMARY KEY(userid) NOT ENFORCED " +
>            " ) WITH ( " +
>            " 'connector' = 'jdbc', " +
>            " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>            " 'table-name' = 't_stu2', " +
>            " 'username' = 'root', " +
>            " 'password' = 'root' " +
>            ")"
>    );
>
>    tenv.executeSql("INSERT INTO flink_t_stu2 " +
>            "SELECT * FROM flink_t_stu");
>    env.execute();
>}
FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
With Flink CDC I can read MySQL change data, but I cannot insert it into a new MySQL table. Both tables have primary keys and the same schema.
The code is below; see the attachment for the console output.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 10041);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

    env.setParallelism(1);
    // Create the CDC source table
    tenv.executeSql("CREATE TABLE flink_t_stu ( " +
            " userid INT, " +
            " username string, " +
            " age string, " +
            " `partition` INT, " +
            " PRIMARY KEY(userid) NOT ENFORCED " +
            " ) WITH ( " +
            " 'connector' = 'mysql-cdc', " +
            " 'server-id' = '5401-5404', " +
            " 'scan.startup.mode' = 'latest-offset', " +
            //" 'scan.startup.mode' = 'earliest-offset', " +
            " 'hostname' = '192.168.0.220', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = 'root', " +
            " 'database-name' = 'zy', " +
            " 'table-name' = 't_stu' " +
            ")");

    // Query
    tenv.executeSql("select * from flink_t_stu").print();

    // Create a target table to hold the query results
    tenv.executeSql(
            "CREATE TABLE flink_t_stu2 ( " +
            " userid INT, " +
            " username string, " +
            " age string, " +
            " `partition` INT, " +
            " PRIMARY KEY(userid) NOT ENFORCED " +
            " ) WITH ( " +
            " 'connector' = 'jdbc', " +
            " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
            " 'table-name' = 't_stu2', " +
            " 'username' = 'root', " +
            " 'password' = 'root' " +
            ")"
    );

    tenv.executeSql("INSERT INTO flink_t_stu2 " +
            "SELECT * FROM flink_t_stu");
    env.execute();
}
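A side note on the code above, offered as a guess rather than a confirmed diagnosis (the follow-ups earlier in this thread report that removing print() alone did not fix the problem): TableResult.print() on an unbounded streaming query blocks the calling thread while it keeps printing rows, so the statements after it, including the INSERT INTO, are never reached in this main method. A sketch of submitting only the INSERT and waiting on it, with the two CREATE TABLE statements kept as they are:

// Submit the continuous INSERT job; executeSql() deploys it immediately.
TableResult insertResult = tenv.executeSql(
        "INSERT INTO flink_t_stu2 SELECT * FROM flink_t_stu");
// await() blocks until the job terminates, which keeps the local JVM alive for testing.
// env.execute() is not needed when the whole pipeline is expressed in Table API / SQL.
insertResult.await();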
flinkcdc reads no data from MySQL
I am using Flink 1.14. Because there is no officially matching release, I compiled Flink CDC myself. The binlog is enabled and no error is reported, but no MySQL data is read; the IDEA console shows neither errors nor output. What could be the cause? (The run log is attached.)

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 10041);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
    //StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    env.setParallelism(4);
    // Create the CDC source table
    tenv.executeSql("CREATE TABLE flink_t_stu ( " +
            " userid INT, " +
            " username string, " +
            " age string, " +
            " `partition` INT, " +
            " PRIMARY KEY(userid) NOT ENFORCED " +
            " ) WITH ( " +
            " 'connector' = 'mysql-cdc', " +
            " 'server-id' = '5401-5404', " +
            " 'hostname' = '192.168.0.220', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = 'root', " +
            " 'database-name' = 'zy', " +
            " 'table-name' = 't_stu' " +
            ")");

    // Query
    tenv.executeSql("select * from flink_t_stu").print();
    env.execute();
}

2022-11-03 11:00:07 INFO (com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils:lambda$listTables$1) - including 'zy.t_stu' for further processing
2022-11-03 11:00:07 INFO (io.debezium.jdbc.JdbcConnection:lambda$doClose$3) - Connection gracefully closed
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.jobmaster.JobMaster:startSchedulingInternal) - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Job collect (eb0f7496f91a379a11275df436c9126e) switched from state CREATED to RUNNING.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Source: TableSourceScan(table=[[default_catalog, default_database, flink_t_stu]], fields=[userid, username, age, partition]) -> NotNullEnforcer(fields=[userid]) (1/4) (7efdc2342589f6956c0535432b64ff63) switched from CREATED to SCHEDULED.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Source: TableSourceScan(table=[[default_catalog, default_database, flink_t_stu]], fields=[userid, username, age, partition]) -> NotNullEnforcer(fields=[userid]) (2/4) (1908624c2dcdcd14e177c97e7cdd2ebd) switched from CREATED to SCHEDULED.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Source: TableSourceScan(table=[[default_catalog, default_database, flink_t_stu]], fields=[userid, username, age, partition]) -> NotNullEnforcer(fields=[userid]) (3/4) (3e53cda97f421ec89364880238602f02) switched from CREATED to SCHEDULED.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Source: TableSourceScan(table=[[default_catalog, default_database, flink_t_stu]], fields=[userid, username, age, partition]) -> NotNullEnforcer(fields=[userid]) (4/4) (7f41479cd1ae4863689ab9cbea9d78b6) switched from CREATED to SCHEDULED.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:transitionState) - Sink: Collect table sink (1/1) (4ed520c7f7b76b4206674043f820df1e) switched from CREATED to SCHEDULED.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.jobmaster.JobMaster:connectToResourceManager) - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_2(a0ede642fd5cff6cbf01b9b62abb4ae7)
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.jobmaster.JobMaster:lambda$startRegistration$0) - Resolved ResourceManager address, beginning registration
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:registerJobManager) - Registering job manager 80572ffe1494b6a58e3117fbc66c4d96@akka://flink/user/rpc/jobmanager_3 for job eb0f7496f91a379a11275df436c9126e.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:registerJobMasterInternal) - Registered job manager 80572ffe1494b6a58e3117fbc66c4d96@akka://flink/user/rpc/jobmanager_3 for job eb0f7496f91a379a11275df436c9126e.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.jobmaster.JobMaster:establishResourceManagerConnection) - JobManager successfully registered at ResourceManager, leader id: a0ede642fd5cff6cbf01b9b62abb4ae7.
2022-11-03 11:00:07 INFO (org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager:processResourceRequirements) - Received resource requirements from job eb0f7496f91a379a11275df436c9126e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=4}]
2022-11-03 11:00:07
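Not an answer from the thread, but one way to narrow this down is to read the same table through the CDC DataStream API directly, bypassing the SQL layer: if raw change events show up here, the problem is more likely on the SQL job side; if not, it points at the MySQL/binlog setup. A rough sketch, reusing the connection values from the mail above and assuming the self-built flink-connector-mysql-cdc 2.x is on the classpath:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Raw change events are emitted as Debezium-style JSON strings.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("192.168.0.220")
                .port(3306)
                .databaseList("zy")
                .tableList("zy.t_stu")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing should stay enabled; the incremental-snapshot source relies on it.
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC smoke test")
           .print();
        env.execute();
    }
}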
Re: Re: Re: When upsert-kafka is used as a source, no data is consumed from Kafka
Still nothing is consumed. Please check the picture in the attachment.

On 2022-10-31 10:03:05, "guozhi mang" wrote:
>I think your format is wrong.
>I modified it below:
>tenv.executeSql(
>        " create table t_upsert_kafka( "
>        + "userid int ,"
>        + "username string, "
>        + "age int, "
>        + "`partition` int ,"
>        + " PRIMARY KEY (userid) NOT ENFORCED "
>        + " ) with ("
>        + " 'connector' = 'upsert-kafka', "
>        + " 'topic' = 'test02',"
>        + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
>        + " 'key.format' = 'json', "
>        + " 'value.format' = 'json'"
>        + " ) "
>);
>
>*Below is the official example*
>
>CREATE TABLE pageviews_per_region (
>  user_region STRING,
>  pv BIGINT,
>  uv BIGINT,
>  PRIMARY KEY (user_region) NOT ENFORCED
>) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'pageviews_per_region',
>  'properties.bootstrap.servers' = '...',
>  'key.format' = 'avro',
>  'value.format' = 'avro');
>
>左岩 <13520871...@163.com> wrote on Mon, Oct 31, 2022 at 09:57:
>
>> public static void main(String[] args) throws Exception {
>>     Configuration conf = new Configuration();
>>     conf.setInteger("rest.port", 10041);
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>     StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>>     //env.setParallelism(1);
>>
>>     env.enableCheckpointing(3000);
>>     env.setStateBackend(new HashMapStateBackend());
>>     env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
>>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>>     env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>     // Create the target Kafka mapping table
>>     tenv.executeSql(
>>             " create table t_upsert_kafka( "
>>             + "userid int primary key not enforced,"
>>             + "username string, "
>>             + "age int, "
>>             + "`partition` int "
>>             + " ) with ("
>>             + " 'connector' = 'upsert-kafka', "
>>             + " 'topic' = 'test02',"
>>             + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
>>             + " 'key.format' = 'json', "
>>             + " 'value.format' = 'json'"
>>             + " ) "
>>     );
>>
>>     tenv.executeSql("select * from t_upsert_kafka").print();
>>
>>     tenv.executeSql(
>>             " CREATE TABLE t_kafka_connector ( "
>>             + "userid int ,"
>>             + "username string, "
>>             + "age int, "
>>             + "`partition` int "
>>             + " ) WITH ( "
>>             + " 'connector' = 'kafka',"
>>             + " 'topic' = 'test02', "
>>             + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
>>             + " 'properties.group.id' = 'testGroup1', "
>>             + " 'scan.startup.mode' = 'earliest-offset', "
>>             + " 'format'='json' "
>>             + " ) "
>>     );
>>
>>     tenv.executeSql("select * from t_kafka_connector").print();
>>
>>     env.execute();
>>
>> t_upsert_kafka gets nothing, while t_kafka_connector can consume the data.
>>
>> On 2022-10-31 09:43:49, "Shengkai Fang" wrote:
>> >Hi,
>> >
>> >the picture is not visible. Could you paste the text directly or use an image hosting tool?
>> >
>> >Best,
>> >Shengkai
>> >
>> >左岩 <13520871...@163.com> wrote on Fri, Oct 28, 2022 at 18:34:
>> >
>> >> When upsert-kafka is used as a source, no data is consumed from Kafka.
>> >> I defined an upsert-kafka connector table via Flink SQL to read a topic, but it reads no data. The plain kafka
>> >> connector can consume the same topic; both read the very same topic. The code is as follows:
>>
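One possible cause, which the thread does not confirm, is that the records in test02 were produced without a Kafka message key: the upsert-kafka source derives the primary key from the record key, while the plain kafka connector only reads the value. A sketch of a DDL that exposes the key through the plain connector to check this; it relies on the 'key.format' / 'key.fields' / 'key.fields-prefix' / 'value.fields-include' options of the kafka SQL connector, and the table name t_kafka_with_key is made up for this check:

// Expose the message key as k_userid next to the value columns; if k_userid is always
// missing, the topic most likely contains key-less records.
tenv.executeSql(
        " CREATE TABLE t_kafka_with_key ( "
        + "   k_userid INT, "
        + "   userid INT, "
        + "   username STRING, "
        + "   age INT, "
        + "   `partition` INT "
        + " ) WITH ( "
        + "   'connector' = 'kafka', "
        + "   'topic' = 'test02', "
        + "   'properties.bootstrap.servers' = '192.168.0.82:9092', "
        + "   'properties.group.id' = 'testGroupKeyCheck', "
        + "   'scan.startup.mode' = 'earliest-offset', "
        + "   'key.format' = 'json', "
        + "   'key.fields' = 'k_userid', "
        + "   'key.fields-prefix' = 'k_', "
        + "   'value.format' = 'json', "
        + "   'value.fields-include' = 'EXCEPT_KEY' "
        + " ) ");

tenv.executeSql("SELECT * FROM t_kafka_with_key").print();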
Re: Re: When upsert-kafka is used as a source, no data is consumed from Kafka
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 10041);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    //env.setParallelism(1);

    env.enableCheckpointing(3000);
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Create the target Kafka mapping table
    tenv.executeSql(
            " create table t_upsert_kafka( "
            + "userid int primary key not enforced,"
            + "username string, "
            + "age int, "
            + "`partition` int "
            + " ) with ("
            + " 'connector' = 'upsert-kafka', "
            + " 'topic' = 'test02',"
            + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
            + " 'key.format' = 'json', "
            + " 'value.format' = 'json'"
            + " ) "
    );

    tenv.executeSql("select * from t_upsert_kafka").print();

    tenv.executeSql(
            " CREATE TABLE t_kafka_connector ( "
            + "userid int ,"
            + "username string, "
            + "age int, "
            + "`partition` int "
            + " ) WITH ( "
            + " 'connector' = 'kafka',"
            + " 'topic' = 'test02', "
            + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
            + " 'properties.group.id' = 'testGroup1', "
            + " 'scan.startup.mode' = 'earliest-offset', "
            + " 'format'='json' "
            + " ) "
    );

    tenv.executeSql("select * from t_kafka_connector").print();

    env.execute();

t_upsert_kafka gets nothing, while t_kafka_connector can consume the data.

On 2022-10-31 09:43:49, "Shengkai Fang" wrote:
>Hi,
>
>the picture is not visible. Could you paste the text directly or use an image hosting tool?
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> wrote on Fri, Oct 28, 2022 at 18:34:
>
>> When upsert-kafka is used as a source, no data is consumed from Kafka.
>> I defined an upsert-kafka connector table via Flink SQL to read a topic, but it reads no data. The plain kafka
>> connector can consume the same topic; both read the very same topic. The code is as follows:
>>
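A related check, again my suggestion rather than something from the thread: write the rows into a fresh topic through an upsert-kafka sink first, so every record is guaranteed to carry a JSON key derived from the primary key, and then read them back with the same table. The topic name test02_keyed is hypothetical; the source table t_kafka_connector is the one defined above:

// upsert-kafka used as a sink writes the PRIMARY KEY columns as the message key,
// so records produced this way are always keyed.
tenv.executeSql(
        " CREATE TABLE t_upsert_kafka_keyed ( "
        + "   userid INT, "
        + "   username STRING, "
        + "   age INT, "
        + "   `partition` INT, "
        + "   PRIMARY KEY (userid) NOT ENFORCED "
        + " ) WITH ( "
        + "   'connector' = 'upsert-kafka', "
        + "   'topic' = 'test02_keyed', "
        + "   'properties.bootstrap.servers' = '192.168.0.82:9092', "
        + "   'key.format' = 'json', "
        + "   'value.format' = 'json' "
        + " ) ");

// Copy the rows over, then read them back through the same upsert-kafka table.
tenv.executeSql("INSERT INTO t_upsert_kafka_keyed SELECT * FROM t_kafka_connector");
tenv.executeSql("SELECT * FROM t_upsert_kafka_keyed").print();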
When upsert-kafka is used as a source, no data is consumed from Kafka
When upsert-kafka is used as a source, no data is consumed from Kafka. I defined an upsert-kafka connector table via Flink SQL to read a topic, but it reads no data. The plain kafka connector can consume the same topic; both read the very same topic. The code is as follows:
Re: Flink 1.15 FileSink: compacted compressed small files are unreadable
Hello, I tried your scenario and the compacted file is indeed unreadable. The exact cause may depend on HDFS support (whether HDFS accepts the Gzip-compressed files that Flink writes out is still to be confirmed). On my side, using LZO compression, the files are readable in HDFS; you can refer to the following:

// create the stream with kafka source, test_topic must return Student!
val kafkaStream: DataStream[Student] = env
  .addSource(kafkaConsumer)

// Build the StreamingFileSink, specifying the base path and the serialization Encoder
val sink: StreamingFileSink[Student] = StreamingFileSink
  .forBulkFormat(outputBasePath, ParquetAvroCompressionWriters.forReflectRecord(classOf[Student], CompressionCodecName.LZO))
  .withBucketAssigner(new EventDateTimeBucketAssigner("MMdd"))
  .build()

// Just add the sink to the input DataStream
kafkaStream.addSink(sink)

// execute program
env.execute("Kafka to Parquet")

On 2022-08-04 10:48:02, "Wenhao Xiao" wrote:
>Hi all, has anyone used the small-file compaction feature of the 1.15 FileSink DataStream API? I write files compressed in gz format and find that the compacted files cannot be read.
>The format type is FileSink.forBulkFormat; the BulkWriter.Factory is CompressWriterFactory with gz compression; the compactor used for merging files is the built-in ConcatFileCompactor.
>
>Below is the relevant FileSink code:
>CompressWriterFactory writer =
>    CompressWriters.forExtractor(new DefaultExtractor())
>        .withHadoopCompression("Gzip", new Configuration());
>
>FileSink hdfsSink = FileSink.forBulkFormat(new Path("hdfs://"), writer)
>    .withBucketCheckInterval(6)
>    .enableCompact(
>        FileCompactStrategy.Builder.newBuilder()
>            .setNumCompactThreads(1024)
>            .enableCompactionOnCheckpoint(3)
>            .build(),
>        new ConcatFileCompactor())
>    .build();
>
>After the compressed small files are compacted, the compacted file is unreadable:
>running hdfs dfs -text //2022-08-03--13/compacted-part-00857a94-bf23-4ae7-a55b-d39c2fdca565-0.gz
>reports the error: text: invalid block type
>
>Is it that the 1.15 FileSink DataStream API does not yet support compaction of compressed files, or have I used something incorrectly or missed something?
>
>Thanks for taking the time to answer!
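To help separate the two suspects (the Gzip writer versus the compaction step), one simple check, not taken from the thread, is to run the same sink without enableCompact and confirm that the individual .gz part files are readable with hdfs dfs -text. A sketch based on the code quoted above, using the same imports; the output path is made up. For what it's worth, ConcatFileCompactor concatenates files byte-wise, and a byte-wise concatenation of gzip streams yields a multi-member gzip file that not every reader handles, which could be consistent with the "invalid block type" error, but that is speculation.

// Same Gzip writer as above, but without file compaction, so each part file is a
// self-contained gzip stream.
CompressWriterFactory<String> writer =
        CompressWriters.forExtractor(new DefaultExtractor<String>())
                .withHadoopCompression("Gzip", new Configuration());

FileSink<String> plainGzSink = FileSink
        .forBulkFormat(new Path("hdfs:///tmp/gz-compaction-check"), writer) // hypothetical path
        .withBucketCheckInterval(60_000)
        .build();
// If these un-compacted parts read fine but the compacted ones do not, the problem
// sits in the compaction step rather than in the CompressWriterFactory itself.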
Re: hive catalog does not support JDK 11
JDK 11 is not supported at the moment; you need to downgrade to JDK 8. When we used JDK 11, lib packages had to be added manually, and the job could not connect once it reached the Hive client connection step. We had to downgrade the JDK on all three servers to JDK 8, and the settings in the configuration files (/etc/profile, hadoop-env.sh, yarn-env.sh) must be kept consistent.

On 2022-07-27 09:58:16, "Jeff" wrote:
>Flink 1.15.1 works fine with the Hive catalog (3.1.3) on JDK 8, but after upgrading to JDK 11 there are version problems. From the error message and a web search, Hive does not currently support JDK 11. Is there any workaround for this?