http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java deleted file mode 100644 index a84b5dc..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization; - -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.serialization.impl.*; - -import java.util.HashMap; -import java.util.Map; - -public class Serializers { - private static final Map<StreamColumn.Type, Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>(); - - public static <T> void register(StreamColumn.Type type, Serializer<T> serializer) { - if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) { - throw new IllegalArgumentException("Duplicated column type: " + type); - } - COLUMN_TYPE_SER_MAPPING.put(type, serializer); - } - - @SuppressWarnings("unchecked") - public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type) { - if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) { - return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type); - } else { - throw new IllegalArgumentException("Serializer of type: " + type + " not found"); - } - } - - public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider) { - return new PartitionedEventSerializerImpl(metadataProvider); - } - - static { - register(StreamColumn.Type.STRING, new StringSerializer()); - register(StreamColumn.Type.INT, new IntegerSerializer()); - register(StreamColumn.Type.LONG, new LongSerializer()); - register(StreamColumn.Type.FLOAT, new FloatSerializer()); - register(StreamColumn.Type.DOUBLE, new DoubleSerializer()); - register(StreamColumn.Type.BOOL, new BooleanSerializer()); - register(StreamColumn.Type.OBJECT, new JavaObjectSerializer()); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java deleted file mode 100644 index 1e90569..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class BooleanSerializer implements Serializer<Boolean> { - @Override - public void serialize(Boolean value, DataOutput dataOutput) throws IOException { - dataOutput.writeBoolean(value); - } - - @Override - public Boolean deserialize(DataInput dataInput) throws IOException { - return dataInput.readBoolean(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java deleted file mode 100644 index df56124..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class DoubleSerializer implements Serializer<Object> { - @Override - public void serialize(Object value, DataOutput dataOutput) throws IOException { - if (value instanceof Number) { - value = ((Number)value).doubleValue(); - } - dataOutput.writeDouble((double)value); - } - - @Override - public Object deserialize(DataInput dataInput) throws IOException { - return dataInput.readDouble(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java deleted file mode 100644 index 0ae48e3..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class FloatSerializer implements Serializer<Object> { - @Override - public void serialize(Object value, DataOutput dataOutput) throws IOException { - if (value instanceof Number) { - value = ((Number)value).floatValue(); - } - dataOutput.writeFloat((float)value); - } - - @Override - public Object deserialize(DataInput dataInput) throws IOException { - return dataInput.readFloat(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java deleted file mode 100644 index b698167..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class IntegerSerializer implements Serializer<Object> { - @Override - public void serialize(Object value, DataOutput dataOutput) throws IOException { - if (value instanceof Number) { - value = ((Number) value).intValue(); - } - dataOutput.writeInt((int) value); - } - - @Override - public Object deserialize(DataInput dataInput) throws IOException { - return dataInput.readInt(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java deleted file mode 100644 index 14d9ea5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; -import org.apache.commons.lang3.SerializationUtils; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -public class JavaObjectSerializer implements Serializer<Object> { - @Override - public void serialize(Object value, DataOutput dataOutput) throws IOException { - byte[] bytes = SerializationUtils.serialize((Serializable) value); - dataOutput.writeInt(bytes.length); - dataOutput.write(bytes); - } - - @Override - public Object deserialize(DataInput dataInput) throws IOException { - int len = dataInput.readInt(); - byte[] bytes = new byte[len]; - dataInput.readFully(bytes); - return SerializationUtils.deserialize(bytes); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java deleted file mode 100644 index efe7e3a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public class LongSerializer implements Serializer<Object> { - @Override - public void serialize(Object value, DataOutput dataOutput) throws IOException { - if (value instanceof Number) { - value = ((Number) value).longValue(); - } - dataOutput.writeLong((long) value); - } - - @Override - public Long deserialize(DataInput dataInput) throws IOException { - return dataInput.readLong(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java deleted file mode 100644 index 2b0140f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.engine.serialization.Serializer; -import org.apache.eagle.alert.engine.utils.CompressionUtils; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Stream Metadata Cached Serializer - * - * <p> Performance: - * - * 1) VS Kryo Direct: reduce 73.4% space (bytes) and 42.5 % time (ms). - * 2) VS Java Native: reduce 92.5% space (bytes) and 94.2% time (ms) - * </p> - * - * <p>Tips: - * 1) Without-compression performs better than with compression for small event - * </p> - * - * <p>TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?</p> - * - * @see PartitionedEvent - */ -public class PartitionedEventSerializerImpl implements Serializer<PartitionedEvent>, PartitionedEventSerializer { - private final StreamEventSerializer streamEventSerializer; - private final Serializer<StreamPartition> streamPartitionSerializer; - private final boolean compress; - - /** - * @param serializationMetadataProvider metadata provider. - * @param compress false by default. - */ - public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider, boolean compress) { - this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider); - this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE; - this.compress = compress; - } - - public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider) { - this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider); - this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE; - this.compress = false; - } - - @Override - public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException { - dataOutput.writeLong(entity.getPartitionKey()); - streamEventSerializer.serialize(entity.getEvent(), dataOutput); - streamPartitionSerializer.serialize(entity.getPartition(), dataOutput); - } - - @Override - public byte[] serialize(PartitionedEvent entity) throws IOException { - ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput(); - this.serialize(entity, dataOutput); - return compress ? CompressionUtils.compress(dataOutput.toByteArray()) : dataOutput.toByteArray(); - } - - @Override - public PartitionedEvent deserialize(DataInput dataInput) throws IOException { - PartitionedEvent event = new PartitionedEvent(); - event.setPartitionKey(dataInput.readLong()); - StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput); - StreamPartition partition = streamPartitionSerializer.deserialize(dataInput); - event.setEvent(streamEvent); - partition.setStreamId(streamEvent.getStreamId()); - event.setPartition(partition); - return event; - } - - - @Override - public PartitionedEvent deserialize(byte[] bytes) throws IOException { - return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes) : bytes)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java deleted file mode 100644 index 8ffcb83..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.engine.serialization.Serializer; -import org.apache.eagle.alert.engine.serialization.Serializers; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.BitSet; - -/** - * StreamEventSerializer. - * - * @see StreamEvent - */ -public class StreamEventSerializer implements Serializer<StreamEvent> { - private final SerializationMetadataProvider serializationMetadataProvider; - - public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider) { - this.serializationMetadataProvider = serializationMetadataProvider; - } - - private BitSet isNullBitSet(Object[] objects) { - BitSet bitSet = new BitSet(); - int i = 0; - for (Object obj : objects) { - bitSet.set(i, obj == null); - i++; - } - return bitSet; - } - - @Override - public void serialize(StreamEvent event, DataOutput dataOutput) throws IOException { - // Bryant: here "metaVersion/streamId" writes to dataOutputUTF - String metaVersion = event.getMetaVersion(); - String streamId = event.getStreamId(); - String metaVersionStreamId = String.format("%s/%s", metaVersion, streamId); - - dataOutput.writeUTF(metaVersionStreamId); - dataOutput.writeLong(event.getTimestamp()); - if (event.getData() == null || event.getData().length == 0) { - dataOutput.writeInt(0); - } else { - BitSet isNullIndex = isNullBitSet(event.getData()); - byte[] isNullBytes = isNullIndex.toByteArray(); - dataOutput.writeInt(isNullBytes.length); - dataOutput.write(isNullBytes); - int i = 0; - StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId()); - if (definition == null) { - throw new IOException("StreamDefinition not found: " + event.getStreamId()); - } - if (event.getData().length != definition.getColumns().size()) { - throw new IOException("Event :" + event + " doesn't match with schema: " + definition); - } - for (StreamColumn column : definition.getColumns()) { - if (!isNullIndex.get(i)) { - Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i], dataOutput); - } - i++; - } - } - } - - @Override - public StreamEvent deserialize(DataInput dataInput) throws IOException { - StreamEvent event = new StreamEvent(); - String metaVersionStreamId = dataInput.readUTF(); - String streamId = metaVersionStreamId.split("/")[1]; - String metaVersion = metaVersionStreamId.split("/")[0]; - // sometimes metaVersionStreamId will be "null/id", then metaVersion will be "null" rather than null - // need to handle it for future use - if (metaVersion.equals("null")) { - metaVersion = null; - } - - event.setStreamId(streamId); - event.setMetaVersion(metaVersion); - - StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId()); - event.setTimestamp(dataInput.readLong()); - int isNullBytesLen = dataInput.readInt(); - byte[] isNullBytes = new byte[isNullBytesLen]; - dataInput.readFully(isNullBytes); - BitSet isNullIndex = BitSet.valueOf(isNullBytes); - Object[] attributes = new Object[definition.getColumns().size()]; - int i = 0; - for (StreamColumn column : definition.getColumns()) { - if (!isNullIndex.get(i)) { - attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput); - } - i++; - } - event.setData(attributes); - return event; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java deleted file mode 100644 index 6a47f1e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.*; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Don't serialize streamId. - * - * @see StreamPartition - */ -public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> { - public static final StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer(); - - private final Map<DigestBytes, StreamPartition> checkSumPartitionMap = new HashMap<>(); - private final Map<StreamPartition, DigestBytes> partitionCheckSumMap = new HashMap<>(); - - @Override - public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException { - DigestBytes checkSum = partitionCheckSumMap.get(partition); - if (checkSum == null) { - try { - checkSum = digestCheckSum(partition); - partitionCheckSumMap.put(partition, checkSum); - checkSumPartitionMap.put(checkSum, partition); - } catch (NoSuchAlgorithmException e) { - throw new IOException(e); - } - } - dataOutput.writeInt(checkSum.size()); - dataOutput.write(checkSum.toByteArray()); - } - - @Override - public StreamPartition deserialize(DataInput dataInput) throws IOException { - int checkSumLen = dataInput.readInt(); - byte[] checksum = new byte[checkSumLen]; - dataInput.readFully(checksum); - StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum)); - if (partition == null) { - throw new IOException("Illegal partition checksum: " + checksum); - } - return partition; - } - - private class DigestBytes { - private final byte[] data; - - public DigestBytes(byte[] bytes) { - this.data = bytes; - } - - @Override - public boolean equals(Object other) { - return other instanceof DigestBytes && Arrays.equals(data, ((DigestBytes) other).data); - } - - @Override - public int hashCode() { - return Arrays.hashCode(data); - } - - public int size() { - return data.length; - } - - public byte[] toByteArray() { - return data; - } - } - - private DigestBytes digestCheckSum(Object obj) throws IOException, NoSuchAlgorithmException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(obj); - oos.close(); - MessageDigest m = MessageDigest.getInstance("SHA1"); - m.update(baos.toByteArray()); - return new DigestBytes(m.digest()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java deleted file mode 100644 index 411368f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Don't serialize streamId. - * - * @see StreamPartition - */ -public class StreamPartitionSerializer implements Serializer<StreamPartition> { - public static final StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer(); - - @Override - public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException { - dataOutput.writeUTF(partition.getType().toString()); - if (partition.getColumns() == null || partition.getColumns().size() == 0) { - dataOutput.writeInt(0); - } else { - dataOutput.writeInt(partition.getColumns().size()); - for (String column : partition.getColumns()) { - dataOutput.writeUTF(column); - } - } - if (partition.getSortSpec() == null) { - dataOutput.writeByte(0); - } else { - dataOutput.writeByte(1); - dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod()); - dataOutput.writeInt(partition.getSortSpec().getWindowMargin()); - } - } - - @Override - public StreamPartition deserialize(DataInput dataInput) throws IOException { - StreamPartition partition = new StreamPartition(); - partition.setType(StreamPartition.Type.locate(dataInput.readUTF())); - int colSize = dataInput.readInt(); - if (colSize > 0) { - List<String> columns = new ArrayList<>(colSize); - for (int i = 0; i < colSize; i++) { - columns.add(dataInput.readUTF()); - } - partition.setColumns(columns); - } - if (dataInput.readByte() == 1) { - String period = dataInput.readUTF(); - int margin = dataInput.readInt(); - - StreamSortSpec sortSpec = new StreamSortSpec(); - sortSpec.setWindowPeriod(period); - sortSpec.setWindowMargin(margin); - partition.setSortSpec(sortSpec); - } - return partition; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java deleted file mode 100644 index 2a1541a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.eagle.alert.engine.serialization.impl; - -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class StringSerializer implements Serializer<String> { - @Override - public void serialize(String value, DataOutput dataOutput) throws IOException { - dataOutput.writeUTF(value); - } - - @Override - public String deserialize(DataInput dataInput) throws IOException { - return dataInput.readUTF(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java deleted file mode 100644 index 599f349..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.siddhi.extension; - -import com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.config.ExecutionPlanContext; -import org.wso2.siddhi.core.executor.ExpressionExecutor; -import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.definition.Attribute.Type; - -import java.util.LinkedList; - -/** - * @since Apr 1, 2016. - */ -public class AttributeCollectAggregator extends AttributeAggregator { - - private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class); - - private LinkedList<Object> value; - - public AttributeCollectAggregator() { - value = new LinkedList<Object>(); - } - - @Override - public void start() { - } - - @Override - public void stop() { - } - - @Override - public Object[] currentState() { - return value.toArray(); - } - - @Override - public void restoreState(Object[] arg0) { - value = new LinkedList<Object>(); - if (arg0 != null) { - for (Object o : arg0) { - value.add(o); - } - } - } - - @Override - public Type getReturnType() { - return Attribute.Type.OBJECT; - } - - @Override - protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) { - // TODO: Support max of elements? - } - - @Override - public Object processAdd(Object arg0) { - value.add(arg0); - if (LOG.isDebugEnabled()) { - LOG.debug("processAdd: current values are : " + value); - } - return ImmutableList.copyOf(value); - } - - @Override - public Object processAdd(Object[] arg0) { - value.add(arg0); - if (LOG.isDebugEnabled()) { - LOG.debug("processAdd: current values are : " + value); - } - return ImmutableList.copyOf(value); - } - - // / NOTICE: non O(1) - @Override - public Object processRemove(Object arg0) { - value.remove(arg0); - if (LOG.isDebugEnabled()) { - LOG.debug("processRemove: current values are : " + value); - } - return ImmutableList.copyOf(value); - } - - // / NOTICE: non O(1) - @Override - public Object processRemove(Object[] arg0) { - value.remove(arg0); - LOG.info("processRemove: current values are : " + value); - return ImmutableList.copyOf(value); - } - - @Override - public Object reset() { - value.clear(); - return value; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java deleted file mode 100644 index 101d05b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.siddhi.extension; - -import com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.config.ExecutionPlanContext; -import org.wso2.siddhi.core.executor.ExpressionExecutor; -import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.definition.Attribute.Type; - -import java.util.LinkedList; - -public class AttributeCollectWithDistinctAggregator extends AttributeAggregator { - - private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class); - - private LinkedList<Object> value; - - public AttributeCollectWithDistinctAggregator() { - value = new LinkedList<Object>(); - } - - @Override - public void start() { - } - - @Override - public void stop() { - } - - @Override - public Object[] currentState() { - return value.toArray(); - } - - @Override - public void restoreState(Object[] arg0) { - value = new LinkedList<Object>(); - if (arg0 != null) { - for (Object o : arg0) { - value.add(o); - } - } - } - - @Override - public Type getReturnType() { - return Attribute.Type.OBJECT; - } - - @Override - protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) { - // TODO: Support max of elements? - } - - @Override - public Object processAdd(Object arg0) { - // AttributeAggregator.process is already synchronized - if (value.contains(arg0)) { - value.remove(arg0); - } - value.add(arg0); - if (LOG.isDebugEnabled()) { - LOG.debug("processAdd: current values are : " + value); - } - return ImmutableList.copyOf(value); - } - - @Override - public Object processAdd(Object[] arg0) { - // AttributeAggregator.process is already synchronized - if (value.contains(arg0)) { - value.remove(arg0); - } - value.add(arg0); - if (LOG.isDebugEnabled()) { - LOG.debug("processAdd: current values are : " + value); - } - return ImmutableList.copyOf(value); - } - - // / NOTICE: non O(1) - @Override - public Object processRemove(Object arg0) { - value.remove(arg0); - if (LOG.isDebugEnabled()) { - LOG.debug("processRemove: current values are : " + value); - } - return ImmutableList.copyOf(value); - } - - // / NOTICE: non O(1) - @Override - public Object processRemove(Object[] arg0) { - value.remove(arg0); - LOG.info("processRemove: current values are : " + value); - return ImmutableList.copyOf(value); - } - - @Override - public Object reset() { - value.clear(); - return value; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java deleted file mode 100644 index 27df63b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.siddhi.extension; - -import org.wso2.siddhi.core.config.ExecutionPlanContext; -import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; -import org.wso2.siddhi.core.executor.ExpressionExecutor; -import org.wso2.siddhi.core.executor.function.FunctionExecutor; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; - -public class ContainsIgnoreCaseExtension extends FunctionExecutor { - - Attribute.Type returnType = Attribute.Type.BOOL; - - @Override - protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { - if (attributeExpressionExecutors.length != 2) { - throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " - + "but found " + attributeExpressionExecutors.length); - } - if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { - throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " - + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString()); - } - if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) { - throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " - + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString()); - } - } - - @Override - protected Object execute(Object[] data) { - if (data[0] == null) { - throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null"); - } - if (data[1] == null) { - throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null"); - } - String str1 = (String) data[0]; - String str2 = (String) data[1]; - return str1.toUpperCase().contains(str2.toUpperCase()); - } - - @Override - protected Object execute(Object data) { - return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented. - } - - @Override - public void start() { - //Nothing to start - } - - @Override - public void stop() { - //Nothing to stop - } - - @Override - public Attribute.Type getReturnType() { - return returnType; - } - - @Override - public Object[] currentState() { - return new Object[] {}; - } - - @Override - public void restoreState(Object[] state) { - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java deleted file mode 100644 index 1292e05..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.siddhi.extension; - -import org.wso2.siddhi.core.config.ExecutionPlanContext; -import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; -import org.wso2.siddhi.core.executor.ExpressionExecutor; -import org.wso2.siddhi.core.executor.function.FunctionExecutor; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; - -public class EqualsIgnoreCaseExtension extends FunctionExecutor { - - Attribute.Type returnType = Attribute.Type.BOOL; - - @Override - protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { - if (attributeExpressionExecutors.length != 2) { - throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " - + "but found " + attributeExpressionExecutors.length); - } - if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { - throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, " - + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString()); - } - if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) { - throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " - + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString()); - } - } - - @Override - protected Object execute(Object[] data) { - if (data[0] == null) { - throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null"); - } - if (data[1] == null) { - throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null"); - } - String str1 = (String) data[0]; - String str2 = (String) data[1]; - return str1.equalsIgnoreCase(str2); - } - - @Override - protected Object execute(Object data) { - return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented. - } - - @Override - public void start() { - //Nothing to start - } - - @Override - public void stop() { - //Nothing to stop - } - - @Override - public Attribute.Type getReturnType() { - return returnType; - } - - @Override - public Object[] currentState() { - return new Object[] {}; - } - - @Override - public void restoreState(Object[] state) { - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java deleted file mode 100644 index fe2280f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.siddhi.extension; - -import org.wso2.siddhi.core.config.ExecutionPlanContext; -import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; -import org.wso2.siddhi.core.executor.ConstantExpressionExecutor; -import org.wso2.siddhi.core.executor.ExpressionExecutor; -import org.wso2.siddhi.extension.string.RegexpFunctionExtension; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * regexpIgnoreCase(string, regex) - * Tells whether or not this 'string' matches the given regular expression 'regex'. - * Accept Type(s): (STRING,STRING) - * Return Type(s): BOOLEAN - */ -public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension { - - //state-variables - boolean isRegexConstant = false; - String regexConstant; - Pattern patternConstant; - - @Override - protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { - if (attributeExpressionExecutors.length != 2) { - throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " - + "but found " + attributeExpressionExecutors.length); - } - if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { - throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " - + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString()); - } - if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) { - throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " - + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString()); - } - if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) { - isRegexConstant = true; - regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue(); - patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE); - } - } - - @Override - protected Object execute(Object[] data) { - String regex; - Pattern pattern; - Matcher matcher; - - if (data[0] == null) { - throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null"); - } - if (data[1] == null) { - throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null"); - } - String source = (String) data[0]; - - if (!isRegexConstant) { - regex = (String) data[1]; - pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); - matcher = pattern.matcher(source); - return matcher.matches(); - - } else { - matcher = patternConstant.matcher(source); - return matcher.matches(); - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java deleted file mode 100644 index a72f728..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.common.DateTimeUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * TODO: Make sure thread-safe. - * TODO: Leverage Off-Heap Memory to persist append-only events collection. - */ -public abstract class BaseStreamWindow implements StreamWindow { - private final long endTime; - private final long startTime; - private final long margin; - private final AtomicBoolean expired; - private final long createdTime; - private static final Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class); - private PartitionedEventCollector collector; - private final AtomicLong lastFlushedStreamTime; - private final AtomicLong lastFlushedSystemTime; - - public BaseStreamWindow(long startTime, long endTime, long marginTime) { - if (startTime >= endTime) { - throw new IllegalArgumentException("startTime: " + startTime + " >= endTime: " + endTime + ", expected: startTime < endTime"); - } - if (marginTime > endTime - startTime) { - throw new IllegalArgumentException("marginTime: " + marginTime + " > endTime: " + endTime + " - startTime " + startTime + ", expected: marginTime < endTime - startTime"); - } - this.startTime = startTime; - this.endTime = endTime; - this.margin = marginTime; - this.expired = new AtomicBoolean(false); - this.createdTime = System.currentTimeMillis(); - this.lastFlushedStreamTime = new AtomicLong(0); - this.lastFlushedSystemTime = new AtomicLong(this.createdTime); - } - - @Override - public void register(PartitionedEventCollector collector) { - if (this.collector != null) { - throw new IllegalArgumentException("Duplicated collector error"); - } - this.collector = collector; - } - - @Override - public long createdTime() { - return createdTime; - } - - public long startTime() { - return this.startTime; - } - - @Override - public long rejectTime() { - return this.lastFlushedStreamTime.get(); - } - - @Override - public long margin() { - return this.margin; - } - - public long endTime() { - return this.endTime; - } - - public boolean accept(final long eventTime) { - return !expired() && eventTime >= startTime && eventTime < endTime - && eventTime >= lastFlushedStreamTime.get(); // dropped - } - - public boolean expired() { - return expired.get(); - } - - @Override - public boolean alive() { - return !expired.get(); - } - - /** - * Expire when - * 1) If stream time >= endTime + marginTime, then flush and expire - * 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire. - * 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire. - * 4) else do nothing - * - * @param clock stream time clock - * @param globalSystemTime system time clock - */ - @Override - public synchronized void onTick(StreamTimeClock clock, long globalSystemTime) { - if (!expired()) { - if (clock.getTime() >= endTime + margin) { - LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}", clock.getStreamId(), - DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this); - lastFlushedStreamTime.set(clock.getTime()); - lastFlushedSystemTime.set(globalSystemTime); - flush(); - expired.set(true); - } else if (globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0) { - LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}", clock.getStreamId(), - DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime), - DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this); - lastFlushedStreamTime.set(clock.getTime()); - lastFlushedSystemTime.set(globalSystemTime); - flush(); - if (lastFlushedStreamTime.get() >= this.endTime) { - expired.set(true); - } - } - } else { - LOG.warn("Window has already expired, should not tick: {}", this.toString()); - } - } - - public void close() { - flush(); - expired.set(true); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(startTime).append(endTime).append(margin).build(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof BaseStreamWindow) { - BaseStreamWindow another = (BaseStreamWindow) obj; - return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin; - } - return false; - } - - @Override - public void flush() { - if (this.collector == null) { - throw new NullPointerException("Collector is not given before window flush"); - } - this.flush(collector); - } - - /** - * @param collector PartitionedEventCollector. - * @return max timestamp. - */ - protected abstract void flush(PartitionedEventCollector collector); - - @Override - public String toString() { - return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]", - DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime), - DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime), - this.margin, - size(), - this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime) : DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime()) - ); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java deleted file mode 100644 index 13e60d6..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -/** - * The time clock per stream should be thread-safe between getTime and moveForward. - * By default, we currently simple support event timestamp now. - */ -public interface StreamTimeClock { - /** - * Get stream id. - * - * @return stream id - */ - String getStreamId(); - - /** - * Get current time. - * - * @return current timestamp value - */ - long getTime(); - - /** - * @param timestamp move forward current time to given timestamp. - */ - void moveForward(long timestamp); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java deleted file mode 100644 index b88f66e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -public interface StreamTimeClockListener { - /** - * StreamTimeClockListener onTick callback. - * - * @param streamTime - * @param globalSystemTime - * @see StreamWindow - */ - void onTick(StreamTimeClock streamTime, long globalSystemTime); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java deleted file mode 100644 index 08878fd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import java.io.Serializable; - -/** - * By default, we could keep the current time clock in memory, - * Eventually we may need to consider the global time synchronization across all nodes - * - * <p>TODO: maybe need to synchronize time clock globally</p> - * - * <p>1) When to initialize window according to start time - * 2) When to close expired window according to current time - * 3) Automatically tick periodically as the single place for control lock.</p> - */ -public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable { - /** - * @return StreamTimeClock instance. - */ - StreamTimeClock createStreamTimeClock(String streamId); - - StreamTimeClock getStreamTimeClock(String streamId); - - void removeStreamTimeClock(String streamId); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java deleted file mode 100644 index 494ef05..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - - -/** - * Possible implementation: - * - * <p>1) EventTimeClockTrigger (by default). - * 2) SystemTimeClockTrigger.</p> - */ -public interface StreamTimeClockTrigger { - /** - * @param streamId stream id to listen to. - * @param listener to watch on streamId. - */ - void registerListener(String streamId, StreamTimeClockListener listener); - - void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener); - - /** - * @param listener listener to remove. - */ - void removeListener(StreamTimeClockListener listener); - - /** - * Trigger tick of all listeners on certain stream. - * - * @param streamId stream id - */ - void triggerTickOn(String streamId); - - /** - * Update time per new event time on stream. - * - * @param streamId - * @param timestamp - */ - void onTimeUpdate(String streamId, long timestamp); - - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java deleted file mode 100644 index c30f00f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import org.apache.eagle.alert.engine.PartitionedEventCollector; -import org.apache.eagle.alert.engine.model.PartitionedEvent; - -/** - * <h2>Tumbling Window instead Sliding Window</h2> - * We could have time overlap to sort out-of-ordered stream, - * but each window should never have events overlap, otherwise will have logic problem. - * <h2>Ingestion Time Policy</h2> - * Different notions of time, namely processing time, event time, and ingestion time. - * <ol> - * <li> - * In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window, - * i.e., a one minute processing time window collects elements for exactly one minute. - * </li> - * <li> - * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for - * many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the - * event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics - * from the actual serving speed of the source and the processing performance of system. - * Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program. - * It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery. - * - * Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common - * if a data stream gathers events from distributed sources. - * </li> - * <li> - * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive - * in the system (at the source) and continues processing with event time semantics based on the attached timestamps. - * </li> - * </ol> - */ -public interface StreamWindow extends StreamTimeClockListener { - /** - * @return Created timestamp. - */ - long createdTime(); - - /** - * Get start time. - */ - long startTime(); - - long margin(); - - /** - * @return reject timestamp < rejectTime(). - */ - long rejectTime(); - - /** - * Get end time. - */ - long endTime(); - - /** - * @param timestamp event time. - * @return true/false in boolean. - */ - boolean accept(long timestamp); - - /** - * Window is expired. - * - * @return whether window is expired - */ - boolean expired(); - - /** - * @return whether window is alive. - */ - boolean alive(); - - boolean add(PartitionedEvent event); - - void flush(); - - /** - * Close window. - */ - void close(); - - void register(PartitionedEventCollector collector); - - int size(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java deleted file mode 100644 index efa1014..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import java.io.Closeable; -import java.util.Collection; - -/** - * TODO: Reuse existing expired window to avoid recreating new windows again and again - * <p>Single stream window manager.</p> - */ -public interface StreamWindowManager extends StreamTimeClockListener, Closeable { - - /** - * addNewWindow. - */ - StreamWindow addNewWindow(long initialTime); - - /** - * removeWindow. - */ - void removeWindow(StreamWindow window); - - /** - * hasWindow. - * - * @return if has window - */ - boolean hasWindow(StreamWindow window); - - /** - * @param timestamp time. - * @return whether window exists for time. - */ - boolean hasWindowFor(long timestamp); - - /** - * @return Internal collection for performance optimization. - */ - Collection<StreamWindow> getWindows(); - - StreamWindow getWindowFor(long timestamp); - - boolean reject(long timestamp); -} \ No newline at end of file
