nastra commented on a change in pull request #3543: URL: https://github.com/apache/iceberg/pull/3543#discussion_r749226473
########## File path: core/src/test/java/org/apache/iceberg/CachingCatalogTestHelper.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.github.benmanes.caffeine.cache.Ticker; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.Assert; + +/** + * Utility class for accessing package-private VisibleForTesting annotated methods + * outside the org.apache.iceberg package. + */ +public class CachingCatalogTestHelper { + + // TODO - Consider making this a delegator? 
+ private CachingCatalogTestHelper() { + } + + // Escape package-private with VisibleForTesting annotation + public static CachingCatalog wrapWithTicker(Catalog catalog, boolean expirationEnabled, long expirationIntervalMillis, + Ticker ticker) { + return (CachingCatalog) CachingCatalog.wrap(catalog, expirationEnabled, expirationIntervalMillis, ticker); + } + + public static void assertTableIsCached(String assertionMessage, CachingCatalog catalog, TableIdentifier identifier) { + catalog.cache().cleanUp(); + Assert.assertTrue(assertionMessage, catalog.tableFromCacheQuietly(identifier).isPresent()); Review comment: having methods in the Cache return Optionals makes testing this more difficult than necessary imo. It also makes it more difficult to debug when the test actually fails. I would suggest something like `Assertions.assertThat(catalog.cache().asMap()).containsKey(identifier);` as that will show you all the entries in the cache when this check fails, thus making our lives easier :) ########## File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase { + + private final String catalogConfigPrefix; + private final boolean isCacheEnabled; + private final boolean isCacheExpirationEnabled; + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { "testcacheexpirationnotenabled", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + CatalogProperties.TABLE_CACHE_ENABLED, "false", + CatalogProperties.TABLE_CACHE_EXPIRATION_ENABLED, "false" + ) }, + { "testhadoop", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hadoop", + CatalogProperties.TABLE_CACHE_ENABLED, "true", + CatalogProperties.TABLE_CACHE_EXPIRATION_ENABLED, "false" + ) } + }; + } + + public TestSparkCatalogCacheExpiration(String catalogName, + String implementation, + Map<String, String> config) { + super(catalogName, implementation, config); + this.catalogConfigPrefix = "spark.sql.catalog." + catalogName + "."; + this.isCacheEnabled = Boolean.parseBoolean( + config.getOrDefault(CatalogProperties.TABLE_CACHE_ENABLED, "true")); + this.isCacheExpirationEnabled = Boolean.parseBoolean( + config.getOrDefault(CatalogProperties.TABLE_CACHE_EXPIRATION_ENABLED, "false")); + } + + // TODO - Looking for a better way to test this. 
Not 100% sure how to access the Iceberg catalog Review comment: you could try something like `Assertions.assertThat(catalog).extracting("icebergCatalog").isInstanceOf(CachingCatalog.class);` ########## File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase { + + private final String catalogConfigPrefix; + private final boolean isCacheEnabled; + private final boolean isCacheExpirationEnabled; + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { "testcacheexpirationnotenabled", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + CatalogProperties.TABLE_CACHE_ENABLED, "false", + CatalogProperties.TABLE_CACHE_EXPIRATION_ENABLED, "false" + ) }, + { "testhadoop", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hadoop", + CatalogProperties.TABLE_CACHE_ENABLED, "true", + CatalogProperties.TABLE_CACHE_EXPIRATION_ENABLED, "false" + ) } + }; + } + + public TestSparkCatalogCacheExpiration(String catalogName, + String implementation, + Map<String, String> config) { + super(catalogName, implementation, config); + this.catalogConfigPrefix = "spark.sql.catalog." + catalogName + "."; + this.isCacheEnabled = Boolean.parseBoolean( + config.getOrDefault(CatalogProperties.TABLE_CACHE_ENABLED, "true")); + this.isCacheExpirationEnabled = Boolean.parseBoolean( + config.getOrDefault(CatalogProperties.TABLE_CACHE_EXPIRATION_ENABLED, "false")); + } + + // TODO - Looking for a better way to test this. 
Not 100% sure how to access the Iceberg catalog Review comment: So basically your checks could become something like this: ``` Assertions.assertThat(catalog).extracting("icebergCatalog").isInstanceOf(CachingCatalog.class); Assertions.assertThat(catalog).extracting("icebergCatalog").extracting("expirationIntervalMillis").isEqualTo(0L); Assertions.assertThat(catalog).extracting("icebergCatalog").extracting("expirationEnabled").isEqualTo(true); Assertions.assertThat(catalog).extracting("cacheEnabled").isEqualTo(true); ``` ########## File path: core/src/test/java/org/apache/iceberg/CachingCatalogTestHelper.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.github.benmanes.caffeine.cache.Ticker; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.Assert; + +/** + * Utility class for accessing package-private VisibleForTesting annotated methods + * outside the org.apache.iceberg package. 
+ */ +public class CachingCatalogTestHelper { + + // TODO - Consider making this a delegator? + private CachingCatalogTestHelper() { + } + + // Escape package-private with VisibleForTesting annotation + public static CachingCatalog wrapWithTicker(Catalog catalog, boolean expirationEnabled, long expirationIntervalMillis, + Ticker ticker) { + return (CachingCatalog) CachingCatalog.wrap(catalog, expirationEnabled, expirationIntervalMillis, ticker); + } + + public static void assertTableIsCached(String assertionMessage, CachingCatalog catalog, TableIdentifier identifier) { + catalog.cache().cleanUp(); Review comment: cleanup is already being called by `catalog.tableFromCacheQuietly(identifier).isPresent()`. Also I don't think test code should be calling stuff on the cache directly ########## File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java ########## @@ -20,33 +20,143 @@ package org.apache.iceberg; import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheWriter; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; +import java.time.Duration; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CachingCatalog implements Catalog { + + private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class); + private 
static final RemovalListener<TableIdentifier, Table> identLoggingRemovalListener = + (key, value, cause) -> LOG.debug("{} was evicted from the TableCache with {} cause", key, cause); + public static Catalog wrap(Catalog catalog) { - return wrap(catalog, true); + return wrap(catalog, false, 0); + } + + public static Catalog wrap(Catalog catalog, boolean expirationEnabled, long expirationIntervalMilllis) { + return wrap(catalog, true, expirationEnabled, expirationIntervalMilllis); + } + + public static Catalog wrap( + Catalog catalog, boolean caseSensitive, boolean expirationEnabled, long expirationIntervalMillis) { + return new CachingCatalog(catalog, caseSensitive, expirationEnabled, expirationIntervalMillis); } - public static Catalog wrap(Catalog catalog, boolean caseSensitive) { - return new CachingCatalog(catalog, caseSensitive); + @VisibleForTesting + static Catalog wrap(Catalog catalog, boolean expirationEnabled, long expirationIntervalMillis, Ticker ticker) { + return new CachingCatalog(catalog, true, expirationEnabled, expirationIntervalMillis, ticker); } - private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder().softValues().build(); private final Catalog catalog; private final boolean caseSensitive; + private final boolean expirationEnabled; + private final long expirationIntervalMillis; + private final Cache<TableIdentifier, Table> tableCache; + + private CachingCatalog(Catalog catalog, + boolean caseSensitive, + boolean isExpirationEnabled, + long expirationIntervalInMillis) { + this(catalog, caseSensitive, isExpirationEnabled, expirationIntervalInMillis, Ticker.systemTicker()); + } - private CachingCatalog(Catalog catalog, boolean caseSensitive) { + private CachingCatalog(Catalog catalog, + boolean caseSensitive, + boolean isExpirationEnabled, + long expirationIntervalMillis, + Ticker ticker) { this.catalog = catalog; this.caseSensitive = caseSensitive; + this.expirationEnabled = isExpirationEnabled; + 
this.expirationIntervalMillis = expirationIntervalMillis; + + this.tableCache = createTableCache(ticker); + } + + /** + * Return the age of an entry in the cache. + * <p> + * This method is only visible for testing the cache expiration policy, as cache invalidation is handled + * by the catalog and not the cache itself. + * <p> + * Returns the age of the cache entry corresponding to the identifier, or {@code Optional.empty} if the table + * is not present in the cache or if no expireAfterAccess policy is present in this CachingCatalog. + */ + @VisibleForTesting + Optional<Duration> cachedEntryAge(TableIdentifier identifier) { + return tableCache.policy() Review comment: maybe `tableCache.policy().expireAfterAccess().get().ageOf(identifier)`? ########## File path: core/src/test/java/org/apache/iceberg/CachingCatalogTestHelper.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import com.github.benmanes.caffeine.cache.Ticker; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.Assert; + +/** + * Utility class for accessing package-private VisibleForTesting annotated methods + * outside the org.apache.iceberg package. + */ +public class CachingCatalogTestHelper { + + // TODO - Consider making this a delegator? + private CachingCatalogTestHelper() { + } + + // Escape package-private with VisibleForTesting annotation + public static CachingCatalog wrapWithTicker(Catalog catalog, boolean expirationEnabled, long expirationIntervalMillis, + Ticker ticker) { + return (CachingCatalog) CachingCatalog.wrap(catalog, expirationEnabled, expirationIntervalMillis, ticker); + } + + public static void assertTableIsCached(String assertionMessage, CachingCatalog catalog, TableIdentifier identifier) { + catalog.cache().cleanUp(); + Assert.assertTrue(assertionMessage, catalog.tableFromCacheQuietly(identifier).isPresent()); + } + + public static void assertCatalogHasExpiredTable(CachingCatalog catalog, TableIdentifier tableIdent) { + Assert.assertFalse("The catalog should not serve table's that are past their TTL", Review comment: maybe replace this with `Assertions.assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);`. Then you don't need to have a specific error message, since this code will actually show you the entries in the cache when the test fails, thus it will be quite obvious why this check failed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org