Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-20 Thread Yik San Chan
Hi Jingsong,

I have created a JIRA ticket
https://issues.apache.org/jira/browse/FLINK-23891.

Best,
Yik San



Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-20 Thread Yik San Chan
Hi Caizhi,

Thanks for the workaround! It should work fine.

Hi Jingsong,

Thanks for the suggestion. Before creating a JIRA ticket, I wonder whether
this is considered a valid ask at first glance. If so, I will create a JIRA
ticket.

Best,
Yik San



Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Jingsong Li
Hi Yik,

The **batch** Hive sink does not support `sink.partition-commit.policy.kind`.

By default, the **batch** Hive sink commits to the metastore without writing a success-file.

You can create a JIRA for this.

Best,
Jingsong



-- 
Best, Jingsong Lee


Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Caizhi Weng
Hi!

As far as I know, Flink batch jobs do not add the _SUCCESS file. However,
for batch jobs you can register a JobListener and add the _SUCCESS file
yourself in JobListener#onJobExecuted. See the registerJobListener method in
StreamExecutionEnvironment.
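A minimal sketch of that workaround. Only the marker-writing part is shown and it is plain JDK; in a real job this method would be called from JobListener#onJobExecuted (after checking that the job succeeded), with the listener registered via StreamExecutionEnvironment#registerJobListener, and the actual partition directory resolved from your job's configuration — the directory used below is a stand-in:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SuccessFileMarker {

    // Write an empty _SUCCESS marker into the given output directory,
    // mimicking what Hadoop's FileOutputCommitter does after a job commits.
    static void writeSuccessFile(Path outputDir) throws IOException {
        Files.createDirectories(outputDir);
        Path marker = outputDir.resolve("_SUCCESS");
        if (Files.notExists(marker)) {
            Files.createFile(marker);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the partition directory the batch job wrote to.
        Path partitionDir = Files.createTempDirectory("p_day-20210819-");
        // In Flink, this call would sit inside JobListener#onJobExecuted,
        // guarded so the marker is only written when the job succeeded.
        writeSuccessFile(partitionDir);
        System.out.println(Files.exists(partitionDir.resolve("_SUCCESS")));
    }
}
```

Running main prints `true`. Note that for an HDFS (rather than local) output directory, the same idea applies but you would go through the Hadoop FileSystem API instead of java.nio.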



How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Yik San Chan
Hi community,

According to the [docs](https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy), if I create a Hive table with the config sink.partition-commit.policy.kind="metastore,success-file", once the write to the **streaming** Hive sink is finished:

- The HDFS directory will be registered to the Hive metastore,
- There will be a _SUCCESS file written to the directory when the job
finishes.
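For context, the streaming setup being described corresponds to a Hive-dialect DDL along these lines (the table and column names here are made up for illustration; the relevant piece is the TBLPROPERTIES key):

```sql
-- Hypothetical table; only the commit-policy property matters here.
CREATE TABLE user_loss_predictions (
  user_id BIGINT,
  loss_score DOUBLE
) PARTITIONED BY (p_day STRING) STORED AS parquet TBLPROPERTIES (
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```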

An example result directory on HDFS looks like this:

[10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
Found 9 items
-rw-r--r--   2 basedata aiinfra          0 2021-08-20 08:56 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
-rw-r--r--   2 basedata aiinfra   10684668 2021-08-20 08:49 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
-rw-r--r--   2 basedata aiinfra   10712792 2021-08-20 08:48 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
-rw-r--r--   2 basedata aiinfra   10759066 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
-rw-r--r--   2 basedata aiinfra   10754886 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
-rw-r--r--   2 basedata aiinfra   10681155 2021-08-20 08:45 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
-rw-r--r--   2 basedata aiinfra   10725101 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
-rw-r--r--   2 basedata aiinfra   10717976 2021-08-20 08:56 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
-rw-r--r--   2 basedata aiinfra   10585453 2021-08-20 08:45 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0

There are 8 part-* files because I set the flink run parallelism to 8.
After all part-* are written, a _SUCCESS file is added (see the timestamp
08:56, which is later than all the rest).

I wonder: can I do the same with the **batch** Hive sink as well? Ideally,
after the job finishes, I would like to have a _SUCCESS file added to the
directory. However, I haven't figured out how to do it yet.

Any help? Thanks!

Best,
Yik San