[BAHIR-144] Add flink-library-siddhi This closes #22
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/2f47eedc Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/2f47eedc Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/2f47eedc Branch: refs/heads/BAHIR-144-merge Commit: 2f47eedc03250b07bc75880b5700e56386ef64b5 Parents: 4f0179a Author: Hao Chen <[email protected]> Authored: Sat Nov 18 22:15:32 2017 +0800 Committer: Robert Metzger <[email protected]> Committed: Sun Nov 26 16:13:11 2017 +0100 ---------------------------------------------------------------------- flink-library-siddhi/pom.xml | 113 ++++++ .../flink/streaming/siddhi/SiddhiCEP.java | 231 +++++++++++ .../flink/streaming/siddhi/SiddhiStream.java | 279 +++++++++++++ .../exception/DuplicatedStreamException.java | 24 ++ .../exception/UndefinedStreamException.java | 24 ++ .../siddhi/operator/AbstractSiddhiOperator.java | 338 ++++++++++++++++ .../siddhi/operator/SiddhiOperatorContext.java | 227 +++++++++++ .../siddhi/operator/SiddhiStreamOperator.java | 80 ++++ .../operator/StreamInMemOutputHandler.java | 104 +++++ .../siddhi/operator/StreamOutputHandler.java | 101 +++++ .../siddhi/operator/StreamRecordComparator.java | 41 ++ .../flink/streaming/siddhi/package-info.java | 78 ++++ .../siddhi/schema/SiddhiStreamSchema.java | 72 ++++ .../streaming/siddhi/schema/StreamSchema.java | 173 ++++++++ .../siddhi/schema/StreamSerializer.java | 76 ++++ .../siddhi/utils/SiddhiStreamFactory.java | 33 ++ .../siddhi/utils/SiddhiTupleFactory.java | 128 ++++++ .../siddhi/utils/SiddhiTypeFactory.java | 136 +++++++ .../flink/streaming/siddhi/SiddhiCEPITCase.java | 403 +++++++++++++++++++ .../extension/CustomPlusFunctionExtension.java | 107 +++++ .../siddhi/operator/SiddhiSyntaxTest.java | 83 ++++ .../schema/SiddhiExecutionPlanSchemaTest.java | 49 +++ .../siddhi/schema/StreamSchemaTest.java | 94 +++++ .../siddhi/schema/StreamSerializerTest.java | 40 ++ 
.../flink/streaming/siddhi/source/Event.java | 110 +++++ .../siddhi/source/RandomEventSource.java | 72 ++++ .../siddhi/source/RandomTupleSource.java | 74 ++++ .../siddhi/source/RandomWordSource.java | 111 +++++ .../siddhi/utils/SiddhiTupleFactoryTest.java | 46 +++ .../siddhi/utils/SiddhiTypeFactoryTest.java | 50 +++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/logback-test.xml | 34 ++ pom.xml | 1 + 33 files changed, 3559 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/pom.xml ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml new file mode 100644 index 0000000..91c6797 --- /dev/null +++ b/flink-library-siddhi/pom.xml @@ -0,0 +1,113 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink-parent_2.11</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-library-siddhi_2.11</artifactId> + <name>flink-library-siddhi</name> + + <packaging>jar</packaging> + + <properties> + <siddhi.version>4.0.0-M120</siddhi.version> + </properties> + + <dependencies> + <!-- core dependencies --> + <dependency> + <groupId>org.wso2.siddhi</groupId> + <artifactId>siddhi-core</artifactId> + <version>${siddhi.version}</version> + <exclusions> + <exclusion> <!-- declare the exclusion here --> + <groupId>org.apache.directory.jdbm</groupId> + <artifactId>apacheds-jdbm1</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.wso2.siddhi</groupId> + <artifactId>siddhi-query-api</artifactId> + <version>${siddhi.version}</version> + <exclusions> + <exclusion> <!-- declare the exclusion here --> + <groupId>org.apache.directory.jdbm</groupId> + <artifactId>apacheds-jdbm1</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Core streaming API --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <repositories> + <repository> + <id>wso2-maven2-repository</id> + <name>WSO2 Maven2 Repository</name> + <url>http://maven.wso2.org/nexus/content/repositories/releases/</url> + </repository> + </repositories> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java new file mode 100644 index 0000000..a63dbf6 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException; +import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException; +import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator; +import org.apache.flink.streaming.siddhi.schema.StreamSchema; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; + +/** + * <p> + * Siddhi CEP Environment, provides utility methods to + * + * <ul> + * <li>Initialize SiddhiCEP environment based on {@link StreamExecutionEnvironment}</li> + * <li>Register {@link SiddhiStream} with field-based StreamSchema and bind with physical source {@link DataStream}</li> + * <li>Define rich-featured Siddhi CEP execution plan with SQL-Like query for SiddhiStreamOperator</li> + * <li>Transform and connect source DataStream to SiddhiStreamOperator</li> + * <li>Register customizable siddhi plugins to extend built-in CEP functions</li> + * </ul> + * </p> + * + * @see SiddhiStream + * @see StreamSchema + * @see SiddhiStreamOperator + */ +@PublicEvolving +public class SiddhiCEP { + private final StreamExecutionEnvironment executionEnvironment; + private final Map<String, DataStream<?>> dataStreams = new HashMap<>(); + private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>(); + private final Map<String, Class<?>> extensions = new HashMap<>(); + + /** + * @param streamExecutionEnvironment Stream Execution Environment + */ + private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) { + this.executionEnvironment = 
streamExecutionEnvironment; + } + + /** + * @see DataStream + * @return Siddhi streamId and source DataStream mapping. + */ + public Map<String, DataStream<?>> getDataStreams() { + return this.dataStreams; + } + + /** + * @see SiddhiStreamSchema + * @return Siddhi streamId and stream schema mapping. + */ + public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() { + return this.dataStreamSchemas; + } + + /** + * @param streamId Siddhi streamId to check. + * @return whether the given streamId is defined in current SiddhiCEP environment. + */ + public boolean isStreamDefined(String streamId) { + Preconditions.checkNotNull(streamId,"streamId"); + return dataStreams.containsKey(streamId); + } + + /** + * @return Registered siddhi extensions. + */ + public Map<String, Class<?>> getExtensions() { + return this.extensions; + } + + /** + * Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException} + * @param streamId Siddhi streamId to check. + * @throws UndefinedStreamException throws if given streamId is not defined + */ + public void checkStreamDefined(String streamId) throws UndefinedStreamException { + Preconditions.checkNotNull(streamId,"streamId"); + if (!isStreamDefined(streamId)) { + throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined"); + } + } + + /** + * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema, + * and select as initial source stream to connect to siddhi operator. + * + * @param streamId Unique siddhi streamId + * @param dataStream DataStream to bind to the siddhi stream. + * @param fieldNames Siddhi stream schema field names + * + * @see #registerStream(String, DataStream, String...) + * @see #from(String) + */ + public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... 
fieldNames) { + Preconditions.checkNotNull(streamId,"streamId"); + Preconditions.checkNotNull(dataStream,"dataStream"); + Preconditions.checkNotNull(fieldNames,"fieldNames"); + SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment()); + return environment.from(streamId, dataStream, fieldNames); + } + + /** + * Register stream with unique <code>streamId</code>, source <code>dataStream</code> and schema fields, + * and select the registered stream as initial stream to connect to Siddhi Runtime. + * + * @see #registerStream(String, DataStream, String...) + * @see #from(String) + */ + public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) { + Preconditions.checkNotNull(streamId,"streamId"); + Preconditions.checkNotNull(dataStream,"dataStream"); + Preconditions.checkNotNull(fieldNames,"fieldNames"); + this.registerStream(streamId, dataStream, fieldNames); + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + /** + * Select stream by streamId as initial stream to connect to Siddhi Runtime. + * + * @param streamId Siddhi Stream Name + * @param <T> Stream Generic Type + */ + public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) { + Preconditions.checkNotNull(streamId,"streamId"); + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + /** + * Select one stream and union other streams by streamId to connect to Siddhi Stream Operator. + * + * @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context. + * @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context. + * + * @return The UnionSiddhiStream Builder + */ + public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String...
unionStreamIds) { + Preconditions.checkNotNull(firstStreamId,"firstStreamId"); + Preconditions.checkNotNull(unionStreamIds,"unionStreamIds"); + return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds); + } + + /** + * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema. + * + * @param streamId Unique siddhi streamId + * @param dataStream DataStream to bind to the siddhi stream. + * @param fieldNames Siddhi stream schema field names + */ + public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) { + Preconditions.checkNotNull(streamId,"streamId"); + Preconditions.checkNotNull(dataStream,"dataStream"); + Preconditions.checkNotNull(fieldNames,"fieldNames"); + if (isStreamDefined(streamId)) { + throw new DuplicatedStreamException("Input stream: " + streamId + " already exists"); + } + dataStreams.put(streamId, dataStream); + SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames); + schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig())); + dataStreamSchemas.put(streamId, schema); + } + + /** + * @return Current StreamExecutionEnvironment. 
+ */ + public StreamExecutionEnvironment getExecutionEnvironment() { + return executionEnvironment; + } + + /** + * Register Siddhi CEP Extensions + * + * @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a> + * @param extensionName Unique siddhi extension name + * @param extensionClass Siddhi Extension class + */ + public void registerExtension(String extensionName, Class<?> extensionClass) { + if (extensions.containsKey(extensionName)) { + throw new IllegalArgumentException("Extension named " + extensionName + " already registered"); + } + extensions.put(extensionName, extensionClass); + } + + /** + * Get registered source DataStream with Siddhi streamId. + * + * @param streamId Siddhi streamId + * @return The source DataStream registered with Siddhi streamId + */ + public <T> DataStream<T> getDataStream(String streamId) { + if (this.dataStreams.containsKey(streamId)) { + return (DataStream<T>) this.dataStreams.get(streamId); + } else { + throw new UndefinedStreamException("Undefined stream " + streamId); + } + } + + /** + * Create new SiddhiCEP instance. + * + * @param streamExecutionEnvironment StreamExecutionEnvironment + * @return New SiddhiCEP instance. 
+ */ + public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) { + return new SiddhiCEP(streamExecutionEnvironment); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java new file mode 100644 index 0000000..43d7436 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext; +import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory; +import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Siddhi CEP Stream API + */ +@PublicEvolving +public abstract class SiddhiStream { + private final SiddhiCEP cepEnvironment; + + /** + * @param cepEnvironment SiddhiCEP cepEnvironment. + */ + public SiddhiStream(SiddhiCEP cepEnvironment) { + Preconditions.checkNotNull(cepEnvironment,"SiddhiCEP cepEnvironment is null"); + this.cepEnvironment = cepEnvironment; + } + + /** + * @return current SiddhiCEP cepEnvironment. + */ + protected SiddhiCEP getCepEnvironment() { + return this.cepEnvironment; + } + + /** + * @return Transform SiddhiStream to physical DataStream + */ + protected abstract DataStream<Tuple2<String, Object>> toDataStream(); + + /** + * Convert DataStream<T> to DataStream<Tuple2<String,T>>. + * If it's KeyedStream. 
pass through original keySelector + */ + protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) { + final String streamIdInClosure = streamId; + DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() { + @Override + public Tuple2<String, Object> map(T value) throws Exception { + return Tuple2.of(streamIdInClosure, (Object) value); + } + }); + if (dataStream instanceof KeyedStream) { + final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector(); + final KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() { + @Override + public Object getKey(Tuple2<String, Object> value) throws Exception { + return keySelector.getKey((T) value.f1); + } + }; + return resultStream.keyBy(keySelectorInClosure); + } else { + return resultStream; + } + } + + /** + * ExecutableStream context to define execution logic, i.e. SiddhiCEP execution plan. 
+ */ + public abstract static class ExecutableStream extends SiddhiStream { + public ExecutableStream(SiddhiCEP environment) { + super(environment); + } + + /** + * Siddhi Continuous Query Language (CQL) + * + * @param executionPlan Siddhi SQL-Like execution plan query + * @return ExecutionSiddhiStream context + */ + public ExecutionSiddhiStream cql(String executionPlan) { + Preconditions.checkNotNull(executionPlan,"executionPlan"); + return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment()); + } + } + + /** + * Initial Single Siddhi Stream Context + */ + public static class SingleSiddhiStream<T> extends ExecutableStream { + private final String streamId; + + public SingleSiddhiStream(String streamId, SiddhiCEP environment) { + super(environment); + environment.checkStreamDefined(streamId); + this.streamId = streamId; + } + + + /** + * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and as the first stream of {@link UnionSiddhiStream} + * + * @param streamId Unique siddhi streamId + * @param dataStream DataStream to bind to the siddhi stream. + * @param fieldNames Siddhi stream schema field names + * + * @return {@link UnionSiddhiStream} context + */ + public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) { + getCepEnvironment().registerStream(streamId, dataStream, fieldNames); + return union(streamId); + } + + /** + * @param streamIds Defined siddhi streamIds to union + * @return {@link UnionSiddhiStream} context + */ + public UnionSiddhiStream<T> union(String... 
streamIds) { + Preconditions.checkNotNull(streamIds,"streamIds"); + return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment()); + } + + @Override + protected DataStream<Tuple2<String, Object>> toDataStream() { + return convertDataStream(getCepEnvironment().getDataStream(this.streamId), this.streamId); + } + } + + public static class UnionSiddhiStream<T> extends ExecutableStream { + private String firstStreamId; + private List<String> unionStreamIds; + + public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) { + super(environment); + Preconditions.checkNotNull(firstStreamId,"firstStreamId"); + Preconditions.checkNotNull(unionStreamIds,"unionStreamIds"); + environment.checkStreamDefined(firstStreamId); + for (String unionStreamId : unionStreamIds) { + environment.checkStreamDefined(unionStreamId); + } + this.firstStreamId = firstStreamId; + this.unionStreamIds = unionStreamIds; + } + + /** + * Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and continue to union it with current stream. + * + * @param streamId Unique siddhi streamId + * @param dataStream DataStream to bind to the siddhi stream. + * @param fieldNames Siddhi stream schema field names + * + * @return {@link UnionSiddhiStream} context + */ + public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) { + Preconditions.checkNotNull(streamId,"streamId"); + Preconditions.checkNotNull(dataStream,"dataStream"); + Preconditions.checkNotNull(fieldNames,"fieldNames"); + getCepEnvironment().registerStream(streamId, dataStream, fieldNames); + return union(streamId); + } + + /** + * @param streamId another defined streamId to union with. + * @return {@link UnionSiddhiStream} context + */ + public UnionSiddhiStream<T> union(String... 
streamId) { + List<String> newUnionStreamIds = new LinkedList<>(); + newUnionStreamIds.addAll(unionStreamIds); + newUnionStreamIds.addAll(Arrays.asList(streamId)); + return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment()); + } + + @Override + protected DataStream<Tuple2<String, Object>> toDataStream() { + final String localFirstStreamId = firstStreamId; + final List<String> localUnionStreamIds = this.unionStreamIds; + DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getCepEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId); + for (String unionStreamId : localUnionStreamIds) { + dataStream = dataStream.union(convertDataStream(getCepEnvironment().<T>getDataStream(unionStreamId), unionStreamId)); + } + return dataStream; + } + } + + public static class ExecutionSiddhiStream { + private final DataStream<Tuple2<String, Object>> dataStream; + private final SiddhiCEP environment; + private final String executionPlan; + + public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) { + this.executionPlan = executionPlan; + this.dataStream = dataStream; + this.environment = environment; + } + + /** + * @param outStreamId The <code>streamId</code> to return as data stream. + * @param <T> Type information should match with stream definition. + * During execution phase, it will automatically build type information based on stream definition. 
+ * @return Return output stream as Tuple + * @see SiddhiTypeFactory + */ + public <T extends Tuple> DataStream<T> returns(String outStreamId) { + SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext(); + siddhiContext.setExecutionPlan(executionPlan); + siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas()); + siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic()); + siddhiContext.setOutputStreamId(outStreamId); + siddhiContext.setExtensions(environment.getExtensions()); + siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig()); + TypeInformation<T> typeInformation = + SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId); + siddhiContext.setOutputStreamType(typeInformation); + return returnsInternal(siddhiContext); + } + + /** + * @return Return output stream as <code>DataStream<Map<String,Object>></code>, + * out type is <code>LinkedHashMap<String,Object></code> and guarantee field order + * as defined in siddhi execution plan + * @see java.util.LinkedHashMap + */ + public DataStream<Map<String, Object>> returnAsMap(String outStreamId) { + return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation()); + } + + /** + * @param outStreamId OutStreamId + * @param outType Output type class + * @param <T> Output type + * @return Return output stream as POJO class. 
+ */ + public <T> DataStream<T> returns(String outStreamId, Class<T> outType) { + TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType); + return returnsInternal(outStreamId, typeInformation); + } + + private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) { + SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext(); + siddhiContext.setExecutionPlan(executionPlan); + siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas()); + siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic()); + siddhiContext.setOutputStreamId(outStreamId); + siddhiContext.setOutputStreamType(typeInformation); + siddhiContext.setExtensions(environment.getExtensions()); + siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig()); + return returnsInternal(siddhiContext); + } + + private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) { + return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java new file mode 100644 index 0000000..f65cc81 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/DuplicatedStreamException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.exception; + +public class DuplicatedStreamException extends RuntimeException { + public DuplicatedStreamException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java new file mode 100644 index 0000000..26254c2 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/exception/UndefinedStreamException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.exception; + +public class UndefinedStreamException extends RuntimeException { + public UndefinedStreamException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java new file mode 100755 index 0000000..8cb6d67 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.operator; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException; +import org.apache.flink.streaming.siddhi.schema.StreamSchema; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.SiddhiAppRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; + +/** + * <h1>Siddhi Runtime Operator</h1> + * + * A flink Stream Operator to integrate with native siddhi execution runtime, extension and type schema mechanism/ + * + * <ul> + * <li> + * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle. + * </li> + * <li> + * Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId + * </li> + * <li> + * Convert native {@link StreamRecord} to Siddhi {@link org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime. + * </li> + * <li> + * Listen output callback event and convert as expected output type according to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then output as typed DataStream. + * </li> + * </li> + * <li> + * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) + * </li> + * <li> + * Support siddhi plugin management to extend CEP functions. 
(See `SiddhiCEP#registerExtension`) + * </li> + * </ul> + * + * @param <IN> Input Element Type + * @param <OUT> Output Element Type + */ +public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT> { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class); + private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11; + private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState"; + private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState"; + + private final SiddhiOperatorContext siddhiPlan; + private final String executionExpression; + private final boolean isProcessingTime; + private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers; + + private transient SiddhiManager siddhiManager; + private transient SiddhiAppRuntime siddhiRuntime; + private transient Map<String, InputHandler> inputStreamHandlers; + + // queue to buffer out of order stream records + private transient PriorityQueue<StreamRecord<IN>> priorityQueue; + + private transient ListState<byte[]> siddhiRuntimeState; + private transient ListState<byte[]> queuedRecordsState; + + /** + * @param siddhiPlan Siddhi CEP Execution Plan + */ + public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) { + validate(siddhiPlan); + this.executionExpression = siddhiPlan.getFinalExecutionPlan(); + this.siddhiPlan = siddhiPlan; + this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + this.streamRecordSerializers = new HashMap<>(); + + registerStreamRecordSerializers(); + } + + /** + * Register StreamRecordSerializer based on {@link StreamSchema} + */ + private void registerStreamRecordSerializers() { + for (String streamId : this.siddhiPlan.getInputStreams()) { + streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), 
this.siddhiPlan.getExecutionConfig())); + } + } + + protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig); + + protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) { + if (streamRecordSerializers.containsKey(streamId)) { + return streamRecordSerializers.get(streamId); + } else { + throw new UndefinedStreamException("Stream " + streamId + " not defined"); + } + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + String streamId = getStreamId(element.getValue()); + StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId); + + if (isProcessingTime) { + processEvent(streamId, schema, element.getValue(), System.currentTimeMillis()); + this.checkpointSiddhiRuntimeState(); + } else { + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + // event time processing + // we have to buffer the elements until we receive the proper watermark + if (getExecutionConfig().isObjectReuseEnabled()) { + // copy the StreamRecord so that it cannot be changed + priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp())); + } else { + priorityQueue.offer(element); + } + this.checkpointRecordQueueState(); + } + } + + protected abstract void processEvent(String streamId, StreamSchema<IN> schema, IN value, long timestamp) throws Exception; + + @Override + public void processWatermark(Watermark mark) throws Exception { + while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { + StreamRecord<IN> streamRecord = priorityQueue.poll(); + String streamId = getStreamId(streamRecord.getValue()); + long timestamp = streamRecord.getTimestamp(); + StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId); + processEvent(streamId, schema, streamRecord.getValue(), timestamp); + } + output.emitWatermark(mark); + } + + public 
abstract String getStreamId(IN record); + + public PriorityQueue<StreamRecord<IN>> getPriorityQueue() { + return priorityQueue; + } + + protected SiddhiAppRuntime getSiddhiRuntime() { + return this.siddhiRuntime; + } + + public InputHandler getSiddhiInputHandler(String streamId) { + return inputStreamHandlers.get(streamId); + } + + protected SiddhiOperatorContext getSiddhiPlan() { + return this.siddhiPlan; + } + + @Override + public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) { + super.setup(containingTask, config, output); + if (priorityQueue == null) { + priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>()); + } + startSiddhiRuntime(); + } + + /** + * Send input data to siddhi runtime + */ + protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException { + this.getSiddhiInputHandler(streamId).send(timestamp, data); + } + + /** + * Validate execution plan during building DAG before submitting to execution environment and fail-fast. 
+ */ + private static void validate(final SiddhiOperatorContext siddhiPlan) { + SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager(); + try { + siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan()); + } finally { + siddhiManager.shutdown(); + } + } + + /** + * Create and start execution runtime + */ + private void startSiddhiRuntime() { + if (this.siddhiRuntime == null) { + this.siddhiManager = this.siddhiPlan.createSiddhiManager(); + for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) { + this.siddhiManager.setExtension(entry.getKey(), entry.getValue()); + } + this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression); + this.siddhiRuntime.start(); + registerInputAndOutput(this.siddhiRuntime); + LOGGER.info("Siddhi {} started", siddhiRuntime.getName()); + } else { + throw new IllegalStateException("Siddhi has already been initialized"); + } + } + + + private void shutdownSiddhiRuntime() { + if (this.siddhiRuntime != null) { + this.siddhiRuntime.shutdown(); + LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName()); + this.siddhiRuntime = null; + this.siddhiManager.shutdown(); + this.siddhiManager = null; + this.inputStreamHandlers = null; + } else { + throw new IllegalStateException("Siddhi has already shutdown"); + } + } + + @SuppressWarnings("unchecked") + private void registerInputAndOutput(SiddhiAppRuntime runtime) { + AbstractDefinition definition = this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId()); + runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler<>(this.siddhiPlan.getOutputStreamType(), definition, this.output)); + inputStreamHandlers = new HashMap<>(); + for (String inputStreamId : this.siddhiPlan.getInputStreams()) { + inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId)); + } + } + + @Override + public void dispose() throws Exception { + shutdownSiddhiRuntime(); + 
this.siddhiRuntimeState.clear(); + super.dispose(); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + checkpointSiddhiRuntimeState(); + checkpointRecordQueueState(); + } + + private void restoreState() throws Exception { + LOGGER.info("Restore siddhi state"); + final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator(); + if (siddhiState.hasNext()) { + this.siddhiRuntime.restore(siddhiState.next()); + } + + LOGGER.info("Restore queued records state"); + final Iterator<byte[]> queueState = queuedRecordsState.get().iterator(); + if (queueState.hasNext()) { + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next()); + final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream); + try { + this.priorityQueue = restoreQueuerState(dataInputView); + } finally { + dataInputView.close(); + byteArrayInputStream.close(); + } + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + if (siddhiRuntimeState == null) { + siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME, + new BytePrimitiveArraySerializer())); + } + if (queuedRecordsState == null) { + queuedRecordsState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer())); + } + if (context.isRestored()) { + restoreState(); + } + } + + + private void checkpointSiddhiRuntimeState() throws Exception { + this.siddhiRuntimeState.clear(); + this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot()); + this.queuedRecordsState.clear(); + } + + private void checkpointRecordQueueState() throws Exception { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final DataOutputViewStreamWrapper 
dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStream); + try { + snapshotQueueState(this.priorityQueue, dataOutputView); + this.queuedRecordsState.clear(); + this.queuedRecordsState.add(byteArrayOutputStream.toByteArray()); + } finally { + dataOutputView.close(); + byteArrayOutputStream.close(); + } + } + + protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException; + + protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException; +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java new file mode 100644 index 0000000..f760938 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.operator; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException; +import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema; +import org.apache.flink.streaming.siddhi.schema.StreamSchema; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.util.Preconditions; +import org.wso2.siddhi.core.SiddhiManager; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * SiddhiCEP Operator Context Metadata including input/output stream (streamId, TypeInformation) as well execution plan query, + * and execution environment context like TimeCharacteristic and ExecutionConfig. + */ +public class SiddhiOperatorContext implements Serializable { + private ExecutionConfig executionConfig; + private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas; + private final Map<String, Class<?>> siddhiExtensions; + private String outputStreamId; + private TypeInformation outputStreamType; + private TimeCharacteristic timeCharacteristic; + private String name; + private String executionPlan; + + public SiddhiOperatorContext() { + inputStreamSchemas = new HashMap<>(); + siddhiExtensions = new HashMap<>(); + } + + /** + * @param extensions siddhi extensions to register + */ + public void setExtensions(Map<String, Class<?>> extensions) { + Preconditions.checkNotNull(extensions,"extensions"); + siddhiExtensions.putAll(extensions); + } + + /** + * @return registered siddhi extensions + */ + public Map<String, Class<?>> getExtensions() { + return siddhiExtensions; + } + + /** + * @return Siddhi Stream Operator Name in format of "Siddhi: execution query ... 
(query length)" + */ + public String getName() { + if (this.name == null) { + if (executionPlan.length() > 100) { + return String.format("Siddhi: %s ... (%s)", executionPlan.substring(0, 100), executionPlan.length() - 100); + } else { + return String.format("Siddhi: %s", executionPlan); + } + } else { + return this.name; + } + } + + /** + * @return Source siddhi stream IDs + */ + public List<String> getInputStreams() { + Object[] keys = this.inputStreamSchemas.keySet().toArray(); + List<String> result = new ArrayList<>(keys.length); + for (Object key : keys) { + result.add((String) key); + } + return result; + } + + /** + * @return Siddhi CEP cql-like execution plan + */ + public String getExecutionPlan() { + return executionPlan; + } + + /** + * Stream definition + execution expression + */ + public String getFinalExecutionPlan() { + Preconditions.checkNotNull(executionPlan, "Execution plan is not set"); + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) { + sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey())); + } + sb.append(this.getExecutionPlan()); + return sb.toString(); + } + + /** + * @return Siddhi Stream Operator output type information + */ + public TypeInformation getOutputStreamType() { + return outputStreamType; + } + + /** + * @return Siddhi output streamId for callback + */ + public String getOutputStreamId() { + return outputStreamId; + } + + /** + * @param inputStreamId Siddhi streamId + * @return StreamSchema for given siddhi streamId + * + * @throws UndefinedStreamException throws if stream is not defined + */ + @SuppressWarnings("unchecked") + public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) { + Preconditions.checkNotNull(inputStreamId,"inputStreamId"); + + if (!inputStreamSchemas.containsKey(inputStreamId)) { + throw new UndefinedStreamException("Input stream: " + inputStreamId + " is not found"); + } + return 
(StreamSchema<IN>) inputStreamSchemas.get(inputStreamId); + } + + /** + * @param outputStreamId Siddhi output streamId, which must exist in siddhi execution plan + */ + public void setOutputStreamId(String outputStreamId) { + Preconditions.checkNotNull(outputStreamId,"outputStreamId"); + this.outputStreamId = outputStreamId; + } + + /** + * @param outputStreamType Output stream TypeInformation + */ + public void setOutputStreamType(TypeInformation outputStreamType) { + Preconditions.checkNotNull(outputStreamType,"outputStreamType"); + this.outputStreamType = outputStreamType; + } + + /** + * @return Returns execution environment TimeCharacteristic + */ + public TimeCharacteristic getTimeCharacteristic() { + return timeCharacteristic; + } + + public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + Preconditions.checkNotNull(timeCharacteristic,"timeCharacteristic"); + this.timeCharacteristic = timeCharacteristic; + } + + /** + * @param executionPlan Siddhi SQL-Like exeuction plan query + */ + public void setExecutionPlan(String executionPlan) { + Preconditions.checkNotNull(executionPlan,"executionPlan"); + this.executionPlan = executionPlan; + } + + /** + * @return Returns input stream ID and schema mapping + */ + public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() { + return inputStreamSchemas; + } + + /** + * @param inputStreamSchemas input stream ID and schema mapping + */ + public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) { + Preconditions.checkNotNull(inputStreamSchemas,"inputStreamSchemas"); + this.inputStreamSchemas = inputStreamSchemas; + } + + public void setName(String name) { + Preconditions.checkNotNull(name,"name"); + this.name = name; + } + + /** + * @return Created new SiddhiManager instance with registered siddhi extensions + */ + public SiddhiManager createSiddhiManager() { + SiddhiManager siddhiManager = new SiddhiManager(); + for (Map.Entry<String, Class<?>> entry : 
getExtensions().entrySet()) { + siddhiManager.setExtension(entry.getKey(), entry.getValue()); + } + return siddhiManager; + } + + /** + * @return StreamExecutionEnvironment ExecutionConfig + */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + /** + * @param executionConfig StreamExecutionEnvironment ExecutionConfig + */ + public void setExecutionConfig(ExecutionConfig executionConfig) { + Preconditions.checkNotNull(executionConfig,"executionConfig"); + this.executionConfig = executionConfig; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java new file mode 100755 index 0000000..5c54ad8 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.operator; + +import java.io.IOException; +import java.util.PriorityQueue; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.siddhi.schema.StreamSchema; +import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN> + */ +public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> { + + public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) { + super(siddhiPlan); + } + + @Override + protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) { + TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation<IN>) streamSchema.getTypeInfo()); + return new StreamElementSerializer<>(tuple2TypeInformation.createSerializer(executionConfig)); + } + + @Override + protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException { + send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp); + } + + @Override + public String getStreamId(Tuple2<String, IN> record) { + return record.f0; + } + + @Override + protected void 
snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException { + dataOutputView.writeInt(queue.size()); + for (StreamRecord<Tuple2<String, IN>> record : queue) { + String streamId = record.getValue().f0; + dataOutputView.writeUTF(streamId); + this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView); + } + } + + @Override + protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException { + int sizeOfQueue = dataInputView.readInt(); + PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue); + for (int i = 0; i < sizeOfQueue; i++) { + String streamId = dataInputView.readUTF(); + StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView); + priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord()); + } + return priorityQueue; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java new file mode 100755 index 0000000..7af37ce --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.operator; + +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.stream.output.StreamCallback; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type, + * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition} + */ +public class StreamInMemOutputHandler<R> extends StreamCallback { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamInMemOutputHandler.class); + + private final AbstractDefinition definition; + private final TypeInformation<R> typeInfo; + private final ObjectMapper objectMapper; + + + private final LinkedList<StreamRecord<R>> collectedRecords; + + public 
StreamInMemOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition) { + this.typeInfo = typeInfo; + this.definition = definition; + this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + this.collectedRecords = new LinkedList<>(); + } + + @Override + public void receive(Event[] events) { + for (Event event : events) { + if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) { + collectedRecords.add(new StreamRecord<R>((R) toMap(event), event.getTimestamp())); + } else if (typeInfo.isTupleType()) { + Tuple tuple = this.toTuple(event); + collectedRecords.add(new StreamRecord<R>((R) tuple, event.getTimestamp())); + } else if (typeInfo instanceof PojoTypeInfo) { + R obj; + try { + obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass()); + } catch (IllegalArgumentException ex) { + LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex); + throw ex; + } + collectedRecords.add(new StreamRecord<R>(obj, event.getTimestamp())); + } else { + throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo); + } + } + } + + + @Override + public synchronized void stopProcessing() { + super.stopProcessing(); + this.collectedRecords.clear(); + } + + private Map<String, Object> toMap(Event event) { + Map<String, Object> map = new LinkedHashMap<>(); + for (int i = 0; i < definition.getAttributeNameArray().length; i++) { + map.put(definition.getAttributeNameArray()[i], event.getData(i)); + } + return map; + } + + private <T extends Tuple> T toTuple(Event event) { + return SiddhiTupleFactory.newTuple(event.getData()); + } + + public LinkedList<StreamRecord<R>> getCollectedRecords() { + return collectedRecords; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java 
---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java new file mode 100755 index 0000000..8840dac --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.siddhi.operator; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.stream.output.StreamCallback; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type, + * according to output {@link TypeInformation} and siddhi schema {@link AbstractDefinition} + */ +public class StreamOutputHandler<R> extends StreamCallback { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamOutputHandler.class); + + private final AbstractDefinition definition; + private final Output<StreamRecord<R>> output; + private final TypeInformation<R> typeInfo; + private final ObjectMapper objectMapper; + + public StreamOutputHandler(TypeInformation<R> typeInfo, AbstractDefinition definition, Output<StreamRecord<R>> output) { + this.typeInfo = typeInfo; + this.definition = definition; + this.output = output; + this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public void receive(Event[] events) { + StreamRecord<R> reusableRecord = new StreamRecord<>(null, 0L); + for (Event event : events) { + if (typeInfo == null || Map.class.isAssignableFrom(typeInfo.getTypeClass())) { + reusableRecord.replace(toMap(event), 
event.getTimestamp()); + output.collect(reusableRecord); + } else if (typeInfo.isTupleType()) { + Tuple tuple = this.toTuple(event); + reusableRecord.replace(tuple, event.getTimestamp()); + output.collect(reusableRecord); + } else if (typeInfo instanceof PojoTypeInfo) { + R obj; + try { + obj = objectMapper.convertValue(toMap(event), typeInfo.getTypeClass()); + } catch (IllegalArgumentException ex) { + LOGGER.error("Failed to map event: " + event + " into type: " + typeInfo, ex); + throw ex; + } + reusableRecord.replace(obj, event.getTimestamp()); + output.collect(reusableRecord); + } else { + throw new IllegalArgumentException("Unable to format " + event + " as type " + typeInfo); + } + } + } + + + @Override + public synchronized void stopProcessing() { + super.stopProcessing(); + } + + private Map<String, Object> toMap(Event event) { + Map<String, Object> map = new LinkedHashMap<>(); + for (int i = 0; i < definition.getAttributeNameArray().length; i++) { + map.put(definition.getAttributeNameArray()[i], event.getData(i)); + } + return map; + } + + private <T extends Tuple> T toTuple(Event event) { + return SiddhiTupleFactory.newTuple(event.getData()); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java new file mode 100644 index 0000000..049681c --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamRecordComparator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.operator; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Stream Record Timestamp Comparator + */ +public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable { + private static final long serialVersionUID = 1581054988433915305L; + + @Override + public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) { + if (o1.getTimestamp() < o2.getTimestamp()) { + return -1; + } else if (o1.getTimestamp() > o2.getTimestamp()) { + return 1; + } else { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java new file mode 100644 index 0000000..6b1ceae --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/package-info.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <h1> Features </h1> + * <ul> + * <li> + * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like + * <ul> + * <li>Filter</li> + * <li>Join</li> + * <li>Aggregation</li> + * <li>Group by</li> + * <li>Having</li> + * <li>Window</li> + * <li>Conditions and Expressions</li> + * <li>Pattern processing</li> + * <li>Sequence processing</li> + * <li>Event Tables</li> + * <li>...</li> + * </ul> + * </li> + * <li> + * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) + * <ul> + * <li>Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc.</li> + * <li>Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan</li> + * <li>Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema</li> + * </ul> + * </li> + * <li> + * Integrate Siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) + * </li> + * <li> + * Support Siddhi plugin management to extend CEP functions. 
(See `SiddhiCEP#registerExtension`) + * </li> + * </ul> + * <p/> + * <h1>Example</h1> + * <pre> + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + * + * cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); + * + * cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); + * cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp"); + * + * DataStream<Tuple4<Integer,String,Integer,String>> output = cep + * .from("inputStream1").union("inputStream2") + * .cql( + * "from every s1 = inputStream1[id == 2] " + * + " -> s2 = inputStream2[id == 3] " + * + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 " + * + "insert into outputStream" + * ) + * .returns("outputStream"); + * + * env.execute(); + * </pre> + * + * @see <a href="https://github.com/wso2/siddhi">https://github.com/wso2/siddhi</a> + */ +package org.apache.flink.streaming.siddhi; http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2f47eedc/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java new file mode 100644 index 0000000..2a3a04c --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.schema; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory; +import org.apache.flink.util.Preconditions; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.StreamDefinition; + +import java.util.ArrayList; +import java.util.List; + +/** + * Siddhi specific Stream Schema. + * + * @param <T> Siddhi stream element type + */ +public class SiddhiStreamSchema<T> extends StreamSchema<T> { + private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);"; + + public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... 
fieldNames) { + super(typeInfo, fieldNames); + } + + public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) { + super(typeInfo, fieldIndexes, fieldNames); + } + + public StreamDefinition getStreamDefinition(String streamId) { + StreamDefinition streamDefinition = StreamDefinition.id(streamId); + for (int i = 0; i < getFieldNames().length; i++) { + streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i])); + } + return streamDefinition; + } + + public String getStreamDefinitionExpression(StreamDefinition streamDefinition) { + List<String> columns = new ArrayList<>(); + Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null"); + for (Attribute attribute : streamDefinition.getAttributeList()) { + columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase())); + } + return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ",")); + } + + public String getStreamDefinitionExpression(String streamId) { + StreamDefinition streamDefinition = getStreamDefinition(streamId); + List<String> columns = new ArrayList<>(); + Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null"); + for (Attribute attribute : streamDefinition.getAttributeList()) { + columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase())); + } + return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ",")); + } +}
