http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java new file mode 100644 index 0000000..d0aa0ed --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java @@ -0,0 +1,729 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This file is licensed to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package org.apache.kylin.rest.security; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +/** + * MockHTable. + * + * original MockHTable (by agaoglu) : https://gist.github.com/agaoglu/613217#file_mock_h_table.java + * + * Modifications + * + * <ul> + * <li>fix filter (by k-mack) : https://gist.github.com/k-mack/4600133</li> + * <li>fix batch() : implement all mutate operation and fix result[] count.</li> + * <li>fix exists()</li> + * <li>fix increment() : wrong return value</li> + * <li>check columnFamily</li> + * <li>implement mutateRow()</li> + * <li>implement getTableName()</li> + * <li>implement getTableDescriptor()</li> + * <li>throws RuntimeException when unimplemented method was called.</li> + * <li>remove some methods for loading data, checking values ...</li> + * </ul> + */ +public class MockHTable implements HTableInterface { + private final String tableName; + private final List<String> columnFamilies = new ArrayList<>(); + + private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { + return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions); + } + + public MockHTable(String tableName) { + this.tableName = tableName; + } + + public MockHTable(String tableName, String... columnFamilies) { + this.tableName = tableName; + this.columnFamilies.addAll(Arrays.asList(columnFamilies)); + } + + public void addColumnFamily(String columnFamily) { + this.columnFamilies.add(columnFamily); + } + + /** + * {@inheritDoc} + */ + @Override + public byte[] getTableName() { + return tableName.getBytes(); + } + + @Override + public TableName getName() { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getConfiguration() { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + /** + * {@inheritDoc} + */ + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + HTableDescriptor table = new HTableDescriptor(tableName); + for (String columnFamily : columnFamilies) { + table.addFamily(new HColumnDescriptor(columnFamily)); + } + return table; + } + + /** + * {@inheritDoc} + */ + @Override + public void mutateRow(RowMutations rm) throws IOException { + // currently only support Put and Delete + for (Mutation mutation : rm.getMutations()) { + if (mutation instanceof Put) { + put((Put) mutation); + } else if (mutation instanceof Delete) { + delete((Delete) mutation); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public Result append(Append append) throws IOException { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { + List<KeyValue> ret = new ArrayList<KeyValue>(); + for (byte[] family : rowdata.keySet()) + for (byte[] qualifier : rowdata.get(family).keySet()) { + int versionsAdded = 0; + for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) { + if (versionsAdded++ == maxVersions) + break; + Long timestamp = tsToVal.getKey(); + if (timestamp < timestampStart) + continue; + if (timestamp > timestampEnd) + continue; + byte[] value = tsToVal.getValue(); + ret.add(new KeyValue(row, family, qualifier, timestamp, value)); + } + } + return ret; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean exists(Get get) throws IOException { + Result result = get(get); + return result != null && result.isEmpty() == false; + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + return new Boolean[0]; + } + + /** + * {@inheritDoc} + */ + @Override + public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { + results = batch(actions); + } + + /** + * {@inheritDoc} + */ + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { + Object[] results = new Object[actions.size()]; // same size. + for (int i = 0; i < actions.size(); i++) { + Row r = actions.get(i); + if (r instanceof Delete) { + delete((Delete) r); + results[i] = new Result(); + } + if (r instanceof Put) { + put((Put) r); + results[i] = new Result(); + } + if (r instanceof Get) { + Result result = get((Get) r); + results[i] = result; + } + if (r instanceof Increment) { + Result result = increment((Increment) r); + results[i] = result; + } + if (r instanceof Append) { + Result result = append((Append) r); + results[i] = result; + } + } + return results; + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { + + } + + @Override + public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException { + return new Object[0]; + } + + /** + * {@inheritDoc} + */ + @Override + public Result get(Get get) throws IOException { + if (!data.containsKey(get.getRow())) + return new Result(); + byte[] row = get.getRow(); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + if (!get.hasFamilies()) { + kvs = toKeyValue(row, data.get(row), get.getMaxVersions()); + } else { + for (byte[] family : get.getFamilyMap().keySet()) { + if (data.get(row).get(family) == null) + continue; + NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row).get(family).navigableKeySet(); + for (byte[] qualifier : qualifiers) { + if (qualifier == null) + qualifier = "".getBytes(); + if (!data.get(row).containsKey(family) || !data.get(row).get(family).containsKey(qualifier) || data.get(row).get(family).get(qualifier).isEmpty()) + continue; + Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry(); + kvs.add(new KeyValue(row, family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue())); + } + } + } + Filter filter = get.getFilter(); + if (filter != null) { + kvs = filter(filter, kvs); + } + + return new Result(kvs); + } + + /** + * {@inheritDoc} + */ + @Override + public Result[] get(List<Get> gets) throws IOException { + List<Result> results = new ArrayList<Result>(); + for (Get g : gets) { + results.add(get(g)); + } + return results.toArray(new Result[results.size()]); + } + + /** + * {@inheritDoc} + */ + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + // FIXME: implement + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + final List<Result> ret = new ArrayList<Result>(); + byte[] st = scan.getStartRow(); + byte[] sp = scan.getStopRow(); + Filter filter = scan.getFilter(); + + for (byte[] row : data.keySet()) { + // if row is equal to startRow emit it. When startRow (inclusive) and + // stopRow (exclusive) is the same, it should not be excluded which would + // happen w/o this control. + if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) != 0) { + // if row is before startRow do not emit, pass to next row + if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) > 0) + continue; + // if row is equal to stopRow or after it do not emit, stop iteration + if (sp != null && sp.length > 0 && Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0) + break; + } + + List<KeyValue> kvs = null; + if (!scan.hasFamilies()) { + kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions()); + } else { + kvs = new ArrayList<KeyValue>(); + for (byte[] family : scan.getFamilyMap().keySet()) { + if (data.get(row).get(family) == null) + continue; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row).get(family).navigableKeySet(); + for (byte[] qualifier : qualifiers) { + if (data.get(row).get(family).get(qualifier) == null) + continue; + for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()) { + if (timestamp < scan.getTimeRange().getMin()) + continue; + if (timestamp > scan.getTimeRange().getMax()) + continue; + byte[] value = data.get(row).get(family).get(qualifier).get(timestamp); + kvs.add(new KeyValue(row, family, qualifier, timestamp, value)); + if (kvs.size() == scan.getMaxVersions()) { + break; + } + } + } + } + } + if (filter != null) { + kvs = filter(filter, kvs); + // Check for early out optimization + if (filter.filterAllRemaining()) { + break; + } + } + if (!kvs.isEmpty()) { + ret.add(new Result(kvs)); + } + } + + return new ResultScanner() { + private final Iterator<Result> iterator = ret.iterator(); + + public Iterator<Result> iterator() { + return iterator; + } + + public Result[] next(int nbRows) throws IOException { + ArrayList<Result> resultSets = new ArrayList<Result>(nbRows); + for (int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + + public Result next() throws IOException { + try { + return iterator().next(); + } catch (NoSuchElementException e) { + return null; + } + } + + public void close() { + } + }; + } + + /** + * Follows the logical flow through the filter methods for a single row. + * + * @param filter HBase filter. + * @param kvs List of a row's KeyValues + * @return List of KeyValues that were not filtered. + */ + private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws IOException { + filter.reset(); + + List<KeyValue> tmp = new ArrayList<KeyValue>(kvs.size()); + tmp.addAll(kvs); + + /* + * Note. Filter flow for a single row. Adapted from + * "HBase: The Definitive Guide" (p. 163) by Lars George, 2011. + * See Figure 4-2 on p. 163. + */ + boolean filteredOnRowKey = false; + List<KeyValue> nkvs = new ArrayList<KeyValue>(tmp.size()); + for (KeyValue kv : tmp) { + if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) { + filteredOnRowKey = true; + break; + } + Filter.ReturnCode filterResult = filter.filterKeyValue(kv); + if (filterResult == Filter.ReturnCode.INCLUDE) { + nkvs.add(kv); + } else if (filterResult == Filter.ReturnCode.NEXT_ROW) { + break; + } else if (filterResult == Filter.ReturnCode.NEXT_COL || filterResult == Filter.ReturnCode.SKIP) { + continue; + } + /* + * Ignoring next key hint which is a optimization to reduce file + * system IO + */ + } + if (filter.hasFilterRow() && !filteredOnRowKey) { + filter.filterRow(nkvs); + } + if (filter.filterRow() || filteredOnRowKey) { + nkvs.clear(); + } + tmp = nkvs; + return tmp; + } + + /** + * {@inheritDoc} + */ + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return getScanner(scan); + } + + /** + * {@inheritDoc} + */ + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return getScanner(scan); + } + + private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) { + V data = map.get(key); + if (data == null) { + data = newObject; + map.put(key, data); + } + return data; + } + + /** + * {@inheritDoc} + */ + @Override + public void put(Put put) throws IOException { + byte[] row = put.getRow(); + NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR)); + for (byte[] family : put.getFamilyMap().keySet()) { + if (columnFamilies.contains(new String(family)) == false) { + throw new RuntimeException("Not Exists columnFamily : " + new String(family)); + } + NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR)); + for (KeyValue kv : put.getFamilyMap().get(family)) { + kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis())); + byte[] qualifier = kv.getQualifier(); + NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>()); + qualifierData.put(kv.getTimestamp(), kv.getValue()); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void put(List<Put> puts) throws IOException { + for (Put put : puts) { + put(put); + } + + } + + private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) { + if (value == null || value.length == 0) + return !data.containsKey(row) || !data.get(row).containsKey(family) || !data.get(row).get(family).containsKey(qualifier); + else + return data.containsKey(row) && data.get(row).containsKey(family) && data.get(row).get(family).containsKey(qualifier) && !data.get(row).get(family).get(qualifier).isEmpty() && Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { + if (check(row, family, qualifier, value)) { + put(put); + return true; + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public void delete(Delete delete) throws IOException { + byte[] row = delete.getRow(); + if (data.get(row) == null) + return; + if (delete.getFamilyMap().size() == 0) { + data.remove(row); + return; + } + for (byte[] family : delete.getFamilyMap().keySet()) { + if (data.get(row).get(family) == null) + continue; + if (delete.getFamilyMap().get(family).isEmpty()) { + data.get(row).remove(family); + continue; + } + for (KeyValue kv : delete.getFamilyMap().get(family)) { + if (kv.isDeleteFamily()) { + data.get(row).get(kv.getFamily()).clear(); + } else { + data.get(row).get(kv.getFamily()).remove(kv.getQualifier()); + } + } + if (data.get(row).get(family).isEmpty()) { + data.get(row).remove(family); + } + } + if (data.get(row).isEmpty()) { + data.remove(row); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void delete(List<Delete> deletes) throws IOException { + for (Delete delete : deletes) { + delete(delete); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException { + if (check(row, family, qualifier, value)) { + delete(delete); + return true; + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public Result increment(Increment increment) throws IOException { + throw new NotImplementedException(); + } + + /** + * {@inheritDoc} + */ + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { + return incrementColumnValue(row, family, qualifier, amount, true); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException { + return 0; + } + + /** + * {@inheritDoc} + */ + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException { + if (check(row, family, qualifier, null)) { + Put put = new Put(row); + put.add(family, qualifier, Bytes.toBytes(amount)); + put(put); + return amount; + } + long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount; + data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue)); + return newValue; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAutoFlush() { + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void flushCommits() throws IOException { + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + throw new NotImplementedException(); + + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable { + throw new NotImplementedException(); + + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable { + throw new NotImplementedException(); + + } + + /** + * {@inheritDoc} + */ + @Override + public void setAutoFlush(boolean autoFlush) { + throw new NotImplementedException(); + + } + + /** + * {@inheritDoc} + */ + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + throw new NotImplementedException(); + + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + throw new NotImplementedException(); + } + + /** + * {@inheritDoc} + */ + @Override + public long getWriteBufferSize() { + throw new NotImplementedException(); + } + + /** + * {@inheritDoc} + */ + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + throw new NotImplementedException(); + + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + throw new NotImplementedException(); + + } + + @Override + public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException, Throwable { + throw new NotImplementedException(); + + } + + //@Override (only since 0.98.8) + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { + throw new NotImplementedException(); + + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/security/PasswordPlaceholderConfigurer.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/PasswordPlaceholderConfigurer.java b/server-base/src/main/java/org/apache/kylin/rest/security/PasswordPlaceholderConfigurer.java new file mode 100644 index 0000000..2692da9 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/security/PasswordPlaceholderConfigurer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.security; + +import java.util.Properties; + +import javax.crypto.Cipher; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.commons.codec.binary.Base64; +import org.apache.kylin.common.KylinConfig; +import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; +import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; + +/** + * @author xduo + * + */ +public class PasswordPlaceholderConfigurer extends PropertyPlaceholderConfigurer { + + private static byte[] key = { 0x74, 0x68, 0x69, 0x73, 0x49, 0x73, 0x41, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x4b, 0x65, 0x79 }; + + public PasswordPlaceholderConfigurer() { + Resource[] resources = new Resource[1]; + resources[0] = new InputStreamResource(KylinConfig.getKylinPropertiesAsInputStream()); + this.setLocations(resources); + } + + public static String encrypt(String strToEncrypt) { + try { + Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); + final SecretKeySpec secretKey = new SecretKeySpec(key, "AES"); + cipher.init(Cipher.ENCRYPT_MODE, secretKey); + final String encryptedString = Base64.encodeBase64String(cipher.doFinal(strToEncrypt.getBytes())); + return encryptedString; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static String decrypt(String strToDecrypt) { + try { + Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5PADDING"); + final SecretKeySpec secretKey = new SecretKeySpec(key, "AES"); + cipher.init(Cipher.DECRYPT_MODE, secretKey); + final String decryptedString = new String(cipher.doFinal(Base64.decodeBase64(strToDecrypt))); + return decryptedString; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + protected String resolvePlaceholder(String placeholder, Properties props) { + if (placeholder.toLowerCase().contains("password")) { + return decrypt(props.getProperty(placeholder)); + } else { + return props.getProperty(placeholder); + } + } + + private static void printUsage() { + System.out.println("Usage: java org.apache.kylin.rest.security.PasswordPlaceholderConfigurer <EncryptMethod> <your_password>"); + System.out.println("EncryptMethod: AES or BCrypt"); + } + + public static void main(String[] args) { + if (args.length != 2) { + printUsage(); + System.exit(1); + } + + String encryptMethod = args[0]; + String passwordTxt = args[1]; + if ("AES".equalsIgnoreCase(encryptMethod)) { + // for encrypt password like LDAP password + System.out.println(encryptMethod + " encrypted password is: "); + System.out.println(encrypt(passwordTxt)); + } else if ("BCrypt".equalsIgnoreCase(encryptMethod)) { + // for encrypt the predefined user password, like ADMIN, MODELER. + BCryptPasswordEncoder bCryptPasswordEncoder = new BCryptPasswordEncoder(); + System.out.println(encryptMethod + " encrypted password is: "); + System.out.println(bCryptPasswordEncoder.encode(passwordTxt)); + } else { + printUsage(); + System.exit(1); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java new file mode 100644 index 0000000..27d9720 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.security; + +import java.io.IOException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.rest.service.AclService; +import org.apache.kylin.rest.service.UserService; +import org.apache.kylin.storage.hbase.HBaseConnection; + +/** + */ +public class RealAclHBaseStorage implements AclHBaseStorage { + + private String hbaseUrl; + private String aclTableName; + private String userTableName; + + @Override + public String prepareHBaseTable(Class<?> clazz) throws IOException { + String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl(); + int cut = metadataUrl.indexOf('@'); + String tableNameBase = cut < 0 ? DEFAULT_TABLE_PREFIX : metadataUrl.substring(0, cut); + hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); + + if (clazz == AclService.class) { + aclTableName = tableNameBase + ACL_TABLE_NAME; + HBaseConnection.createHTableIfNeeded(hbaseUrl, aclTableName, ACL_INFO_FAMILY, ACL_ACES_FAMILY); + return aclTableName; + } else if (clazz == UserService.class) { + userTableName = tableNameBase + USER_TABLE_NAME; + HBaseConnection.createHTableIfNeeded(hbaseUrl, userTableName, USER_AUTHORITY_FAMILY); + return userTableName; + } else { + throw new IllegalStateException("prepareHBaseTable for unknown class: " + clazz); + } + } + + @Override + public HTableInterface getTable(String tableName) throws IOException { + if (StringUtils.equals(tableName, aclTableName)) { + return HBaseConnection.get(hbaseUrl).getTable(aclTableName); + } else if (StringUtils.equals(tableName, userTableName)) { + return HBaseConnection.get(hbaseUrl).getTable(userTableName); + } else { + throw new IllegalStateException("getTable failed" + tableName); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/security/SAMLUserDetailsService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/SAMLUserDetailsService.java b/server-base/src/main/java/org/apache/kylin/rest/security/SAMLUserDetailsService.java new file mode 100644 index 0000000..24f8243 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/security/SAMLUserDetailsService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.security; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.core.userdetails.UserDetails; +import org.springframework.security.core.userdetails.UsernameNotFoundException; +import org.springframework.security.ldap.userdetails.LdapUserDetailsService; +import org.springframework.security.saml.SAMLCredential; + +/** + * An implementation of SAMLUserDetailsService by delegating the query to LdapUserDetailsService. + */ +public class SAMLUserDetailsService implements org.springframework.security.saml.userdetails.SAMLUserDetailsService { + + private static final Logger logger = LoggerFactory.getLogger(SAMLUserDetailsService.class); + private LdapUserDetailsService ldapUserDetailsService; + + public SAMLUserDetailsService(LdapUserDetailsService ldapUserDetailsService) { + this.ldapUserDetailsService = ldapUserDetailsService; + } + + @Override + public Object loadUserBySAML(SAMLCredential samlCredential) throws UsernameNotFoundException { + final String userEmail = samlCredential.getAttributeAsString("email"); + logger.debug("samlCredential.email:" + userEmail); + final String userName = userEmail.substring(0, userEmail.indexOf("@")); + + UserDetails userDetails = null; + try { + userDetails = ldapUserDetailsService.loadUserByUsername(userName); + } catch (org.springframework.security.core.userdetails.UsernameNotFoundException e) { + logger.error("User not found in LDAP, check whether he/she has been added to the groups.", e); + } + logger.debug("userDeail by search ldap with '" + userName + "' is: " + userDetails); + return userDetails; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/security/UnauthorisedEntryPoint.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/UnauthorisedEntryPoint.java b/server-base/src/main/java/org/apache/kylin/rest/security/UnauthorisedEntryPoint.java new file mode 100644 index 0000000..253bae3 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/security/UnauthorisedEntryPoint.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.security; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.springframework.security.core.AuthenticationException; +import org.springframework.security.web.AuthenticationEntryPoint; +import org.springframework.stereotype.Component; + +/** + * Just return 401-unauthorized for every unauthorized request. The client side + * catches this and handles login. + * + * @author xduo + */ +@Component(value = "unauthorisedEntryPoint") +public class UnauthorisedEntryPoint implements AuthenticationEntryPoint { + + public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException { + response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Unauthorized"); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/AccessService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AccessService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AccessService.java new file mode 100644 index 0000000..9561fbd --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AccessService.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.kylin.common.persistence.AclEntity; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.ForbiddenException; +import org.apache.kylin.rest.response.AccessEntryResponse; +import org.apache.kylin.rest.security.AclEntityFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.acls.domain.BasePermission; +import org.springframework.security.acls.domain.GrantedAuthoritySid; +import org.springframework.security.acls.domain.ObjectIdentityImpl; +import org.springframework.security.acls.domain.PrincipalSid; +import org.springframework.security.acls.model.AccessControlEntry; +import org.springframework.security.acls.model.Acl; +import org.springframework.security.acls.model.AlreadyExistsException; +import org.springframework.security.acls.model.MutableAcl; +import org.springframework.security.acls.model.NotFoundException; +import org.springframework.security.acls.model.ObjectIdentity; +import org.springframework.security.acls.model.Permission; +import org.springframework.security.acls.model.Sid; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.Assert; + +/** + * @author xduo + * + */ +@Component("accessService") +public class AccessService { + + @Autowired + private AclService aclService; + + @Autowired + UserService userService; + + // ~ Methods to manage acl life circle of domain objects ~ + + @Transactional + public Acl init(AclEntity ae, Permission initPermission) { + Acl acl = null; + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + + try { + // Create acl record for secured domain object. + acl = aclService.createAcl(objectIdentity); + } catch (AlreadyExistsException e) { + acl = (MutableAcl) aclService.readAclById(objectIdentity); + } + + if (null != initPermission) { + Authentication auth = SecurityContextHolder.getContext().getAuthentication(); + PrincipalSid sid = new PrincipalSid(auth); + acl = grant(ae, initPermission, sid); + } + + return acl; + } + + @Transactional + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#ae, 'ADMINISTRATION')") + public Acl grant(AclEntity ae, Permission permission, Sid sid) { + Assert.notNull(ae, "Acl domain object required"); + Assert.notNull(permission, "Acl permission required"); + Assert.notNull(sid, "Sid required"); + + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + MutableAcl acl = null; + + try { + acl = (MutableAcl) aclService.readAclById(objectIdentity); + } catch (NotFoundException e) { + acl = (MutableAcl) init(ae, null); + } + + int indexOfAce = -1; + for (int i = 0; i < acl.getEntries().size(); i++) { + AccessControlEntry ace = acl.getEntries().get(i); + + if (ace.getSid().equals(sid)) { + indexOfAce = i; + } + } + + if (indexOfAce != -1) { + secureOwner(acl, indexOfAce); + acl.updateAce(indexOfAce, permission); + } else { + acl.insertAce(acl.getEntries().size(), permission, sid, true); + } + + acl = aclService.updateAcl(acl); + + return acl; + } + + @Transactional + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#ae, 'ADMINISTRATION')") + public Acl update(AclEntity ae, Long accessEntryId, Permission newPermission) { + Assert.notNull(ae, "Acl domain object required"); + Assert.notNull(accessEntryId, "Ace id required"); + Assert.notNull(newPermission, "Acl permission required"); + + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + MutableAcl acl = (MutableAcl) aclService.readAclById(objectIdentity); + + int indexOfAce = -1; + for (int i = 0; i < acl.getEntries().size(); i++) { + AccessControlEntry ace = acl.getEntries().get(i); + if (ace.getId().equals(accessEntryId)) { + indexOfAce = i; + break; + } + } + + if (indexOfAce != -1) { + secureOwner(acl, indexOfAce); + + try { + acl.updateAce(indexOfAce, newPermission); + acl = aclService.updateAcl(acl); + } catch (NotFoundException e) { + //do nothing? + } + } + + return acl; + } + + @Transactional + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#ae, 'ADMINISTRATION')") + public Acl revoke(AclEntity ae, Long accessEntryId) { + Assert.notNull(ae, "Acl domain object required"); + Assert.notNull(accessEntryId, "Ace id required"); + + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + MutableAcl acl = (MutableAcl) aclService.readAclById(objectIdentity); + int indexOfAce = -1; + + for (int i = 0; i < acl.getEntries().size(); i++) { + AccessControlEntry ace = acl.getEntries().get(i); + if (((Long) ace.getId()).equals(accessEntryId)) { + indexOfAce = i; + break; + } + } + + if (indexOfAce != -1) { + secureOwner(acl, indexOfAce); + + try { + acl.deleteAce(indexOfAce); + acl = aclService.updateAcl(acl); + } catch (NotFoundException e) { + //do nothing? + } + } + + return acl; + } + + @Transactional + public void inherit(AclEntity ae, AclEntity parentAe) { + Assert.notNull(ae, "Acl domain object required"); + Assert.notNull(parentAe, "Parent acl required"); + + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + MutableAcl acl = null; + try { + acl = (MutableAcl) aclService.readAclById(objectIdentity); + } catch (NotFoundException e) { + acl = (MutableAcl) init(ae, null); + } + + ObjectIdentity parentObjectIdentity = new ObjectIdentityImpl(parentAe.getClass(), parentAe.getId()); + MutableAcl parentAcl = null; + try { + parentAcl = (MutableAcl) aclService.readAclById(parentObjectIdentity); + } catch (NotFoundException e) { + parentAcl = (MutableAcl) init(parentAe, null); + } + + if (null == acl || null == parentAcl) { + return; + } + + acl.setEntriesInheriting(true); + acl.setParent(parentAcl); + aclService.updateAcl(acl); + } + + @Transactional + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#ae, 'ADMINISTRATION')") + public void clean(AclEntity ae, boolean deleteChildren) { + Assert.notNull(ae, "Acl domain object required"); + + // For those may have null uuid, like DataModel, won't delete Acl. + if (ae.getId() == null) + return; + + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + + try { + aclService.deleteAcl(objectIdentity, deleteChildren); + } catch (NotFoundException e) { + //do nothing? + } + } + + // ~ Methods to get acl info of domain objects ~ + + public RootPersistentEntity getAclEntity(String entityType, String uuid) { + if (null == uuid) { + return null; + } + + return AclEntityFactory.createAclEntity(entityType, uuid); + } + + public Acl getAcl(AclEntity ae) { + if (null == ae) { + return null; + } + ObjectIdentity objectIdentity = new ObjectIdentityImpl(ae.getClass(), ae.getId()); + Acl acl = null; + + try { + acl = (MutableAcl) aclService.readAclById(objectIdentity); + } catch (NotFoundException e) { + //do nothing? + } + + return acl; + } + + public Sid getSid(String sid, boolean isPrincepal) { + if (isPrincepal) { + return new PrincipalSid(sid); + } else { + return new GrantedAuthoritySid(sid); + } + } + + public List<AccessEntryResponse> generateAceResponses(Acl acl) { + if (null == acl) { + return Collections.emptyList(); + } + List<AccessEntryResponse> accessControlEntities = new ArrayList<AccessEntryResponse>(); + + // Cause there is a circle reference in AccessControlEntry, it needs to + // set acl to null as a workaround. + for (AccessControlEntry ace : acl.getEntries()) { + accessControlEntities.add(new AccessEntryResponse(ace.getId(), ace.getSid(), ace.getPermission(), ace.isGranting())); + } + + return accessControlEntities; + } + + /** + * Protect admin permission granted to acl owner. + * + * @param acl + * @param indexOfAce + */ + private void secureOwner(MutableAcl acl, int indexOfAce) { + // Can't revoke admin permission from domain object owner + if (acl.getOwner().equals(acl.getEntries().get(indexOfAce).getSid()) && BasePermission.ADMINISTRATION.equals(acl.getEntries().get(indexOfAce).getPermission())) { + throw new ForbiddenException("Can't revoke admin permission of owner."); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java new file mode 100644 index 0000000..d693a67 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import javax.annotation.PostConstruct; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.rest.security.AclHBaseStorage; +import org.apache.kylin.rest.util.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.acls.domain.AccessControlEntryImpl; +import org.springframework.security.acls.domain.AclAuthorizationStrategy; +import org.springframework.security.acls.domain.AclImpl; +import org.springframework.security.acls.domain.AuditLogger; +import org.springframework.security.acls.domain.GrantedAuthoritySid; +import org.springframework.security.acls.domain.ObjectIdentityImpl; +import org.springframework.security.acls.domain.PermissionFactory; +import org.springframework.security.acls.domain.PrincipalSid; +import org.springframework.security.acls.model.AccessControlEntry; +import org.springframework.security.acls.model.Acl; +import org.springframework.security.acls.model.AlreadyExistsException; +import org.springframework.security.acls.model.ChildrenExistException; +import org.springframework.security.acls.model.MutableAcl; +import org.springframework.security.acls.model.MutableAclService; +import org.springframework.security.acls.model.NotFoundException; +import org.springframework.security.acls.model.ObjectIdentity; +import org.springframework.security.acls.model.PermissionGrantingStrategy; +import org.springframework.security.acls.model.Sid; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.security.util.FieldUtils; +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; + +/** + * @author xduo + * + */ +@Component("aclService") +public class AclService implements MutableAclService { + + private static final Logger logger = LoggerFactory.getLogger(AclService.class); + + private static String ACL_INFO_FAMILY_TYPE_COLUMN = "t"; + private static String ACL_INFO_FAMILY_OWNER_COLUMN = "o"; + private static String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; + private static String ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN = "i"; + + private Serializer<SidInfo> sidSerializer = new Serializer<SidInfo>(SidInfo.class); + private Serializer<DomainObjectInfo> domainObjSerializer = new Serializer<DomainObjectInfo>(DomainObjectInfo.class); + private Serializer<AceInfo> aceSerializer = new Serializer<AceInfo>(AceInfo.class); + + private String aclTableName = null; + + private final Field fieldAces = FieldUtils.getField(AclImpl.class, "aces"); + private final Field fieldAcl = FieldUtils.getField(AccessControlEntryImpl.class, "acl"); + + @Autowired + protected PermissionGrantingStrategy permissionGrantingStrategy; + + @Autowired + protected PermissionFactory aclPermissionFactory; + + @Autowired + protected AclAuthorizationStrategy aclAuthorizationStrategy; + + @Autowired + protected AuditLogger auditLogger; + + @Autowired + protected AclHBaseStorage aclHBaseStorage; + + public AclService() throws IOException { + fieldAces.setAccessible(true); + fieldAcl.setAccessible(true); + } + + @PostConstruct + public void init() throws IOException { + aclTableName = aclHBaseStorage.prepareHBaseTable(AclService.class); + } + + @Override + public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) { + List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>(); + HTableInterface htable = null; + try { + htable = aclHBaseStorage.getTable(aclTableName); + + Scan scan = new Scan(); + SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity))); + parentFilter.setFilterIfMissing(true); + scan.setFilter(parentFilter); + + ResultScanner scanner = htable.getScanner(scan); + for (Result result = scanner.next(); result != null; result = scanner.next()) { + String id = Bytes.toString(result.getRow()); + String type = Bytes.toString(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN))); + + oids.add(new ObjectIdentityImpl(type, id)); + } + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + IOUtils.closeQuietly(htable); + } + + return oids; + } + + @Override + public Acl readAclById(ObjectIdentity object) throws NotFoundException { + Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), null); + // Assert.isTrue(aclsMap.containsKey(object), "There should have been an Acl entry for ObjectIdentity " + object); + + return aclsMap.get(object); + } + + @Override + public Acl readAclById(ObjectIdentity object, List<Sid> sids) throws NotFoundException { + Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), sids); + Assert.isTrue(aclsMap.containsKey(object), "There should have been an Acl entry for ObjectIdentity " + object); + + return aclsMap.get(object); + } + + @Override + public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> objects) throws NotFoundException { + return readAclsById(objects, null); + } + + @Override + public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException { + Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>(); + HTableInterface htable = null; + Result result = null; + try { + htable = aclHBaseStorage.getTable(aclTableName); + + for (ObjectIdentity oid : oids) { + result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier())))); + + if (null != result && !result.isEmpty()) { + SidInfo owner = sidSerializer.deserialize(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN))); + Sid ownerSid = (null == owner) ? null : (owner.isPrincipal() ? new PrincipalSid(owner.getSid()) : new GrantedAuthoritySid(owner.getSid())); + boolean entriesInheriting = Bytes.toBoolean(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN))); + + Acl parentAcl = null; + DomainObjectInfo parentInfo = domainObjSerializer.deserialize(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN))); + if (null != parentInfo) { + ObjectIdentity parentObj = new ObjectIdentityImpl(parentInfo.getType(), parentInfo.getId()); + parentAcl = readAclById(parentObj, null); + } + + AclImpl acl = new AclImpl(oid, oid.getIdentifier(), aclAuthorizationStrategy, permissionGrantingStrategy, parentAcl, null, entriesInheriting, ownerSid); + genAces(sids, result, acl); + + aclMaps.put(oid, acl); + } else { + throw new NotFoundException("Unable to find ACL information for object identity '" + oid + "'"); + } + } + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + IOUtils.closeQuietly(htable); + } + + return aclMaps; + } + + @Override + public MutableAcl createAcl(ObjectIdentity objectIdentity) throws AlreadyExistsException { + Acl acl = null; + + try { + acl = readAclById(objectIdentity); + } catch (NotFoundException e) { + //do nothing? + } + if (null != acl) { + throw new AlreadyExistsException("ACL of " + objectIdentity + " exists!"); + } + + Authentication auth = SecurityContextHolder.getContext().getAuthentication(); + PrincipalSid sid = new PrincipalSid(auth); + + HTableInterface htable = null; + try { + htable = aclHBaseStorage.getTable(aclTableName); + + Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); + put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); + put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); + put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); + + htable.put(put); + htable.flushCommits(); + + logger.debug("ACL of " + objectIdentity + " created successfully."); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + IOUtils.closeQuietly(htable); + } + + return (MutableAcl) readAclById(objectIdentity); + } + + @Override + public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException { + HTableInterface htable = null; + try { + htable = aclHBaseStorage.getTable(aclTableName); + + Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); + + List<ObjectIdentity> children = findChildren(objectIdentity); + if (!deleteChildren && children.size() > 0) { + throw new ChildrenExistException("Children exists for " + objectIdentity); + } + + for (ObjectIdentity oid : children) { + deleteAcl(oid, deleteChildren); + } + + htable.delete(delete); + htable.flushCommits(); + + logger.debug("ACL of " + objectIdentity + " deleted successfully."); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + IOUtils.closeQuietly(htable); + } + } + + @Override + public MutableAcl updateAcl(MutableAcl acl) throws NotFoundException { + try { + readAclById(acl.getObjectIdentity()); + } catch (NotFoundException e) { + throw e; + } + + HTableInterface htable = null; + try { + htable = aclHBaseStorage.getTable(aclTableName); + + Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); + delete.deleteFamily(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY)); + htable.delete(delete); + + Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); + + if (null != acl.getParentAcl()) { + put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); + } + + for (AccessControlEntry ace : acl.getEntries()) { + AceInfo aceInfo = new AceInfo(ace); + put.add(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); + } + + if (!put.isEmpty()) { + htable.put(put); + htable.flushCommits(); + + logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully."); + } + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + IOUtils.closeQuietly(htable); + } + + return (MutableAcl) readAclById(acl.getObjectIdentity()); + } + + private void genAces(List<Sid> sids, Result result, AclImpl acl) throws JsonParseException, JsonMappingException, IOException { + List<AceInfo> aceInfos = new ArrayList<AceInfo>(); + if (null != sids) { + // Just return aces in sids + for (Sid sid : sids) { + String sidName = null; + if (sid instanceof PrincipalSid) { + sidName = ((PrincipalSid) sid).getPrincipal(); + } else if (sid instanceof GrantedAuthoritySid) { + sidName = ((GrantedAuthoritySid) sid).getGrantedAuthority(); + } + + AceInfo aceInfo = aceSerializer.deserialize(result.getValue(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(sidName))); + if (null != aceInfo) { + aceInfos.add(aceInfo); + } + } + } else { + NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY)); + for (byte[] qualifier : familyMap.keySet()) { + AceInfo aceInfo = aceSerializer.deserialize(familyMap.get(qualifier)); + + if (null != aceInfo) { + aceInfos.add(aceInfo); + } + } + } + + List<AccessControlEntry> newAces = new ArrayList<AccessControlEntry>(); + for (int i = 0; i < aceInfos.size(); i++) { + AceInfo aceInfo = aceInfos.get(i); + + if (null != aceInfo) { + Sid sid = aceInfo.getSidInfo().isPrincipal() ? new PrincipalSid(aceInfo.getSidInfo().getSid()) : new GrantedAuthoritySid(aceInfo.getSidInfo().getSid()); + AccessControlEntry ace = new AccessControlEntryImpl(Long.valueOf(i), acl, sid, aclPermissionFactory.buildFromMask(aceInfo.getPermissionMask()), true, false, false); + newAces.add(ace); + } + } + + this.setAces(acl, newAces); + } + + private void setAces(AclImpl acl, List<AccessControlEntry> aces) { + try { + fieldAces.set(acl, aces); + } catch (IllegalAccessException e) { + throw new IllegalStateException("Could not set AclImpl entries", e); + } + } + + protected static class DomainObjectInfo { + private String id; + private String type; + + public DomainObjectInfo() { + } + + public DomainObjectInfo(ObjectIdentity oid) { + super(); + this.id = (String) oid.getIdentifier(); + this.type = oid.getType(); + } + + public Serializable getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + } + + protected static class SidInfo { + private String sid; + private boolean isPrincipal; + + public SidInfo() { + } + + public SidInfo(Sid sid) { + if (sid instanceof PrincipalSid) { + this.sid = ((PrincipalSid) sid).getPrincipal(); + this.isPrincipal = true; + } else if (sid instanceof GrantedAuthoritySid) { + this.sid = ((GrantedAuthoritySid) sid).getGrantedAuthority(); + this.isPrincipal = false; + } + } + + public String getSid() { + return sid; + } + + public void setSid(String sid) { + this.sid = sid; + } + + public boolean isPrincipal() { + return isPrincipal; + } + + public void setPrincipal(boolean isPrincipal) { + this.isPrincipal = isPrincipal; + } + } + + protected static class AceInfo { + private SidInfo sidInfo; + private int permissionMask; + + public AceInfo() { + } + + public AceInfo(AccessControlEntry ace) { + super(); + this.sidInfo = new SidInfo(ace.getSid()); + this.permissionMask = ace.getPermission().getMask(); + } + + public SidInfo getSidInfo() { + return sidInfo; + } + + public void setSidInfo(SidInfo sidInfo) { + this.sidInfo = sidInfo; + } + + public int getPermissionMask() { + return permissionMask; + } + + public void setPermissionMask(int permissionMask) { + this.permissionMask = permissionMask; + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java new file mode 100644 index 0000000..6c85898 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.stereotype.Component; + +/** + * @author jianliu + */ +@Component("adminService") +public class AdminService extends BasicService { + private static final Logger logger = LoggerFactory.getLogger(AdminService.class); + + /** + * Get Java Env info as string + * + * @return + */ + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public String getEnv() { + logger.debug("Get Kylin Runtime environment"); + PropertiesConfiguration tempConfig = new PropertiesConfiguration(); + + // Add Java Env + + try { + String content = ""; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // env + Map<String, String> env = System.getenv(); + + for (Map.Entry<String, String> entry : env.entrySet()) { + tempConfig.addProperty(entry.getKey(), entry.getValue()); + } + + // properties + Properties proterties = System.getProperties(); + + for (Map.Entry<Object, Object> entry : proterties.entrySet()) { + tempConfig.setProperty((String) entry.getKey(), entry.getValue()); + } + + // do save + tempConfig.save(baos); + content = baos.toString(); + return content; + } catch (ConfigurationException e) { + throw new InternalErrorException("Failed to get Kylin env Config", e); + } + } + + /** + * Get Java config info as String + * + * @return + */ + // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public String getConfigAsString() { + logger.debug("Get Kylin Runtime Config"); + + try { + return KylinConfig.getInstanceFromEnv().getConfigAsString(); + } catch (IOException e) { + throw new InternalErrorException("Failed to get Kylin Runtime Config", e); + } + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) + public void cleanupStorage() { + StorageCleanupJob job = new StorageCleanupJob(); + String[] args = new String[] { "-delete", "true" }; + try { + ToolRunner.run(job, args); + } catch (Exception e) { + throw new InternalErrorException(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1a124e68/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java new file mode 100644 index 0000000..79e1d4a --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.badquery.BadQueryHistoryManager; +import org.apache.kylin.rest.request.SQLRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class BadQueryDetector extends Thread { + + private static final Logger logger = LoggerFactory.getLogger(BadQueryDetector.class); + + private final ConcurrentMap<Thread, Entry> runningQueries = Maps.newConcurrentMap(); + private final long detectionInterval; + private final int alertMB; + private final int alertRunningSec; + private KylinConfig kylinConfig; + + private ArrayList<Notifier> notifiers = new ArrayList<Notifier>(); + + public BadQueryDetector() { + super("BadQueryDetector"); + this.setDaemon(true); + this.kylinConfig = KylinConfig.getInstanceFromEnv(); + this.detectionInterval = kylinConfig.getBadQueryDefaultDetectIntervalSeconds() * 1000; + this.alertMB = 100; + this.alertRunningSec = kylinConfig.getBadQueryDefaultAlertingSeconds(); + + initNotifiers(); + } + + public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) { + super("BadQueryDetector"); + this.setDaemon(true); + this.detectionInterval = detectionInterval; + this.alertMB = alertMB; + this.alertRunningSec = alertRunningSec; + this.kylinConfig = KylinConfig.getInstanceFromEnv(); + + initNotifiers(); + } + + private void initNotifiers() { + this.notifiers.add(new LoggerNotifier()); + if (kylinConfig.getBadQueryPersistentEnabled()) { + this.notifiers.add(new PersistenceNotifier()); + } + } + + public void registerNotifier(Notifier notifier) { + notifiers.add(notifier); + } + + private void notify(String adj, float runningSec, long startTime, String project, String sql, Thread t) { + for (Notifier notifier : notifiers) { + try { + notifier.badQueryFound(adj, runningSec, startTime, project, sql, t); + } catch (Exception e) { + logger.error("", e); + } + } + } + + public interface Notifier { + void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t); + } + + private class LoggerNotifier implements Notifier { + @Override + public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) { + logger.info(adj + " query has been running " + runningSec + " seconds (project:" + project + ", thread: 0x" + Long.toHexString(t.getId()) + ") -- " + sql); + } + } + + private class PersistenceNotifier implements Notifier { + BadQueryHistoryManager badQueryManager = BadQueryHistoryManager.getInstance(kylinConfig); + String serverHostname; + NavigableSet<Pair<Long, String>> cacheQueue = new TreeSet<>(new Comparator<Pair<Long, String>>() { + @Override + public int compare(Pair<Long, String> o1, Pair<Long, String> o2) { + if (o1.equals(o2)) { + return 0; + } else if (o1.getFirst().equals(o2.getFirst())) { + return o2.getSecond().compareTo(o2.getSecond()); + } else { + return (int) (o1.getFirst() - o2.getFirst()); + } + } + }); + + public PersistenceNotifier() { + try { + serverHostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + serverHostname = "Unknow"; + logger.warn("Error in get current hostname.", e); + } + } + + @Override + public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) { + try { + long cachingSeconds = (kylinConfig.getBadQueryDefaultAlertingSeconds() + 1) * 30; + Pair<Long, String> sqlPair = new Pair<>(startTime, sql); + if (!cacheQueue.contains(sqlPair)) { + badQueryManager.addEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), project); + cacheQueue.add(sqlPair); + while (!cacheQueue.isEmpty() && (System.currentTimeMillis() - cacheQueue.first().getFirst() > cachingSeconds * 1000 || cacheQueue.size() > kylinConfig.getBadQueryHistoryNum() * 3)) { + cacheQueue.pollFirst(); + } + } else { + badQueryManager.updateEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), project); + } + } catch (IOException e) { + logger.error("Error in bad query persistence.", e); + } + } + } + + public void queryStart(Thread thread, SQLRequest sqlRequest) { + runningQueries.put(thread, new Entry(sqlRequest, thread)); + } + + public void queryEnd(Thread thread) { + runningQueries.remove(thread); + } + + private class Entry implements Comparable<Entry> { + final SQLRequest sqlRequest; + final long startTime; + final Thread thread; + + Entry(SQLRequest sqlRequest, Thread thread) { + this.sqlRequest = sqlRequest; + this.startTime = System.currentTimeMillis(); + this.thread = thread; + } + + @Override + public int compareTo(Entry o) { + return (int) (this.startTime - o.startTime); + } + } + + public void run() { + while (true) { + try { + Thread.sleep(detectionInterval); + } catch (InterruptedException e) { + // stop detection and exit + return; + } + + try { + detectBadQuery(); + } catch (Exception ex) { + logger.error("", ex); + } + } + } + + private void detectBadQuery() { + long now = System.currentTimeMillis(); + ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values()); + Collections.sort(entries); + + // report if query running long + for (Entry e : entries) { + float runningSec = (float) (now - e.startTime) / 1000; + if (runningSec >= alertRunningSec) { + notify("Slow", runningSec, e.startTime, e.sqlRequest.getProject(), e.sqlRequest.getSql(), e.thread); + dumpStackTrace(e.thread); + } else { + break; // entries are sorted by startTime + } + } + + // report if low memory + if (getSystemAvailMB() < alertMB) { + logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running."); + } + } + + // log the stack trace of bad query thread for further analysis + private void dumpStackTrace(Thread t) { + int maxStackTraceDepth = kylinConfig.getBadQueryStackTraceDepth(); + int current = 0; + + StackTraceElement[] stackTrace = t.getStackTrace(); + StringBuilder buf = new StringBuilder("Problematic thread 0x" + Long.toHexString(t.getId())); + buf.append("\n"); + for (StackTraceElement e : stackTrace) { + if (++current > maxStackTraceDepth) { + break; + } + buf.append("\t").append("at ").append(e.toString()).append("\n"); + } + logger.info(buf.toString()); + } + + public static final int ONE_MB = 1024 * 1024; + + public static long getSystemAvailBytes() { + Runtime runtime = Runtime.getRuntime(); + long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process + long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free + long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting + long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using + long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used + return availableMemory; + } + + public static int getSystemAvailMB() { + return (int) (getSystemAvailBytes() / ONE_MB); + } + +}
