Hi! 流作业写入 hive 的数据需要在 checkpoint 之后才可见。我看你的代码里没有启用 checkpoint,可以试一下启用 checkpoint。
jdbc sink 是一个写入即可见的 sink,但它只能保证最终一致性。也就是说如果中途作业出现 failover,那么从上一次 checkpoint 到 failover 之间写入 jdbc sink 的数据就是“冗余”的,要被作业重启后的新数据覆盖才能回归到一致性。 filesystem sink 写入的时候应该创建的是一个临时文件,filesystem source 是不会读这个文件的,只有 checkpoint 之后才会把临时文件重命名。 799590...@qq.com.INVALID <799590...@qq.com.invalid> 于2022年3月12日周六 14:51写道: > > 软件版本 > flink:1.13.6 > hive:1.1.1 > hadoop:2.6.0-cdh5.16.2 > > 通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下, > > 运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。 > > > 测试过将输出改为mysql或者csv文件都能持续产生新记录,hive表中的数据也能读取出来,或者从hive的一张表将数据插入刚刚创建的表中也是成功的。就是不知道为什么不能将kafka的动态数据动态写入新建的hive表中。 > > String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini"; > String keytab = "/home/tetris/conf/company.keytab"; > String principal = "company"; > System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH); > > Configuration configuration = new Configuration(); > configuration.set("hadoop.security.authentication", "kerberos"); > configuration.set("keytab.file", keytab); > configuration.setBoolean("hadoop.security.authorization", true); > configuration.set("kerberos.principal", principal); > UserGroupInformation.setConfiguration(configuration); > UserGroupInformation.loginUserFromKeytab(principal, keytab); > > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081); > StreamTableEnvironment flinkTableEnv = > StreamTableEnvironment.create(env,bsSettings); > > HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris", > "/home/tetris/conf", "1.1.1"); > flinkTableEnv.registerCatalog("myhive",hiveCatalog); > flinkTableEnv.useCatalog("myhive"); > flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print(); > flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id > STRING,status STRING) WITH (\n" + > "'connector' = 'kafka',\n" + > "'topic' = 'person',\n" + > "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" + > "'properties.group.id' = 'testGroup',\n" + > "'scan.startup.mode' = 'latest-offset',\n" + > "'format' = 'json',\n" + > > "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" + > ")").print(); > flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print(); > flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id > STRING,status STRING)").print(); > flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > TableResult result = flinkTableEnv.executeSql("INSERT INTO > output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928"); > System.out.println(result.getJobClient().get().getJobID()); > > > > 谌祥,杭州 - java后端开发 - 大数据方向 > 799590...@qq.com >