KYLIN-2529 fix implementation switch for threadLocal KylinConfig

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d659bade
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d659bade
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d659bade

Branch: refs/heads/master
Commit: d659bade9283ecbf99e31285e620e880ef6a06b1
Parents: 51f22e2
Author: lidongsjtu <[email protected]>
Authored: Thu Jun 15 18:31:02 2017 +0800
Committer: liyang-gmt8 <[email protected]>
Committed: Fri Jun 16 10:11:21 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/EngineFactory.java  | 21 ++---
 .../org/apache/kylin/job/SchedulerFactory.java  | 23 ++---
 .../org/apache/kylin/source/SourceFactory.java  | 22 +++--
 .../apache/kylin/storage/StorageFactory.java    | 16 ++--
 .../kylin/storage/StorageFactoryTest.java       | 93 ++++++++++++++++++++
 5 files changed, 133 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java 
b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index acaa7da..78b1efe 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.engine;
 
-import java.util.Map;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.cube.CubeSegment;
@@ -30,14 +28,17 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 public class EngineFactory {
 
-    private static ImplementationSwitch<IBatchCubingEngine> batchEngines;
-    static {
-        Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getJobEngines();
-        batchEngines = new ImplementationSwitch<>(impls, 
IBatchCubingEngine.class);
-    }
+    // Use thread-local because KylinConfig can be thread-local and 
implementation might be different among multiple threads.
+    private static ThreadLocal<ImplementationSwitch<IBatchCubingEngine>> 
engines = new ThreadLocal<>();
 
     public static IBatchCubingEngine batchEngine(IEngineAware aware) {
-        return batchEngines.get(aware.getEngineType());
+        ImplementationSwitch<IBatchCubingEngine> current = engines.get();
+        if (current == null) {
+            current = new 
ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getJobEngines(),
+                    IBatchCubingEngine.class);
+            engines.set(current);
+        }
+        return current.get(aware.getEngineType());
     }
 
     /** Mark deprecated to indicate for test purpose only */
@@ -45,11 +46,11 @@ public class EngineFactory {
     public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc 
cubeDesc) {
         return batchEngine(cubeDesc).getJoinedFlatTableDesc(cubeDesc);
     }
-    
+
     public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment 
newSegment) {
         return batchEngine(newSegment).getJoinedFlatTableDesc(newSegment);
     }
-    
+
     /** Build a new cube segment, typically its time range appends to the end 
of current cube. */
     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment 
newSegment, String submitter) {
         return batchEngine(newSegment).createBatchCubingJob(newSegment, 
submitter);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java 
b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
index 4eb76d1..1bf8942 100644
--- a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
@@ -17,27 +17,20 @@
 */
 package org.apache.kylin.job;
 
-import java.util.Map;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ImplementationSwitch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- */
 public class SchedulerFactory {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(SchedulerFactory.class);
-    private static ImplementationSwitch<Scheduler> schedulers;
-
-    static {
-        Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getSchedulers();
-        schedulers = new ImplementationSwitch<Scheduler>(impls, 
Scheduler.class);
-    }
+    // Use thread-local because KylinConfig can be thread-local and 
implementation might be different among multiple threads.
+    private static ThreadLocal<ImplementationSwitch<Scheduler>> schedulers = 
new ThreadLocal<>();
 
     public static Scheduler scheduler(int schedulerType) {
-        return schedulers.get(schedulerType);
+        ImplementationSwitch<Scheduler> current = schedulers.get();
+        if (current == null) {
+            current = new 
ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSchedulers(), 
Scheduler.class);
+            schedulers.set(current);
+        }
+        return current.get(schedulerType);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java 
b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index 86f89b8..365b505 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.source;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ImplementationSwitch;
@@ -28,19 +27,24 @@ import org.apache.kylin.metadata.model.TableDesc;
 
 public class SourceFactory {
 
-    private static ImplementationSwitch<ISource> sources;
-    static {
-        Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getSourceEngines();
-        sources = new ImplementationSwitch<>(impls, ISource.class);
+    // Use thread-local because KylinConfig can be thread-local and 
implementation might be different among multiple threads.
+    private static ThreadLocal<ImplementationSwitch<ISource>> sources = new 
ThreadLocal<>();
+
+    private static ISource getSource(int sourceType) {
+        ImplementationSwitch<ISource> current = sources.get();
+        if (current == null) {
+            current = new 
ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSourceEngines(), 
ISource.class);
+            sources.set(current);
+        }
+        return current.get(sourceType);
     }
-    
+
     public static ISource getDefaultSource() {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        return sources.get(config.getDefaultSource());
+        return getSource(KylinConfig.getInstanceFromEnv().getDefaultSource());
     }
 
     public static ISource getSource(ISourceAware aware) {
-        return sources.get(aware.getSourceType());
+        return getSource(aware.getSourceType());
     }
 
     public static IReadableTable createReadableTable(TableDesc table) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java 
b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index e7dd53c..79b93fe 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.storage;
 
-import java.util.Map;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.metadata.model.IStorageAware;
@@ -29,14 +27,16 @@ import org.apache.kylin.metadata.realization.IRealization;
  */
 public class StorageFactory {
 
-    private static ImplementationSwitch<IStorage> storages;
-    static {
-        Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getStorageEngines();
-        storages = new ImplementationSwitch<IStorage>(impls, IStorage.class);
-    }
+    // Use thread-local because KylinConfig can be thread-local and 
implementation might be different among multiple threads.
+    private static ThreadLocal<ImplementationSwitch<IStorage>> storages = new 
ThreadLocal<>();
 
     public static IStorage storage(IStorageAware aware) {
-        return storages.get(aware.getStorageType());
+        ImplementationSwitch<IStorage> current = storages.get();
+        if (storages.get() == null) {
+            current = new 
ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getStorageEngines(), 
IStorage.class);
+            storages.set(current);
+        }
+        return current.get(aware.getStorageType());
     }
 
     public static IStorageQuery createQuery(IRealization realization) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d659bade/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java 
b/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java
new file mode 100644
index 0000000..c6ba032
--- /dev/null
+++ 
b/core-storage/src/test/java/org/apache/kylin/storage/StorageFactoryTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import java.util.Properties;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StorageFactoryTest {
+    @Before
+    public void setUp() throws Exception {
+        Properties props = new Properties();
+        props.setProperty("kylin.storage.provider.0", 
MockupStorageEngine.class.getName());
+
+        KylinConfig.setKylinConfigInEnvIfMissing(props);
+    }
+
+    @Test
+    public void testSingleThread() {
+        IStorage s1 = StorageFactory.storage(new MockupStorageAware());
+        IStorage s2 = StorageFactory.storage(new MockupStorageAware());
+
+        Assert.assertSame(s1, s2);
+    }
+
+    @Test
+    public void testMultipleThread() throws InterruptedException {
+        final IStorage[] s = new IStorage[2];
+
+        // thread 1
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                s[0] = StorageFactory.storage(new MockupStorageAware());
+            }
+        });
+        t.start();
+        t.join();
+
+        // thread 2
+        t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                s[1] = StorageFactory.storage(new MockupStorageAware());
+            }
+        });
+        t.start();
+        t.join();
+
+        Assert.assertNotSame(s[0], s[1]);
+    }
+
+    class MockupStorageAware implements IStorageAware {
+        @Override
+        public int getStorageType() {
+            return 0;
+        }
+    }
+
+    public static class MockupStorageEngine implements IStorage {
+
+        @Override
+        public IStorageQuery createQuery(IRealization realization) {
+            return null;
+        }
+
+        @Override
+        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+            return null;
+        }
+    }
+}

Reply via email to