KYLIN-2529 fix implementation switch for thread-local KylinConfig
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d659bade Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d659bade Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d659bade Branch: refs/heads/master Commit: d659bade9283ecbf99e31285e620e880ef6a06b1 Parents: 51f22e2 Author: lidongsjtu <[email protected]> Authored: Thu Jun 15 18:31:02 2017 +0800 Committer: liyang-gmt8 <[email protected]> Committed: Fri Jun 16 10:11:21 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/engine/EngineFactory.java | 21 ++--- .../org/apache/kylin/job/SchedulerFactory.java | 23 ++--- .../org/apache/kylin/source/SourceFactory.java | 22 +++-- .../apache/kylin/storage/StorageFactory.java | 16 ++-- .../kylin/storage/StorageFactoryTest.java | 93 ++++++++++++++++++++ 5 files changed, 133 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index acaa7da..78b1efe 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -18,8 +18,6 @@ package org.apache.kylin.engine; -import java.util.Map; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.cube.CubeSegment; @@ -30,14 +28,17 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public class EngineFactory { - private static ImplementationSwitch<IBatchCubingEngine> batchEngines; - static { - Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getJobEngines(); - batchEngines = new ImplementationSwitch<>(impls, IBatchCubingEngine.class); - } + // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. + private static ThreadLocal<ImplementationSwitch<IBatchCubingEngine>> engines = new ThreadLocal<>(); public static IBatchCubingEngine batchEngine(IEngineAware aware) { - return batchEngines.get(aware.getEngineType()); + ImplementationSwitch<IBatchCubingEngine> current = engines.get(); + if (current == null) { + current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getJobEngines(), + IBatchCubingEngine.class); + engines.set(current); + } + return current.get(aware.getEngineType()); } /** Mark deprecated to indicate for test purpose only */ @@ -45,11 +46,11 @@ public class EngineFactory { public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) { return batchEngine(cubeDesc).getJoinedFlatTableDesc(cubeDesc); } - + public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) { return batchEngine(newSegment).getJoinedFlatTableDesc(newSegment); } - + /** Build a new cube segment, typically its time range appends to the end of current cube. 
*/ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter); http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java index 4eb76d1..1bf8942 100644 --- a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java +++ b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java @@ -17,27 +17,20 @@ */ package org.apache.kylin.job; -import java.util.Map; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ImplementationSwitch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** - */ public class SchedulerFactory { - - private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); - private static ImplementationSwitch<Scheduler> schedulers; - - static { - Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSchedulers(); - schedulers = new ImplementationSwitch<Scheduler>(impls, Scheduler.class); - } + // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. 
+ private static ThreadLocal<ImplementationSwitch<Scheduler>> schedulers = new ThreadLocal<>(); public static Scheduler scheduler(int schedulerType) { - return schedulers.get(schedulerType); + ImplementationSwitch<Scheduler> current = schedulers.get(); + if (current == null) { + current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSchedulers(), Scheduler.class); + schedulers.set(current); + } + return current.get(schedulerType); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java index 86f89b8..365b505 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java @@ -19,7 +19,6 @@ package org.apache.kylin.source; import java.util.List; -import java.util.Map; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ImplementationSwitch; @@ -28,19 +27,24 @@ import org.apache.kylin.metadata.model.TableDesc; public class SourceFactory { - private static ImplementationSwitch<ISource> sources; - static { - Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSourceEngines(); - sources = new ImplementationSwitch<>(impls, ISource.class); + // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. 
+ private static ThreadLocal<ImplementationSwitch<ISource>> sources = new ThreadLocal<>(); + + private static ISource getSource(int sourceType) { + ImplementationSwitch<ISource> current = sources.get(); + if (current == null) { + current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSourceEngines(), ISource.class); + sources.set(current); + } + return current.get(sourceType); } - + public static ISource getDefaultSource() { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - return sources.get(config.getDefaultSource()); + return getSource(KylinConfig.getInstanceFromEnv().getDefaultSource()); } public static ISource getSource(ISourceAware aware) { - return sources.get(aware.getSourceType()); + return getSource(aware.getSourceType()); } public static IReadableTable createReadableTable(TableDesc table) { http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java index e7dd53c..79b93fe 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java @@ -18,8 +18,6 @@ package org.apache.kylin.storage; -import java.util.Map; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.metadata.model.IStorageAware; @@ -29,14 +27,16 @@ import org.apache.kylin.metadata.realization.IRealization; */ public class StorageFactory { - private static ImplementationSwitch<IStorage> storages; - static { - Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getStorageEngines(); - storages = new ImplementationSwitch<IStorage>(impls, IStorage.class); - } + // Use thread-local because 
KylinConfig can be thread-local and implementation might be different among multiple threads. + private static ThreadLocal<ImplementationSwitch<IStorage>> storages = new ThreadLocal<>(); public static IStorage storage(IStorageAware aware) { - return storages.get(aware.getStorageType()); + ImplementationSwitch<IStorage> current = storages.get(); + if (storages.get() == null) { + current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getStorageEngines(), IStorage.class); + storages.set(current); + } + return current.get(aware.getStorageType()); } public static IStorageQuery createQuery(IRealization realization) { http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java b/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java new file mode 100644 index 0000000..c6ba032 --- /dev/null +++ b/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.storage; + +import java.util.Properties; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metadata.realization.IRealization; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StorageFactoryTest { + @Before + public void setUp() throws Exception { + Properties props = new Properties(); + props.setProperty("kylin.storage.provider.0", MockupStorageEngine.class.getName()); + + KylinConfig.setKylinConfigInEnvIfMissing(props); + } + + @Test + public void testSingleThread() { + IStorage s1 = StorageFactory.storage(new MockupStorageAware()); + IStorage s2 = StorageFactory.storage(new MockupStorageAware()); + + Assert.assertSame(s1, s2); + } + + @Test + public void testMultipleThread() throws InterruptedException { + final IStorage[] s = new IStorage[2]; + + // thread 1 + Thread t = new Thread(new Runnable() { + @Override + public void run() { + s[0] = StorageFactory.storage(new MockupStorageAware()); + } + }); + t.start(); + t.join(); + + // thread 2 + t = new Thread(new Runnable() { + @Override + public void run() { + s[1] = StorageFactory.storage(new MockupStorageAware()); + } + }); + t.start(); + t.join(); + + Assert.assertNotSame(s[0], s[1]); + } + + class MockupStorageAware implements IStorageAware { + @Override + public int getStorageType() { + return 0; + } + } + + public static class MockupStorageEngine implements IStorage { + + @Override + public IStorageQuery createQuery(IRealization realization) { + return null; + } + + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + return null; + } + } +}
