[
https://issues.apache.org/jira/browse/BAHIR-144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260233#comment-16260233
]
ASF GitHub Bot commented on BAHIR-144:
--------------------------------------
Github user haoch commented on a diff in the pull request:
https://github.com/apache/bahir-flink/pull/22#discussion_r152175023
--- Diff:
flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
---
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import
org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import
org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Flink-siddhi library integration test cases
+ */
+public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase
implements Serializable {
+
+ @Rule
+ public transient TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testSimpleWriteAndRead() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ String path = tempFolder.newFile().toURI().toString();
+ input.transform("transformer", TypeInformation.of(Event.class),
new StreamMap<>(new MapFunction<Event, Event>() {
+ @Override
+ public Event map(Event event) throws Exception {
+ return event;
+ }
+ })).writeAsText(path);
+ env.execute();
+ Assert.assertEquals(6, getLineCount(path));
+ }
+
+ @Test
+ public void testSimplePojoStreamAndReturnPojo() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ DataStream<Event> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price")
+ .cql("from inputStream insert into outputStream")
+ .returns("outputStream", Event.class);
+ String path = tempFolder.newFile().toURI().toString();
+ output.print();
+ env.execute();
+ // Assert.assertEquals(6, getLineCount(path));
--- End diff --
Resolved
> Add Siddhi CEP integration with Flink streaming
> -----------------------------------------------
>
> Key: BAHIR-144
> URL: https://issues.apache.org/jira/browse/BAHIR-144
> Project: Bahir
> Issue Type: New Feature
> Components: Flink Streaming Connectors
> Reporter: Hao Chen
>
> Moved from:
> * https://issues.apache.org/jira/browse/FLINK-4520
> * https://github.com/apache/flink/pull/2487
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event
> Processing Engine (CEP) released as a Java Library under `Apache Software
> License v2.0`. Siddhi CEP processes events which are generated by various
> event sources, analyses them and notifies appropriate complex events
> according to the user specified queries.
> **It would be very helpful for flink users (especially streaming application
> developer) to provide a library to run Siddhi CEP query directly in Flink
> streaming application.**
> # Features
> - Integrate Siddhi CEP as a stream operator (i.e.
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> - Filter
> - Join
> - Aggregation
> - Group by
> - Having
> - Window
> - Conditions and Expressions
> - Pattern processing
> - Sequence processing
> - Event Tables
> ...
> - Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See
> `SiddhiCEP` and `SiddhiStream`)
> - Register Flink DataStream associating native type information with Siddhi
> Stream Schema, supporting POJO, Tuple, Primitive Type, etc.
> - Connect with single or multiple Flink DataStreams with Siddhi CEP
> Execution Plan
> - Return output stream as DataStream with type intelligently inferred from
> Siddhi Stream Schema
> - Integrate siddhi runtime state management with Flink state (See
> `AbstractSiddhiOperator`)
> - Support siddhi plugin management to extend CEP functions. (See
> `SiddhiCEP#registerExtension`)
> # Test Cases
> - [`org.apache.flink.contrib.siddhi.
> SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
> # Example
> ```
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
> cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
> cep.registerStream("inputStream1", input1, "id", "name",
> "price","timestamp");
> cep.registerStream("inputStream2", input2, "id", "name",
> "price","timestamp");
> DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
> .from("inputStream1").union("inputStream2")
> .sql(
> "from every s1 = inputStream1[id == 2] "
> + " -> s2 = inputStream2[id == 3] "
> + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name
> as name_2 , custom:plus(s1.price,s2.price) as price"
> + " insert into outputStream"
> )
> .returns("outputStream");
> env.execute();
> ```
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)