Found the reason: Flink uses Hive’s IMetaStoreClient to fetch partitions through the following method:

    List<String> listPartitionNames(String db_name, String tbl_name, short max_parts) throws MetaException, TException;

Here max_parts is the maximum number of partitions to fetch from the Hive metastore. Because it is a short, the most it can fetch is Short.MAX_VALUE, which is 32767.

The table has far more partitions than that, so the list-partitions call cannot return all of them, and the streaming source never sees the recent partitions it is supposed to consume.

On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

> Hi,
>
> I found a weird bug when reading a Hive table as a streaming source.
>
> In summary, if the first partition is not time related, then the Hive
> table cannot be read as a streaming source.
>
> e.g.
>
> I've a Hive table in the definition of
>
> ```
> CREATE TABLE article (
>   id BIGINT,
>   edition STRING,
>   dt STRING,
>   hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> USING orc;
> ```
> Then I try to query it as a streaming source:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
> 'streaming-source.partition-order' = 'partition-name',
> 'streaming-source.consume-start-offset' =
> 'edition=en_US/dt=2024-03-26/hh=00') */
> ```
>
> And I see no output in the `kafka_sink`.
>
> Then I defined an external table pointing to the same path but has no
> `edition` partition,
>
> ```
> CREATE TABLE en_article (
>   id BIGINT,
>   edition STRING,
>   dt STRING,
>   hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> LOCATION 's3://xxx/article/edition=en_US'
> USING orc;
> ```
>
> And insert with the following statement:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
> 'streaming-source.partition-order' = 'partition-name',
> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
> ```
>
> The data is sinked.
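
For anyone who wants to see the effect of the short-typed limit described at the top of this message, here is a minimal sketch in plain Java. It does not call Hive or Flink: the listPartitionNames method below is a hypothetical stand-in that only mimics the max_parts cap, the partition names and the count of 50000 are made up, and it assumes the truncated result is simply the first entries in name order, which may not match how a real metastore orders them.

```java
import java.util.ArrayList;
import java.util.List;

class MaxPartsLimitSketch {

    // Hypothetical stand-in for a metastore listing call: returns at most
    // maxParts names. Assumes maxParts is non-negative and that truncation
    // keeps the first entries in the given order.
    static List<String> listPartitionNames(List<String> allNames, short maxParts) {
        int limit = Math.min(maxParts, allNames.size());
        return new ArrayList<>(allNames.subList(0, limit));
    }

    // Builds made-up partition names that sort in creation order.
    static List<String> makePartitions(int count) {
        List<String> names = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            names.add(String.format("edition=en_US/seq=%06d", i));
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> all = makePartitions(50_000);

        // A short cannot express a limit larger than Short.MAX_VALUE.
        List<String> fetched = listPartitionNames(all, Short.MAX_VALUE);

        String newest = all.get(all.size() - 1);
        System.out.println("total partitions:   " + all.size());
        System.out.println("fetched partitions: " + fetched.size());
        System.out.println("newest visible:     " + fetched.contains(newest));
    }
}
```

Under these assumptions the fetched list stops at 32767 entries, so the newest partitions never appear in it, which matches the symptom of the streaming source not consuming the recent partition.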