Jane Chan created FLINK-27652:
---------------------------------

             Summary: CompactManager.Rewriter cannot handle different partition keys invoked compaction
                 Key: FLINK-27652
                 URL: https://issues.apache.org/jira/browse/FLINK-27652
             Project: Flink
          Issue Type: Bug
          Components: Table Store
    Affects Versions: table-store-0.2.0
            Reporter: Jane Chan
             Fix For: table-store-0.2.0

h3. Issue Description

When {{commit.force-compact}} is enabled for a partitioned managed table, successive synchronized writes may fail.

h3. Root Cause

{code:java}
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it.
    at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
{code}

However, {{data-59826283-c5d1-4344-96ae-2203d4e60a57-0}} does not belong to partition {{Autumn}}. It seems that the rewriter paired the file with the wrong partition/bucket.

h3. How to Reproduce

{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.connector;

import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

/** A reproducible case. */
public class ForceCompactionITCase extends FileStoreTableITCase {

    @Override
    protected List<String> ddl() {
        // Managed table partitioned by f1.
        return Collections.singletonList(
                "CREATE TABLE IF NOT EXISTS T1 ("
                        + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
    }

    @Test
    public void test() throws ExecutionException, InterruptedException {
        bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
        bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");

        // Each insert is awaited before the next one starts.
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
                                + ",(2, 'Winter', 'The First Snowflake'), "
                                + "(2, 'Spring', 'The First Rose in Spring'), "
                                + "(7, 'Summer', 'Summertime Sadness')")
                .await();
        bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
        bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
        bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
                                + "(4, 'Spring', 'Spring Water')")
                .await();

        // Repeated writes that each touch both the Summer and Autumn partitions.
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(66666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(666666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
    }
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)