Re: Bug report for reading Hive table as streaming source.
Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35118
Re: Bug report for reading Hive table as streaming source.
Sure
Re: Bug report for reading Hive table as streaming source.
Thanks for reporting. Could you please help create a jira about it?

Best regards,
Yuxia
Re: Bug report for reading Hive table as streaming source.
I think it is worth mentioning in the documentation for Hive reads that a table with more than 32,767 partitions cannot be read as a streaming source.
Re: Bug report for reading Hive table as streaming source.
Found out the reason:

It turns out that Flink uses Hive's IMetaStoreClient to fetch partitions via the following method:

```
List<String> listPartitionNames(String db_name, String tbl_name, short max_parts)
    throws MetaException, TException;
```

where max_parts is the maximum number of partitions the call may return from the Hive metastore. Since max_parts is a short, the most partitions it can ever fetch is Short.MAX_VALUE, i.e. 32,767.

But the table has far more partitions than that maximum, so the list-partitions operation cannot fetch all of them, and the source never gets to consume the most recent partitions. (A minimal sketch illustrating this limit follows the quoted report below.)

On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang wrote:

> Hi,
>
> I found a weird bug when reading a Hive table as a streaming source.
>
> In summary, if the first partition key is not time-related, the Hive
> table cannot be read as a streaming source.
>
> e.g.
>
> I have a Hive table with the following definition:
>
> ```
> CREATE TABLE article (
>     id BIGINT,
>     edition STRING,
>     dt STRING,
>     hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> USING orc;
> ```
>
> Then I try to query it as a streaming source:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>     'streaming-source.partition-order' = 'partition-name',
>     'streaming-source.consume-start-offset' = 'edition=en_US/dt=2024-03-26/hh=00') */
> ```
>
> And I see no output in the `kafka_sink`.
>
> Then I defined an external table pointing to the same path but without
> the `edition` partition:
>
> ```
> CREATE TABLE en_article (
>     id BIGINT,
>     edition STRING,
>     dt STRING,
>     hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> LOCATION 's3://xxx/article/edition=en_US'
> USING orc;
> ```
>
> And insert with the following statement:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>     'streaming-source.partition-order' = 'partition-name',
>     'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
> ```
>
> The data is written to the sink.
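To make the truncation above concrete, here is a minimal, self-contained sketch against the IMetaStoreClient API. It is an illustration, not Flink's actual code path: the metastore connection, database name, and table name are assumptions, and the unlimited listing relies on the Hive convention that a negative max_parts means "no limit".

```
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;

public class PartitionLimitCheck {
    public static void main(String[] args) throws Exception {
        // Assumes hive-site.xml on the classpath points at a reachable metastore.
        IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        try {
            // The capped call: max_parts is a short, so this can never return
            // more than Short.MAX_VALUE (32,767) partition names.
            List<String> capped =
                    client.listPartitionNames("default", "article", Short.MAX_VALUE);

            // Assumed Hive convention: a negative max_parts requests all names.
            List<String> all =
                    client.listPartitionNames("default", "article", (short) -1);

            System.out.printf(
                    "capped listing: %d names, full listing: %d names%n",
                    capped.size(), all.size());
            if (all.size() > capped.size()) {
                // Any partition sorting after the first 32,767 names is invisible
                // to a caller that passes Short.MAX_VALUE -- the situation the
                // streaming source ran into.
                System.out.println("This table exceeds the Short.MAX_VALUE limit.");
            }
        } finally {
            client.close();
        }
    }
}
```

Comparing the two counts shows whether a given table is affected: with 'streaming-source.partition-order' = 'partition-name', recent partitions sort last, which is exactly the range a capped listing drops.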