Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?
Hi Jingsong,

I have created a JIRA ticket: https://issues.apache.org/jira/browse/FLINK-23891

Best,
Yik San
Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?
Hi Caizhi,

Thanks for the workaround! It should work fine.

Hi Jingsong,

Thanks for the suggestion. Before creating a JIRA ticket, I wonder if this is considered a valid ask at first glance? If so, I will create one.

Best,
Yik San
Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?
Hi Yik,

The **batch** Hive sink does not support `sink.partition-commit.policy.kind`.

By default, the batch Hive sink commits partitions to the metastore without writing a success-file.

You can create a JIRA ticket for this.

Best,
Jingsong

--
Best, Jingsong Lee
Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?
Hi!

As far as I know, Flink batch jobs will not add the _SUCCESS file. However, for batch jobs you can register a JobListener and add the _SUCCESS file yourself in JobListener#onJobExecuted. See the registerJobListener method in StreamExecutionEnvironment.
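The workaround above can be sketched as follows. This is a minimal illustration, not Flink API: the class and method names are made up for the example, the marker is written to the local filesystem via java.nio (against HDFS you would use the Hadoop FileSystem API instead), and the Flink-side wiring is shown only as a comment so the sketch stays self-contained.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Minimal sketch of the JobListener workaround. The helper below is what
// JobListener#onJobExecuted would call once the batch job has finished.
class SuccessFileMarker {

    /** Creates an empty _SUCCESS file in the given partition directory. */
    static Path writeSuccessMarker(Path partitionDir) throws IOException {
        Files.createDirectories(partitionDir);
        Path marker = partitionDir.resolve("_SUCCESS");
        if (Files.notExists(marker)) {
            Files.createFile(marker); // empty file, mirroring Hadoop's marker
        }
        return marker;
    }

    // Hypothetical wiring on the Flink side (requires flink-streaming-java on
    // the classpath; exception handling elided):
    //
    // env.registerJobListener(new JobListener() {
    //     @Override
    //     public void onJobSubmitted(JobClient jobClient, Throwable t) {}
    //
    //     @Override
    //     public void onJobExecuted(JobExecutionResult result, Throwable t) {
    //         if (t == null) { // write the marker only if the job succeeded
    //             writeSuccessMarker(Paths.get("/path/to/partition"));
    //         }
    //     }
    // });

    public static void main(String[] args) throws IOException {
        Path marker = writeSuccessMarker(Paths.get("out", "p_day=20210819"));
        System.out.println(Files.exists(marker));
    }
}
```

Note that onJobExecuted also fires when the job fails, so checking the Throwable argument (as in the commented wiring) matters before writing the marker.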
How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?
Hi community,

According to the [docs](https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy), if I create a Hive table with the config sink.partition-commit.policy.kind="metastore,success-file", then once a write to the **streaming** Hive sink finishes:

- The HDFS directory will be registered to the Hive metastore,
- A _SUCCESS file will be written to the directory when the job finishes.

An example result directory on HDFS looks like this:

[10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
Found 9 items
-rw-r- 2 basedata aiinfra        0 2021-08-20 08:56 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
-rw-r- 2 basedata aiinfra 10684668 2021-08-20 08:49 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
-rw-r- 2 basedata aiinfra 10712792 2021-08-20 08:48 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
-rw-r- 2 basedata aiinfra 10759066 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
-rw-r- 2 basedata aiinfra 10754886 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
-rw-r- 2 basedata aiinfra 10681155 2021-08-20 08:45 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
-rw-r- 2 basedata aiinfra 10725101 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
-rw-r- 2 basedata aiinfra 10717976 2021-08-20 08:56 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
-rw-r- 2 basedata aiinfra 10585453 2021-08-20 08:45 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0

There are 8 part-* files because I set the Flink run parallelism to 8. After all part-* files are written, a _SUCCESS file is added (see its timestamp, 08:56, which is later than all the rest).

I wonder: can I do the same with the **batch** Hive sink as well? Ideally, after the job finishes, I would like a _SUCCESS file added to the directory. However, I haven't figured out how to do it yet.

Any help? Thanks!

Best,
Yik San