This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 36a7fde [FLINK-13458][table] ThreadLocalCache clashes for Blink planner 36a7fde is described below commit 36a7fde092df89e3be7660075df3dcdfafc762b6 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Jul 29 09:32:48 2019 +0200 [FLINK-13458][table] ThreadLocalCache clashes for Blink planner This closes #9257. --- .../flink/table/utils}/ThreadLocalCache.java | 10 +++-- .../runtime/functions/DateTimeFunctions.scala | 1 + .../table/runtime/functions/ThreadLocalCache.scala | 49 ---------------------- .../table/runtime/functions/SqlDateTimeUtils.java | 1 + .../table/runtime/functions/SqlFunctionUtils.java | 1 + 5 files changed, 10 insertions(+), 52 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java similarity index 90% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java index 39de4df..ca47ab0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java @@ -16,15 +16,19 @@ * limitations under the License. */ -package org.apache.flink.table.runtime.functions; +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; import java.util.LinkedHashMap; import java.util.Map; /** - * Provides a ThreadLocal cache with a maximum cache size per thread. - * Values must not be null. + * Provides a thread local cache with a maximum cache size per thread. + * + * <p>Note: Values must not be null. */ +@Internal public abstract class ThreadLocalCache<K, V> { private static final int DEFAULT_CACHE_SIZE = 64; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala index d69a5c9..0bde983 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.runtime.functions +import org.apache.flink.table.utils.ThreadLocalCache import org.joda.time.format.DateTimeFormatter import org.joda.time.format.DateTimeFormatterBuilder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala deleted file mode 100644 index b3a8d7a..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.functions - -import java.util.{LinkedHashMap => JLinkedHashMap} -import java.util.{Map => JMap} - -/** - * Provides a ThreadLocal cache with a maximum cache size per thread. - * Values must not be null. - */ -abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) { - private val cache = new ThreadLocal[BoundedMap[K, V]] - - protected def getNewInstance(key: K): V - - def get(key: K): V = { - var m = cache.get - if (m == null) { - m = new BoundedMap(maxSizePerThread) - cache.set(m) - } - var v = m.get(key) - if (v == null) { - v = getNewInstance(key) - m.put(key, v) - } - v - } -} - -private class BoundedMap[K, V](val maxSize: Int) extends JLinkedHashMap[K,V] { - override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size > maxSize -} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java index 118dca7..3cd1246 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.table.dataformat.Decimal; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.utils.ThreadLocalCache; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnit; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java index 2e00fa1..6ae9b8b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.table.dataformat.BinaryStringUtil; import org.apache.flink.table.dataformat.Decimal; import org.apache.flink.table.runtime.util.JsonUtils; import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.table.utils.ThreadLocalCache; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger;