[ 
https://issues.apache.org/jira/browse/FLINK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-27652:
------------------------------
    Description: 
h3. Issue Description

When {{commit.force-compact}} is enabled for a partitioned managed table, 
successive synchronous writes may fail. The {{rewrite}} method can pair a data 
file with the wrong {{partition}} and {{bucket}}.
{code:java}
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it.
    at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
{code}
However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to 
partition Autumn. It seems that the rewriter resolved this file against the 
wrong partition/bucket.
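
The kind of mix-up described above can be illustrated with a minimal, self-contained sketch. It assumes, purely for illustration, that the rewriter resolves file paths lazily from partition/bucket state that another writer can change before compaction runs. {{SharedPathFactory}}, {{Rewriter}}, {{lazyRewriter}} and {{boundRewriter}} are hypothetical names, not the Table Store API, and this is not a confirmed description of the actual code path.

```java
public class RewriterMixUpSketch {

    /** Hypothetical path factory whose partition/bucket can be re-pointed after creation. */
    static final class SharedPathFactory {
        private String partition;
        private int bucket;

        void setLocation(String partition, int bucket) {
            this.partition = partition;
            this.bucket = bucket;
        }

        String toPath(String fileName) {
            return "T1/f1=" + partition + "/bucket-" + bucket + "/" + fileName;
        }
    }

    /** Hypothetical stand-in for a rewriter: maps a data file name to the path it would read. */
    interface Rewriter {
        String resolve(String fileName);
    }

    /** Fragile: the path is resolved from shared state at the time rewrite runs. */
    static Rewriter lazyRewriter(SharedPathFactory shared) {
        return shared::toPath;
    }

    /** Safer: partition and bucket are captured by value when the rewriter is created. */
    static Rewriter boundRewriter(String partition, int bucket) {
        return fileName -> "T1/f1=" + partition + "/bucket-" + bucket + "/" + fileName;
    }

    public static void main(String[] args) {
        SharedPathFactory shared = new SharedPathFactory();

        // Rewriters are created while the shared state points at partition Summer.
        shared.setLocation("Summer", 0);
        Rewriter lazySummer = lazyRewriter(shared);
        Rewriter boundSummer = boundRewriter("Summer", 0);

        // A writer for another partition re-points the shared state before compaction runs.
        shared.setLocation("Autumn", 0);

        System.out.println(lazySummer.resolve("data-0"));  // T1/f1=Autumn/bucket-0/data-0
        System.out.println(boundSummer.resolve("data-0")); // T1/f1=Summer/bucket-0/data-0
    }
}
```

In this sketch the lazily resolved rewriter looks for a Summer file under {{f1=Autumn}}, which would surface as a {{FileNotFoundException}} like the one above, while the rewriter bound to its own partition and bucket at creation time is unaffected.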
h3. How to Reproduce
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.connector;

import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

/** A reproducible case. */
public class ForceCompactionITCase extends FileStoreTableITCase {

    @Override
    protected List<String> ddl() {
        return Collections.singletonList(
                "CREATE TABLE IF NOT EXISTS T1 ("
                        + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
    }

    @Test
    public void test() throws ExecutionException, InterruptedException {
        bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
        bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
                                + ",(2, 'Winter', 'The First Snowflake'), "
                                + "(2, 'Spring', 'The First Rose in Spring'), "
                                + "(7, 'Summer', 'Summertime Sadness')")
                .await();
        bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
        bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
        bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
                                + "(4, 'Spring', 'Spring Water')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(66666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(666666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
    }
}

{code}

  was:
h3. Issue Description
When {{commit.force-compact}} is enabled for a partitioned managed table, 
successive synchronous writes may fail. The current implementation of 
{{CompactManager.Rewriter}} is an anonymous class in {{FileStoreWriteImpl}}. 
However, {{partition}} and {{bucket}} are referenced as local variables, and 
the dataFileReader is initialized when {{rewrite}} is called; this may lead the 
{{rewrite}} method to pair a data file with the wrong {{partition}} and 
{{bucket}}.

h3. Root Cause
{code:java}
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it.
    at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
{code}
However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to 
partition Autumn. It seems that the rewriter resolved this file against the 
wrong partition/bucket.

h3. How to Reproduce
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.connector;

import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

/** A reproducible case. */
public class ForceCompactionITCase extends FileStoreTableITCase {

    @Override
    protected List<String> ddl() {
        return Collections.singletonList(
                "CREATE TABLE IF NOT EXISTS T1 ("
                        + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
    }

    @Test
    public void test() throws ExecutionException, InterruptedException {
        bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
        bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
                                + ",(2, 'Winter', 'The First Snowflake'), "
                                + "(2, 'Spring', 'The First Rose in Spring'), "
                                + "(7, 'Summer', 'Summertime Sadness')")
                .await();
        bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
        bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
        bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
                                + "(4, 'Spring', 'Spring Water')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(66666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(666666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
        bEnv.executeSql(
                        "INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer Vibe'),"
                                + " (9, 'Autumn', 'Wake Me Up When September Ends')")
                .await();
    }
}

{code}


> CompactManager.Rewriter cannot handle different partition keys invoked 
> compaction
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-27652
>                 URL: https://issues.apache.org/jira/browse/FLINK-27652
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.2.0
>            Reporter: Jane Chan
>            Assignee: Jane Chan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.2.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
