Hi,

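你的Source看起来并没有产出watermark,而'sink.partition-commit.trigger'='partition-time'需要依赖watermark来判断分区何时可以提交(watermark超过"分区时间 + delay"时才会提交),所以分区一直没有被提交到Hive metastore。
你可以考虑让Source产出正确的watermark,或者使用'sink.partition-commit.trigger'的默认值process-time。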

Best,
Jingsong

On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> 你好:
>     我现在在使用flink 1.11.2版本 hive1.1.0 版本;
>     当我在使用flink hive streaming的时候发现,按照示例写数据到hive,
> 可以看到指定目录下已经生成了文件,但是使用hive查询没有数据;
>     好像是分区信息没有提交到hive metastore;但是官网已经说实现了这个功能;我操作却不行
>     下面是我的代码
>      object StreamMain {
>   def main(args: Array[String]): Unit = {
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     streamEnv.setParallelism(3)
>
>     val tableEnvSettings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
>
>     val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
>     
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
>     
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
>     val dataStream = streamEnv.addSource(new MySource)
>
>     val catalogName = "my_catalog"
>     val catalog = new HiveCatalog(
>       catalogName,              // catalog name
>       "yutest",                // default database
>
>       "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>       "1.1.0"                   // Hive version
>     )
>     tableEnv.registerCatalog(catalogName, catalog)
>     tableEnv.useCatalog(catalogName)
>
>     tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>     tableEnv.useDatabase("yutest")
>
>
>     tableEnv.createTemporaryView("users", dataStream)
>     tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
>     //      如果hive中已经存在了相应的表,则这段代码省略
>     val hiveSql = """CREATE external TABLE fs_table (
>                     user_id STRING,
>                     order_amount DOUBLE
>                   )
>                   partitioned by(
>                   dt string,
>                   h string,
>                   m string) stored as parquet
>                   TBLPROPERTIES (
>
>                     'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
>                     'sink.partition-commit.delay'='0s',
>                     'sink.partition-commit.trigger'='partition-time',
>
>                     
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>                   )""".stripMargin
>     tableEnv.executeSql(hiveSql)
>
>
>     val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
>     tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction<UserInfo> {
>     private volatile boolean run = true;
>     String userids[] = {
>
>             "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
>             "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
>             "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
>             "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
>             "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
>     };
>
>     @Override
>
>     public void run(SourceFunction.SourceContext<UserInfo> sourceContext) 
> throws Exception {
>
>         while (run) {
>
>             String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
>             UserInfo userInfo = new UserInfo();
>             userInfo.setUserId(userid);
>             userInfo.setAmount(Math.random() * 100);
>             userInfo.setTs(new Timestamp(System.currentTimeMillis()));
>             sourceContext.collect(userInfo);
>             Thread.sleep(100);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         run = false;
>     }
> }
> public class UserInfo implements Serializable {
>     private String userId;
>     private Double amount;
>     private Timestamp ts;
>
>     public String getUserId() {
>         return userId;
>     }
>
>     public void setUserId(String userId) {
>         this.userId = userId;
>     }
>
>     public Double getAmount() {
>         return amount;
>     }
>
>     public void setAmount(Double amount) {
>         this.amount = amount;
>     }
>
>     public Timestamp getTs() {
>         return ts;
>     }
>
>     public void setTs(Timestamp ts) {
>         this.ts = ts;
>     }
> }
>
> hive (yutest)>
>              >
>              > show partitions fs_table;
> OK
> partition
> Time taken: 20.214 seconds
>
> ------------------------------
> hdxg1101300...@163.com
>


-- 
Best, Jingsong Lee

Reply via email to