[4/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
KYLIN-2072 Cleanup old streaming code Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5aee0226 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5aee0226 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5aee0226 Branch: refs/heads/master Commit: 5aee022612c6fa40c41e8c00063714b79b6d5237 Parents: cb2b12b Author: shaofengshiAuthored: Sun Oct 9 13:10:50 2016 +0800 Committer: shaofengshi Committed: Mon Oct 10 13:32:44 2016 +0800 -- assembly/pom.xml| 4 - .../kylin/job/streaming/KafkaDataLoader.java| 79 build/bin/cleanup_streaming_files.sh| 42 -- build/bin/kylin.sh | 61 --- build/bin/streaming_build.sh| 33 -- build/bin/streaming_check.sh| 29 -- build/bin/streaming_fillgap.sh | 26 -- build/bin/streaming_rolllog.sh | 29 -- .../metadata/streaming/StreamingConfig.java | 85 .../metadata/streaming/StreamingManager.java| 248 .../.settings/org.eclipse.core.resources.prefs | 6 - .../.settings/org.eclipse.jdt.core.prefs| 386 --- .../.settings/org.eclipse.jdt.ui.prefs | 7 - engine-streaming/pom.xml| 121 -- .../kylin/engine/streaming/BootstrapConfig.java | 71 .../kylin/engine/streaming/IStreamingInput.java | 30 -- .../engine/streaming/IStreamingOutput.java | 34 -- .../streaming/OneOffStreamingBuilder.java | 71 .../engine/streaming/StreamingBatchBuilder.java | 43 --- .../kylin/engine/streaming/StreamingConfig.java | 85 .../engine/streaming/StreamingManager.java | 248 .../kylin/engine/streaming/cli/MonitorCLI.java | 88 - .../engine/streaming/cli/StreamingCLI.java | 114 -- .../streaming/cube/StreamingCubeBuilder.java| 168 .../diagnose/StreamingLogAnalyzer.java | 96 - .../streaming/monitor/StreamingMonitor.java | 172 - .../engine/streaming/util/StreamingUtils.java | 51 --- .../kylin/provision/BuildCubeWithStream.java| 4 +- pom.xml | 6 - .../rest/controller/StreamingController.java| 2 +- .../kylin/rest/controller/TableController.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- 
.../kylin/rest/service/StreamingService.java| 2 +- source-kafka/pom.xml| 6 - .../kafka/ByteBufferBackedInputStream.java | 52 --- .../apache/kylin/source/kafka/KafkaSource.java | 2 +- .../kylin/source/kafka/KafkaStreamingInput.java | 227 --- .../source/kafka/TimedJsonStreamParser.java | 1 + .../kafka/diagnose/KafkaInputAnalyzer.java | 312 --- .../source/kafka/diagnose/KafkaVerify.java | 101 - .../source/kafka/diagnose/TimeHistogram.java| 85 .../kafka/util/ByteBufferBackedInputStream.java | 52 +++ .../kylin/source/kafka/util/KafkaRequester.java | 191 - .../kylin/source/kafka/util/KafkaUtils.java | 173 - .../test/java/TimedJsonStreamParserTest.java| 4 +- storage-hbase/pom.xml | 4 - .../hbase/steps/HBaseStreamingOutput.java | 98 - .../apache/kylin/tool/CubeMetaExtractor.java| 4 +- 48 files changed, 397 insertions(+), 3360 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 0c80afc..e6f83a8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -47,10 +47,6 @@ org.apache.kylin kylin-engine-mr - -org.apache.kylin -kylin-engine-streaming - http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java -- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java deleted file mode 100644 index 454f6cf..000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may
[5/6] kylin git commit: KYLIN-2072 further cleanup old streaming code
KYLIN-2072 further cleanup old streaming code Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ed643e6b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ed643e6b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ed643e6b Branch: refs/heads/master Commit: ed643e6b20e31a4c3a45d72dc8e5ff1287584764 Parents: c67fa74 Author: shaofengshiAuthored: Sun Oct 9 22:16:38 2016 +0800 Committer: shaofengshi Committed: Mon Oct 10 13:32:44 2016 +0800 -- .../kylin/common/util/StreamingBatch.java | 44 .../kylin/common/util/StreamingMessage.java | 3 -- .../org/apache/kylin/engine/EngineFactory.java | 14 +-- .../kylin/engine/IStreamingCubingEngine.java| 26 4 files changed, 1 insertion(+), 86 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java -- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java deleted file mode 100644 index e000aa6..000 --- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.common.util; - -import java.util.List; - -/** - */ -public final class StreamingBatch { - -private final List messages; - -private final Pair timeRange; - -public StreamingBatch(List messages, Pair timeRange) { -this.messages = messages; -this.timeRange = timeRange; -} - -public List getMessages() { -return messages; -} - -public Pair getTimeRange() { -return timeRange; -} - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java -- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java index 53ab195..981c8a8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java @@ -18,7 +18,6 @@ package org.apache.kylin.common.util; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -34,8 +33,6 @@ public class StreamingMessage { private Map params; -public static final StreamingMessage EOF = new StreamingMessage(Collections. emptyList(), 0L, 0L, Collections. 
emptyMap()); - public StreamingMessage(List data, long offset, long timestamp, Map params) { this.data = data; this.offset = offset; http://git-wip-us.apache.org/repos/asf/kylin/blob/ed643e6b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java -- diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 7044a3e..acaa7da 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -31,23 +31,15 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public class EngineFactory { private static ImplementationSwitch batchEngines; -private static ImplementationSwitch streamingEngines; static { Map impls = KylinConfig.getInstanceFromEnv().getJobEngines(); -batchEngines = new ImplementationSwitch(impls, IBatchCubingEngine.class); - -impls.clear(); -
[2/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java deleted file mode 100644 index 1c579c6..000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.kafka.diagnose; - -import java.util.concurrent.atomic.AtomicLong; - -public class TimeHistogram { -private long[] bucketsBoundary; -private AtomicLong[] counters; -private String id; - -private static Object printLock = new Object(); - -/** - * example: [10,20] will generate three buckets: (-∞,10), [10,20),[20,+∞) - * unit: second - */ -public TimeHistogram(long[] bucketsBoundary, String id) { -this.bucketsBoundary = bucketsBoundary; -this.counters = new AtomicLong[this.bucketsBoundary.length + 1]; -for (int i = 0; i < counters.length; i++) { -this.counters[i] = new AtomicLong(); -} -this.id = id; -} - -/** - * @param second in seconds - */ -public void process(long second) { -for (int i = 0; i < bucketsBoundary.length; ++i) { -if (second < bucketsBoundary[i]) { -counters[i].incrementAndGet(); -return; -} -} - -counters[bucketsBoundary.length].incrementAndGet(); -} - -/** - * @param millis in milli seconds - */ -public void processMillis(long millis) { -process(millis / 1000); -} - -public void printStatus() { -long[] countersSnapshot = new long[counters.length]; -for (int i = 0; i < countersSnapshot.length; i++) { -countersSnapshot[i] = counters[i].get(); -} - -long sum = 0; -for (long counter : countersSnapshot) { -sum += counter; -} - -synchronized (printLock) { -System.out.println("== status of TimeHistogram " + id + " ="); - -for (int i = 0; i < countersSnapshot.length; ++i) { -System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 
1 : sum))); -} - -} -} - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java new file mode 100644 index 000..7a42598 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.kafka.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + */
[1/6] kylin git commit: KYLIN-2055 Add an encoder for Boolean type
Repository: kylin Updated Branches: refs/heads/master 71f373507 -> ed643e6b2 KYLIN-2055 Add an encoder for Boolean type Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cb2b12b3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cb2b12b3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cb2b12b3 Branch: refs/heads/master Commit: cb2b12b3b619ac86efbb9c7ca708418882683daf Parents: 71f3735 Author: shaofengshiAuthored: Mon Oct 10 13:30:27 2016 +0800 Committer: shaofengshi Committed: Mon Oct 10 13:30:27 2016 +0800 -- .../apache/kylin/dimension/BooleanDimEnc.java | 196 +++ .../dimension/DimensionEncodingFactory.java | 1 + .../kylin/dimension/BooleanDimEncTest.java | 95 + .../cubeDesigner/advanced_settings.html | 2 +- 4 files changed, 293 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/cb2b12b3/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java -- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java new file mode 100644 index 000..f32724c --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dimension; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + +/** + * Encoding Boolean values to bytes + */ +public class BooleanDimEnc extends DimensionEncoding { +private static final long serialVersionUID = 1L; + +public static final String ENCODING_NAME = "boolean"; + +//NOTE: when add new value, append to the array tail, DO NOT insert! 
+public static String[] ALLOWED_VALUES = new String[] { "", "true", "false", "TRUE", "FALSE", "True", "False", "t", "f", "T", "F", "yes", "no", "YES", "NO", "Yes", "No", "y", "n", "Y", "N", "1", "0" }; + +public static final Map map = Maps.newHashMap(); + +static { +for (int i = 0; i < ALLOWED_VALUES.length; i++) { +map.put(ALLOWED_VALUES[i], i); +} +} + +public static class Factory extends DimensionEncodingFactory { +@Override +public String getSupportedEncodingName() { +return ENCODING_NAME; +} + +@Override +public DimensionEncoding createDimensionEncoding(String encodingName, String[] args) { +return new BooleanDimEnc(); +} +}; + +// + +private static int fixedLen = 1; + +//no-arg constructor is required for Externalizable +public BooleanDimEnc() { +} + +@Override +public int getLengthOfEncoding() { +return fixedLen; +} + +@Override +public void encode(byte[] value, int valueLen, byte[] output, int outputOffset) { +if (value == null) { +Arrays.fill(output, outputOffset, outputOffset + fixedLen, NULL); +return; +} + +encode(Bytes.toString(value, 0, valueLen), output, outputOffset); +} + +void encode(String valueStr, byte[] output, int outputOffset) { +if (valueStr == null) { +Arrays.fill(output, outputOffset, outputOffset + fixedLen, NULL); +return; +} + +Integer encodeValue = map.get(valueStr); +if (encodeValue == null) { +throw new IllegalArgumentException("Value '" + valueStr + "' is not a recognized boolean value."); +
[6/6] kylin git commit: KYLIN-1726 package rename
KYLIN-1726 package rename Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67fa740 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67fa740 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67fa740 Branch: refs/heads/master Commit: c67fa740d364d372c7a6424fd160570cc7e890c4 Parents: 5aee022 Author: shaofengshiAuthored: Sun Oct 9 13:24:59 2016 +0800 Committer: shaofengshi Committed: Mon Oct 10 13:32:44 2016 +0800 -- .../apache/kylin/source/kafka/KafkaMRInput.java | 3 + .../kylin/source/kafka/MergeOffsetStep.java | 80 --- .../kylin/source/kafka/SeekOffsetStep.java | 140 -- .../source/kafka/StringStreamingParser.java | 51 --- .../apache/kylin/source/kafka/TopicMeta.java| 46 -- .../kylin/source/kafka/UpdateTimeRangeStep.java | 117 --- .../kylin/source/kafka/job/MergeOffsetStep.java | 80 +++ .../kylin/source/kafka/job/SeekOffsetStep.java | 141 +++ .../source/kafka/job/UpdateTimeRangeStep.java | 117 +++ .../kafka/util/ByteBufferBackedInputStream.java | 6 +- 10 files changed, 344 insertions(+), 437 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 6358ee1..4d1f5c9 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.job.MergeOffsetStep; +import org.apache.kylin.source.kafka.job.SeekOffsetStep; +import 
org.apache.kylin.source.kafka.job.UpdateTimeRangeStep; import javax.annotation.Nullable; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java deleted file mode 100644 index 18c959a..000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.source.kafka; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class MergeOffsetStep extends AbstractExecutable { - -private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class); -public MergeOffsetStep() { -super(); -} - -@Override -protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { -final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); -final CubeInstance cube =
[3/6] kylin git commit: KYLIN-2072 Cleanup old streaming code
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java -- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java deleted file mode 100644 index 271bf41..000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.streaming; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.cachesync.Broadcaster; -import org.apache.kylin.metadata.cachesync.Broadcaster.Event; -import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class StreamingManager { - -private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); - -// static cached instances -private static final ConcurrentHashMapCACHE = new ConcurrentHashMap (); - -public static final Serializer STREAMING_SERIALIZER = new JsonSerializer(StreamingConfig.class); - -private KylinConfig config; - -// name ==> StreamingConfig -private CaseInsensitiveStringCache streamingMap; - -public static void clearCache() { -CACHE.clear(); -} - -private StreamingManager(KylinConfig config) throws IOException { -this.config = config; -this.streamingMap = new CaseInsensitiveStringCache(config, "streaming"); - -// touch lower level metadata before registering my listener -reloadAllStreaming(); -Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming"); -} - -private class StreamingSyncListener extends Broadcaster.Listener { -@Override -public void onClearAll(Broadcaster broadcaster) throws IOException { -clearCache(); -} - -@Override -public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { -if (event == Event.DROP) -removeStreamingLocal(cacheKey); -else 
-reloadStreamingConfigLocal(cacheKey); -} -} - -private ResourceStore getStore() { -return ResourceStore.getStore(this.config); -} - -public static StreamingManager getInstance(KylinConfig config) { -StreamingManager r = CACHE.get(config); -if (r != null) { -return r; -} - -synchronized (StreamingManager.class) { -r = CACHE.get(config); -if (r != null) { -return r; -} -try { -r = new StreamingManager(config); -CACHE.put(config, r); -if (CACHE.size() > 1) { -logger.warn("More than one singleton exist"); -} -return r; -} catch (IOException e) { -throw new IllegalStateException("Failed to init StreamingManager from " + config, e); -} -} -} - -private static String formatStreamingConfigPath(String name) { -return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; -} - -private static String formatStreamingOutputPath(String streaming, int partition) { -return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; -} - -private static String
[kylin] Git Push Summary
Repository: kylin Updated Branches: refs/heads/KYLIN-2067 [deleted] 41cdadd3b
[2/9] kylin git commit: KYLIN-2073 Tag timestamp in basic information
KYLIN-2073 Tag timestamp in basic information Signed-off-by: lidongsjtu Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bc3cd889 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bc3cd889 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bc3cd889 Branch: refs/heads/KYLIN-2072 Commit: bc3cd889a18ff7f93a04af2f0ad5e9a60edbf161 Parents: 9d42d43 Author: Yifan Zhang Authored: Sun Oct 9 13:27:33 2016 +0800 Committer: lidongsjtu Committed: Sun Oct 9 18:12:54 2016 +0800 -- .../src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/bc3cd889/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java -- diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java index 5797928..42a6706 100644 --- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java @@ -141,6 +141,8 @@ public abstract class AbstractInfoExtractor extends AbstractApplication { StringBuilder basicSb = new StringBuilder(); basicSb.append("MetaStoreID: ").append(ToolUtil.getHBaseMetaStoreId()).append("\n"); basicSb.append("PackageType: ").append(packageType.toUpperCase()).append("\n"); +SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z"); +basicSb.append("PackageTimestamp: ").append(format.format(new Date())).append("\n"); basicSb.append("Host: ").append(ToolUtil.getHostName()).append("\n"); FileUtils.writeStringToFile(new File(exportDir, "info"), basicSb.toString(), Charset.defaultCharset()); }
[8/9] kylin git commit: KYLIN-1726 package rename
KYLIN-1726 package rename Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1854ad84 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1854ad84 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1854ad84 Branch: refs/heads/KYLIN-2072 Commit: 1854ad84035062d8cc7666ca713314d89ea46db6 Parents: 2200b59 Author: shaofengshiAuthored: Sun Oct 9 13:24:59 2016 +0800 Committer: shaofengshi Committed: Sun Oct 9 22:17:10 2016 +0800 -- .../apache/kylin/source/kafka/KafkaMRInput.java | 3 + .../kylin/source/kafka/MergeOffsetStep.java | 80 --- .../kylin/source/kafka/SeekOffsetStep.java | 140 -- .../source/kafka/StringStreamingParser.java | 51 --- .../apache/kylin/source/kafka/TopicMeta.java| 46 -- .../kylin/source/kafka/UpdateTimeRangeStep.java | 117 --- .../kylin/source/kafka/job/MergeOffsetStep.java | 80 +++ .../kylin/source/kafka/job/SeekOffsetStep.java | 141 +++ .../source/kafka/job/UpdateTimeRangeStep.java | 117 +++ .../kafka/util/ByteBufferBackedInputStream.java | 6 +- 10 files changed, 344 insertions(+), 437 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/1854ad84/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 6358ee1..4d1f5c9 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.job.MergeOffsetStep; +import org.apache.kylin.source.kafka.job.SeekOffsetStep; +import 
org.apache.kylin.source.kafka.job.UpdateTimeRangeStep; import javax.annotation.Nullable; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/kylin/blob/1854ad84/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java deleted file mode 100644 index 18c959a..000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.source.kafka; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class MergeOffsetStep extends AbstractExecutable { - -private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class); -public MergeOffsetStep() { -super(); -} - -@Override -protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { -final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); -final CubeInstance cube =
[7/9] kylin git commit: KYLIN-2072 Cleanup old streaming code
KYLIN-2072 Cleanup old streaming code Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2200b595 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2200b595 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2200b595 Branch: refs/heads/KYLIN-2072 Commit: 2200b595af05901b04ef8a8b55564e7accedb044 Parents: 71f3735 Author: shaofengshiAuthored: Sun Oct 9 13:10:50 2016 +0800 Committer: shaofengshi Committed: Sun Oct 9 22:17:10 2016 +0800 -- assembly/pom.xml| 4 - .../kylin/job/streaming/KafkaDataLoader.java| 79 build/bin/cleanup_streaming_files.sh| 42 -- build/bin/kylin.sh | 61 --- build/bin/streaming_build.sh| 33 -- build/bin/streaming_check.sh| 29 -- build/bin/streaming_fillgap.sh | 26 -- build/bin/streaming_rolllog.sh | 29 -- .../metadata/streaming/StreamingConfig.java | 85 .../metadata/streaming/StreamingManager.java| 248 .../.settings/org.eclipse.core.resources.prefs | 6 - .../.settings/org.eclipse.jdt.core.prefs| 386 --- .../.settings/org.eclipse.jdt.ui.prefs | 7 - engine-streaming/pom.xml| 121 -- .../kylin/engine/streaming/BootstrapConfig.java | 71 .../kylin/engine/streaming/IStreamingInput.java | 30 -- .../engine/streaming/IStreamingOutput.java | 34 -- .../streaming/OneOffStreamingBuilder.java | 71 .../engine/streaming/StreamingBatchBuilder.java | 43 --- .../kylin/engine/streaming/StreamingConfig.java | 85 .../engine/streaming/StreamingManager.java | 248 .../kylin/engine/streaming/cli/MonitorCLI.java | 88 - .../engine/streaming/cli/StreamingCLI.java | 114 -- .../streaming/cube/StreamingCubeBuilder.java| 168 .../diagnose/StreamingLogAnalyzer.java | 96 - .../streaming/monitor/StreamingMonitor.java | 172 - .../engine/streaming/util/StreamingUtils.java | 51 --- .../kylin/provision/BuildCubeWithStream.java| 4 +- pom.xml | 6 - .../rest/controller/StreamingController.java| 2 +- .../kylin/rest/controller/TableController.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- 
.../kylin/rest/service/StreamingService.java| 2 +- source-kafka/pom.xml| 6 - .../kafka/ByteBufferBackedInputStream.java | 52 --- .../apache/kylin/source/kafka/KafkaSource.java | 2 +- .../kylin/source/kafka/KafkaStreamingInput.java | 227 --- .../source/kafka/TimedJsonStreamParser.java | 1 + .../kafka/diagnose/KafkaInputAnalyzer.java | 312 --- .../source/kafka/diagnose/KafkaVerify.java | 101 - .../source/kafka/diagnose/TimeHistogram.java| 85 .../kafka/util/ByteBufferBackedInputStream.java | 52 +++ .../kylin/source/kafka/util/KafkaRequester.java | 191 - .../kylin/source/kafka/util/KafkaUtils.java | 173 - .../test/java/TimedJsonStreamParserTest.java| 4 +- storage-hbase/pom.xml | 4 - .../hbase/steps/HBaseStreamingOutput.java | 98 - .../apache/kylin/tool/CubeMetaExtractor.java| 4 +- 48 files changed, 397 insertions(+), 3360 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 0c80afc..e6f83a8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -47,10 +47,6 @@ org.apache.kylin kylin-engine-mr - -org.apache.kylin -kylin-engine-streaming - http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java -- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java deleted file mode 100644 index 454f6cf..000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you
[1/9] kylin git commit: minor, output error message to response of diagnosis [Forced Update!]
Repository: kylin Updated Branches: refs/heads/KYLIN-2072 38b44b4c6 -> c9f27e4dc (forced update) minor, output error message to response of diagnosis Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9d42d430 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9d42d430 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9d42d430 Branch: refs/heads/KYLIN-2072 Commit: 9d42d430f10261cb6b0df21f0c678fedde53b428 Parents: c46b58b Author: lidongsjtuAuthored: Sun Oct 9 12:50:33 2016 +0800 Committer: lidongsjtu Committed: Sun Oct 9 12:52:29 2016 +0800 -- .../org/apache/kylin/rest/controller/DiagnosisController.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/9d42d430/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java -- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java index 826f888..d16547e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java @@ -82,7 +82,7 @@ public class DiagnosisController extends BasicController { try { filePath = dgService.dumpProjectDiagnosisInfo(project); } catch (IOException e) { -throw new InternalErrorException("Failed to dump diagnosis info.", e); +throw new InternalErrorException("Failed to dump project diagnosis info. " + e.getMessage(), e); } setDownloadResponse(filePath, response); @@ -98,7 +98,7 @@ public class DiagnosisController extends BasicController { try { filePath = dgService.dumpJobDiagnosisInfo(jobId); } catch (IOException e) { -throw new InternalErrorException("Failed to dump diagnosis info.", e); +throw new InternalErrorException("Failed to dump job diagnosis info. 
" + e.getMessage(), e); } setDownloadResponse(filePath, response); @@ -114,7 +114,7 @@ public class DiagnosisController extends BasicController { IOUtils.copyLarge(fileInputStream, output); output.flush(); } catch (IOException e) { -throw new InternalErrorException("Failed to dump diagnosis info.", e); +throw new InternalErrorException("Failed to create download for diagnosis. " + e.getMessage(), e); } } }
[4/9] kylin git commit: Partition-Date-Column-Add-Type-Integer-Kylin
Partition-Date-Column-Add-Type-Integer-Kylin Signed-off-by: JasonProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/71f37350 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/71f37350 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/71f37350 Branch: refs/heads/KYLIN-2072 Commit: 71f3735073d3e29c0620c1e680d41dd495ff5ec0 Parents: 14c680c Author: chenzhx <346839...@qq.com> Authored: Sun Oct 9 11:32:54 2016 +0800 Committer: Jason Committed: Sun Oct 9 19:36:34 2016 +0800 -- webapp/app/js/controllers/modelEdit.js | 4 ++-- webapp/app/partials/modelDesigner/conditions_settings.html | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/71f37350/webapp/app/js/controllers/modelEdit.js -- diff --git a/webapp/app/js/controllers/modelEdit.js b/webapp/app/js/controllers/modelEdit.js index 65e17fa..b843829 100644 --- a/webapp/app/js/controllers/modelEdit.js +++ b/webapp/app/js/controllers/modelEdit.js @@ -35,7 +35,7 @@ KylinApp.controller('ModelEditCtrl', function ($scope, $q, $routeParams, $locati $scope.getPartitonColumns = function(tableName){ var columns = _.filter($scope.getColumnsByTable(tableName),function(column){ -return column.datatype==="date"||column.datatype==="timestamp"||column.datatype==="string"||column.datatype.startsWith("varchar")||column.datatype==="bigint"||column.datatype==="int"; +return column.datatype==="date"||column.datatype==="timestamp"||column.datatype==="string"||column.datatype.startsWith("varchar")||column.datatype==="bigint"||column.datatype==="int"||column.datatype==="integer"; }); return columns; }; @@ -79,7 +79,7 @@ KylinApp.controller('ModelEditCtrl', function ($scope, $q, $routeParams, $locati if(dateColumn==columnName) return _column; }); -if(column[0].datatype==="bigint"||column[0].datatype==="int"){ + if(column[0].datatype==="bigint"||column[0].datatype==="int"||column[0].datatype==="integer"){ 
$scope.isBigInt=true; $scope.modelsManager.selectedModel.partition_desc.partition_date_format=null;; $scope.partitionColumn.hasSeparateTimeColumn=false; http://git-wip-us.apache.org/repos/asf/kylin/blob/71f37350/webapp/app/partials/modelDesigner/conditions_settings.html -- diff --git a/webapp/app/partials/modelDesigner/conditions_settings.html b/webapp/app/partials/modelDesigner/conditions_settings.html index f0390e5..1de93b4 100644 --- a/webapp/app/partials/modelDesigner/conditions_settings.html +++ b/webapp/app/partials/modelDesigner/conditions_settings.html @@ -164,7 +164,7 @@
- Partition date column not required,leave as default if cube always need full build
-- Column should contain date value (type can be Date, Timestamp, String, VARCHAR, Int, BigInt, etc.)
+- Column should contain date value (type can be Date, Timestamp, String, VARCHAR, Int, Integer, BigInt, etc.)
kylin git commit: Partition-Date-Column-Add-Type-Integer-Kylin
Repository: kylin Updated Branches: refs/heads/master 14c680c94 -> 71f373507 Partition-Date-Column-Add-Type-Integer-Kylin Signed-off-by: JasonProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/71f37350 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/71f37350 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/71f37350 Branch: refs/heads/master Commit: 71f3735073d3e29c0620c1e680d41dd495ff5ec0 Parents: 14c680c Author: chenzhx <346839...@qq.com> Authored: Sun Oct 9 11:32:54 2016 +0800 Committer: Jason Committed: Sun Oct 9 19:36:34 2016 +0800 -- webapp/app/js/controllers/modelEdit.js | 4 ++-- webapp/app/partials/modelDesigner/conditions_settings.html | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/71f37350/webapp/app/js/controllers/modelEdit.js -- diff --git a/webapp/app/js/controllers/modelEdit.js b/webapp/app/js/controllers/modelEdit.js index 65e17fa..b843829 100644 --- a/webapp/app/js/controllers/modelEdit.js +++ b/webapp/app/js/controllers/modelEdit.js @@ -35,7 +35,7 @@ KylinApp.controller('ModelEditCtrl', function ($scope, $q, $routeParams, $locati $scope.getPartitonColumns = function(tableName){ var columns = _.filter($scope.getColumnsByTable(tableName),function(column){ -return column.datatype==="date"||column.datatype==="timestamp"||column.datatype==="string"||column.datatype.startsWith("varchar")||column.datatype==="bigint"||column.datatype==="int"; +return column.datatype==="date"||column.datatype==="timestamp"||column.datatype==="string"||column.datatype.startsWith("varchar")||column.datatype==="bigint"||column.datatype==="int"||column.datatype==="integer"; }); return columns; }; @@ -79,7 +79,7 @@ KylinApp.controller('ModelEditCtrl', function ($scope, $q, $routeParams, $locati if(dateColumn==columnName) return _column; }); -if(column[0].datatype==="bigint"||column[0].datatype==="int"){ + 
if(column[0].datatype==="bigint"||column[0].datatype==="int"||column[0].datatype==="integer"){ $scope.isBigInt=true; $scope.modelsManager.selectedModel.partition_desc.partition_date_format=null;; $scope.partitionColumn.hasSeparateTimeColumn=false; http://git-wip-us.apache.org/repos/asf/kylin/blob/71f37350/webapp/app/partials/modelDesigner/conditions_settings.html -- diff --git a/webapp/app/partials/modelDesigner/conditions_settings.html b/webapp/app/partials/modelDesigner/conditions_settings.html index f0390e5..1de93b4 100644 --- a/webapp/app/partials/modelDesigner/conditions_settings.html +++ b/webapp/app/partials/modelDesigner/conditions_settings.html @@ -164,7 +164,7 @@
- Partition date column not required,leave as default if cube always need full build
-- Column should contain date value (type can be Date, Timestamp, String, VARCHAR, Int, BigInt, etc.)
+- Column should contain date value (type can be Date, Timestamp, String, VARCHAR, Int, Integer, BigInt, etc.)
[1/2] kylin git commit: KYLIN-2030 bug fix
Repository: kylin Updated Branches: refs/heads/orderedbytes adf1369af -> afdec89fe KYLIN-2030 bug fix Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b0aa327d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b0aa327d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b0aa327d Branch: refs/heads/orderedbytes Commit: b0aa327d23d635fada35b80bb149a2611ad689b5 Parents: adf1369 Author: Hongbin MaAuthored: Sun Oct 9 19:07:52 2016 +0800 Committer: Hongbin Ma Committed: Sun Oct 9 19:07:52 2016 +0800 -- .../kylin/cube/CubeCapabilityChecker.java | 23 ++-- .../kylin/query/relnode/OLAPAggregateRel.java | 22 +++ 2 files changed, 25 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/b0aa327d/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java index ee21b1c..e509d98 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java @@ -81,7 +81,7 @@ public class CubeCapabilityChecker { //1. dimension as measure if (!unmatchedAggregations.isEmpty()) { -tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, cube.getDescriptor().listDimensionColumnsIncludingDerived()); +tryDimensionAsMeasures(unmatchedAggregations, result, cube.getDescriptor().listDimensionColumnsIncludingDerived()); } } else { //for non query-on-facttable @@ -92,10 +92,18 @@ public class CubeCapabilityChecker { dimCols.add(columnDesc.getRef()); } -//1. dimension as measure, like max(cal_dt) or count( distinct col) from lookup +//1. all aggregations on lookup table can be done. 
For distinct count, mark them all DimensionAsMeasures +// so that the measure has a chance to be upgraded to DimCountDistinctMeasureType in org.apache.kylin.metadata.model.FunctionDesc#reInitMeasureType if (!unmatchedAggregations.isEmpty()) { -tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, dimCols); +Iterator itr = unmatchedAggregations.iterator(); +while (itr.hasNext()) { +FunctionDesc functionDesc = itr.next(); +if (dimCols.containsAll(functionDesc.getParameter().getColRefs())) { +itr.remove(); +} +} } +tryDimensionAsMeasures(Lists.newArrayList(aggrFunctions), result, dimCols); //2. more "dimensions" contributed by snapshot if (!unmatchedDimensions.isEmpty()) { @@ -159,19 +167,12 @@ public class CubeCapabilityChecker { return result; } -private static void tryDimensionAsMeasures(Collection unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result, Set dimCols) { -CubeDesc cubeDesc = cube.getDescriptor(); -Collection cubeFuncs = cubeDesc.listAllFunctions(); +private static void tryDimensionAsMeasures(Collection unmatchedAggregations, CapabilityResult result, Set dimCols) { Iterator it = unmatchedAggregations.iterator(); while (it.hasNext()) { FunctionDesc functionDesc = it.next(); -if (cubeFuncs.contains(functionDesc)) { -it.remove(); -continue; -} - // let calcite handle count if (functionDesc.isCount()) { it.remove(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b0aa327d/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java -- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 97efb27..c7a1eff 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -285,16 +285,20 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } private void 
translateAggregation() { -// now the realization is known, replace aggregations with what's defined on MeasureDesc -
[2/2] kylin git commit: KYLIN-1726 fix 'FileSystem Closed' error
KYLIN-1726 fix 'FileSystem Closed' error Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/afdec89f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/afdec89f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/afdec89f Branch: refs/heads/orderedbytes Commit: afdec89fe09dcb28b368775f8b830c78f74e7489 Parents: b0aa327 Author: shaofengshiAuthored: Sun Oct 9 19:06:07 2016 +0800 Committer: Hongbin Ma Committed: Sun Oct 9 19:08:41 2016 +0800 -- .../apache/kylin/source/kafka/UpdateTimeRangeStep.java | 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/afdec89f/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java index bb64bf9..9e902d8 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java @@ -21,6 +21,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.fs.FSDataInputStream; @@ -63,7 +64,12 @@ public class UpdateTimeRangeStep extends AbstractExecutable { final Path outputFile = new Path(outputPath, partitionCol.getName()); String minValue = null, maxValue = null, currentValue = null; -try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { +FSDataInputStream inputStream = null; +BufferedReader bufferedReader = null; +try { +FileSystem fs = 
HadoopUtil.getFileSystem(outputPath); +inputStream = fs.open(outputFile); +bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); minValue = currentValue = bufferedReader.readLine(); while (currentValue != null) { maxValue = currentValue; @@ -72,6 +78,9 @@ public class UpdateTimeRangeStep extends AbstractExecutable { } catch (IOException e) { logger.error("fail to read file " + outputFile, e); return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); +} finally { +IOUtils.closeQuietly(bufferedReader); +IOUtils.closeQuietly(inputStream); } final DataType partitionColType = partitionCol.getType();
kylin git commit: KYLIN-1726 fix 'FileSystem Closed' error
Repository: kylin Updated Branches: refs/heads/master bc3cd889a -> 14c680c94 KYLIN-1726 fix 'FileSystem Closed' error Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/14c680c9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/14c680c9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/14c680c9 Branch: refs/heads/master Commit: 14c680c9444b4a2e1a64baad4ca81498e9c27e47 Parents: bc3cd88 Author: shaofengshiAuthored: Sun Oct 9 19:06:07 2016 +0800 Committer: shaofengshi Committed: Sun Oct 9 19:06:17 2016 +0800 -- .../apache/kylin/source/kafka/UpdateTimeRangeStep.java | 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/14c680c9/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java index bb64bf9..9e902d8 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java @@ -21,6 +21,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.fs.FSDataInputStream; @@ -63,7 +64,12 @@ public class UpdateTimeRangeStep extends AbstractExecutable { final Path outputFile = new Path(outputPath, partitionCol.getName()); String minValue = null, maxValue = null, currentValue = null; -try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { +FSDataInputStream inputStream = null; 
+BufferedReader bufferedReader = null; +try { +FileSystem fs = HadoopUtil.getFileSystem(outputPath); +inputStream = fs.open(outputFile); +bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); minValue = currentValue = bufferedReader.readLine(); while (currentValue != null) { maxValue = currentValue; @@ -72,6 +78,9 @@ public class UpdateTimeRangeStep extends AbstractExecutable { } catch (IOException e) { logger.error("fail to read file " + outputFile, e); return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); +} finally { +IOUtils.closeQuietly(bufferedReader); +IOUtils.closeQuietly(inputStream); } final DataType partitionColType = partitionCol.getType();
kylin git commit: KYLIN-2073 Tag timestamp in basic information
Repository: kylin Updated Branches: refs/heads/yang21 a2fb71d76 -> c553782f6 KYLIN-2073 Tag timestamp in basic information Signed-off-by: lidongsjtu Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c553782f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c553782f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c553782f Branch: refs/heads/yang21 Commit: c553782f64815952a33ac08cdb2f4145e283eafa Parents: a2fb71d Author: Yifan Zhang Authored: Sun Oct 9 13:27:33 2016 +0800 Committer: lidongsjtu Committed: Sun Oct 9 18:17:39 2016 +0800 -- .../src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/c553782f/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java -- diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java index 5797928..42a6706 100644 --- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java @@ -141,6 +141,8 @@ public abstract class AbstractInfoExtractor extends AbstractApplication { StringBuilder basicSb = new StringBuilder(); basicSb.append("MetaStoreID: ").append(ToolUtil.getHBaseMetaStoreId()).append("\n"); basicSb.append("PackageType: ").append(packageType.toUpperCase()).append("\n"); +SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z"); +basicSb.append("PackageTimestamp: ").append(format.format(new Date())).append("\n"); basicSb.append("Host: ").append(ToolUtil.getHostName()).append("\n"); FileUtils.writeStringToFile(new File(exportDir, "info"), basicSb.toString(), Charset.defaultCharset()); }
kylin git commit: KYLIN-2073 Tag timestamp in basic information
Repository: kylin Updated Branches: refs/heads/master 9d42d430f -> bc3cd889a KYLIN-2073 Tag timestamp in basic information Signed-off-by: lidongsjtu Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bc3cd889 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bc3cd889 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bc3cd889 Branch: refs/heads/master Commit: bc3cd889a18ff7f93a04af2f0ad5e9a60edbf161 Parents: 9d42d43 Author: Yifan Zhang Authored: Sun Oct 9 13:27:33 2016 +0800 Committer: lidongsjtu Committed: Sun Oct 9 18:12:54 2016 +0800 -- .../src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/bc3cd889/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java -- diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java index 5797928..42a6706 100644 --- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java @@ -141,6 +141,8 @@ public abstract class AbstractInfoExtractor extends AbstractApplication { StringBuilder basicSb = new StringBuilder(); basicSb.append("MetaStoreID: ").append(ToolUtil.getHBaseMetaStoreId()).append("\n"); basicSb.append("PackageType: ").append(packageType.toUpperCase()).append("\n"); +SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z"); +basicSb.append("PackageTimestamp: ").append(format.format(new Date())).append("\n"); basicSb.append("Host: ").append(ToolUtil.getHostName()).append("\n"); FileUtils.writeStringToFile(new File(exportDir, "info"), basicSb.toString(), Charset.defaultCharset()); }
[3/4] kylin git commit: KYLIN-2072 Cleanup old streaming code
KYLIN-2072 Cleanup old streaming code Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0f413827 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0f413827 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0f413827 Branch: refs/heads/KYLIN-2072 Commit: 0f413827dfcaa44463697d52a5eeca0a0b57eb6c Parents: c46b58b Author: shaofengshiAuthored: Sun Oct 9 13:10:50 2016 +0800 Committer: shaofengshi Committed: Sun Oct 9 13:10:50 2016 +0800 -- assembly/pom.xml| 4 - .../kylin/job/streaming/KafkaDataLoader.java| 79 build/bin/cleanup_streaming_files.sh| 42 -- build/bin/kylin.sh | 61 --- build/bin/streaming_build.sh| 33 -- build/bin/streaming_check.sh| 29 -- build/bin/streaming_fillgap.sh | 26 -- build/bin/streaming_rolllog.sh | 29 -- .../metadata/streaming/StreamingConfig.java | 85 .../metadata/streaming/StreamingManager.java| 248 .../.settings/org.eclipse.core.resources.prefs | 6 - .../.settings/org.eclipse.jdt.core.prefs| 386 --- .../.settings/org.eclipse.jdt.ui.prefs | 7 - engine-streaming/pom.xml| 121 -- .../kylin/engine/streaming/BootstrapConfig.java | 71 .../kylin/engine/streaming/IStreamingInput.java | 30 -- .../engine/streaming/IStreamingOutput.java | 34 -- .../streaming/OneOffStreamingBuilder.java | 71 .../engine/streaming/StreamingBatchBuilder.java | 43 --- .../kylin/engine/streaming/StreamingConfig.java | 85 .../engine/streaming/StreamingManager.java | 248 .../kylin/engine/streaming/cli/MonitorCLI.java | 88 - .../engine/streaming/cli/StreamingCLI.java | 114 -- .../streaming/cube/StreamingCubeBuilder.java| 168 .../diagnose/StreamingLogAnalyzer.java | 96 - .../streaming/monitor/StreamingMonitor.java | 172 - .../engine/streaming/util/StreamingUtils.java | 51 --- .../kylin/provision/BuildCubeWithStream.java| 4 +- pom.xml | 6 - .../rest/controller/StreamingController.java| 2 +- .../kylin/rest/controller/TableController.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- 
.../kylin/rest/service/StreamingService.java| 2 +- source-kafka/pom.xml| 6 - .../kafka/ByteBufferBackedInputStream.java | 52 --- .../apache/kylin/source/kafka/KafkaSource.java | 2 +- .../kylin/source/kafka/KafkaStreamingInput.java | 227 --- .../source/kafka/TimedJsonStreamParser.java | 1 + .../kafka/diagnose/KafkaInputAnalyzer.java | 312 --- .../source/kafka/diagnose/KafkaVerify.java | 101 - .../source/kafka/diagnose/TimeHistogram.java| 85 .../kafka/util/ByteBufferBackedInputStream.java | 52 +++ .../kylin/source/kafka/util/KafkaRequester.java | 191 - .../kylin/source/kafka/util/KafkaUtils.java | 173 - .../test/java/TimedJsonStreamParserTest.java| 4 +- storage-hbase/pom.xml | 4 - .../hbase/steps/HBaseStreamingOutput.java | 98 - .../apache/kylin/tool/CubeMetaExtractor.java| 4 +- 48 files changed, 397 insertions(+), 3360 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/0f413827/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 0c80afc..e6f83a8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -47,10 +47,6 @@ org.apache.kylin kylin-engine-mr - -org.apache.kylin -kylin-engine-streaming - http://git-wip-us.apache.org/repos/asf/kylin/blob/0f413827/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java -- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java deleted file mode 100644 index 454f6cf..000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you
[2/4] kylin git commit: KYLIN-2072 Cleanup old streaming code
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f413827/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java -- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java deleted file mode 100644 index 271bf41..000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.streaming; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.cachesync.Broadcaster; -import org.apache.kylin.metadata.cachesync.Broadcaster.Event; -import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class StreamingManager { - -private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); - -// static cached instances -private static final ConcurrentHashMapCACHE = new ConcurrentHashMap (); - -public static final Serializer STREAMING_SERIALIZER = new JsonSerializer(StreamingConfig.class); - -private KylinConfig config; - -// name ==> StreamingConfig -private CaseInsensitiveStringCache streamingMap; - -public static void clearCache() { -CACHE.clear(); -} - -private StreamingManager(KylinConfig config) throws IOException { -this.config = config; -this.streamingMap = new CaseInsensitiveStringCache(config, "streaming"); - -// touch lower level metadata before registering my listener -reloadAllStreaming(); -Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming"); -} - -private class StreamingSyncListener extends Broadcaster.Listener { -@Override -public void onClearAll(Broadcaster broadcaster) throws IOException { -clearCache(); -} - -@Override -public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { -if (event == Event.DROP) -removeStreamingLocal(cacheKey); -else 
-reloadStreamingConfigLocal(cacheKey); -} -} - -private ResourceStore getStore() { -return ResourceStore.getStore(this.config); -} - -public static StreamingManager getInstance(KylinConfig config) { -StreamingManager r = CACHE.get(config); -if (r != null) { -return r; -} - -synchronized (StreamingManager.class) { -r = CACHE.get(config); -if (r != null) { -return r; -} -try { -r = new StreamingManager(config); -CACHE.put(config, r); -if (CACHE.size() > 1) { -logger.warn("More than one singleton exist"); -} -return r; -} catch (IOException e) { -throw new IllegalStateException("Failed to init StreamingManager from " + config, e); -} -} -} - -private static String formatStreamingConfigPath(String name) { -return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; -} - -private static String formatStreamingOutputPath(String streaming, int partition) { -return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; -} - -private static String
[1/4] kylin git commit: KYLIN-2072 Cleanup old streaming code
Repository: kylin Updated Branches: refs/heads/KYLIN-2072 [created] 38b44b4c6 http://git-wip-us.apache.org/repos/asf/kylin/blob/0f413827/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java deleted file mode 100644 index 1c579c6..000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.kafka.diagnose; - -import java.util.concurrent.atomic.AtomicLong; - -public class TimeHistogram { -private long[] bucketsBoundary; -private AtomicLong[] counters; -private String id; - -private static Object printLock = new Object(); - -/** - * example: [10,20] will generate three buckets: (-∞,10), [10,20),[20,+∞) - * unit: second - */ -public TimeHistogram(long[] bucketsBoundary, String id) { -this.bucketsBoundary = bucketsBoundary; -this.counters = new AtomicLong[this.bucketsBoundary.length + 1]; -for (int i = 0; i < counters.length; i++) { -this.counters[i] = new AtomicLong(); -} -this.id = id; -} - -/** - * @param second in seconds - */ -public void process(long second) { -for (int i = 0; i < bucketsBoundary.length; ++i) { -if (second < bucketsBoundary[i]) { -counters[i].incrementAndGet(); -return; -} -} - -counters[bucketsBoundary.length].incrementAndGet(); -} - -/** - * @param millis in milli seconds - */ -public void processMillis(long millis) { -process(millis / 1000); -} - -public void printStatus() { -long[] countersSnapshot = new long[counters.length]; -for (int i = 0; i < countersSnapshot.length; i++) { -countersSnapshot[i] = counters[i].get(); -} - -long sum = 0; -for (long counter : countersSnapshot) { -sum += counter; -} - -synchronized (printLock) { -System.out.println("== status of TimeHistogram " + id + " ="); - -for (int i = 0; i < countersSnapshot.length; ++i) { -System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 
1 : sum))); -} - -} -} - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/0f413827/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java new file mode 100644 index 000..7a42598 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.kafka.util; + +import
[4/4] kylin git commit: KYLIN-1726 package rename
KYLIN-1726 package rename Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/38b44b4c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/38b44b4c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/38b44b4c Branch: refs/heads/KYLIN-2072 Commit: 38b44b4c67f3edccfdefad45c2ae2392d130996b Parents: 0f41382 Author: shaofengshi Authored: Sun Oct 9 13:24:59 2016 +0800 Committer: shaofengshi Committed: Sun Oct 9 13:24:59 2016 +0800 -- .../apache/kylin/source/kafka/KafkaMRInput.java | 3 + .../kylin/source/kafka/MergeOffsetStep.java | 80 --- .../kylin/source/kafka/SeekOffsetStep.java | 140 -- .../source/kafka/StringStreamingParser.java | 51 --- .../apache/kylin/source/kafka/TopicMeta.java| 46 -- .../kylin/source/kafka/UpdateTimeRangeStep.java | 108 -- .../kylin/source/kafka/job/MergeOffsetStep.java | 80 +++ .../kylin/source/kafka/job/SeekOffsetStep.java | 141 +++ .../source/kafka/job/UpdateTimeRangeStep.java | 108 ++ .../kafka/util/ByteBufferBackedInputStream.java | 6 +- 10 files changed, 335 insertions(+), 428 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/38b44b4c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 6358ee1..4d1f5c9 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.job.MergeOffsetStep; +import org.apache.kylin.source.kafka.job.SeekOffsetStep; +import 
org.apache.kylin.source.kafka.job.UpdateTimeRangeStep; import javax.annotation.Nullable; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/kylin/blob/38b44b4c/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java deleted file mode 100644 index 18c959a..000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.source.kafka; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class MergeOffsetStep extends AbstractExecutable { - -private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class); -public MergeOffsetStep() { -super(); -} - -@Override -protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { -final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); -final CubeInstance cube =