[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the

2021-12-08 Thread GitBox


AHeise commented on a change in pull request #17692:
URL: https://github.com/apache/flink/pull/17692#discussion_r764695828



##
File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
##
@@ -37,12 +43,25 @@
 import static org.junit.Assert.assertTrue;
 
 /** Tests for {@link SharedBuffer}. */
+@RunWith(Parameterized.class)
 public class SharedBufferTest extends TestLogger {
 
+public static final SharedBufferCacheConfig MINI_CACHE_CONFIG =
+new SharedBufferCacheConfig(1, 1, Duration.ofSeconds(1L));
+public static final SharedBufferCacheConfig BIG_CACHE_CONFIG =
+new SharedBufferCacheConfig(10240, 10240, Duration.ofSeconds(1L));
+
+@Parameterized.Parameter public SharedBufferCacheConfig jobParameters;

Review comment:
   nit: rename this field to `cacheConfig`.

##
File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
##
@@ -289,7 +308,7 @@ public void testSharedBufferCountersClearing() throws Exception {
 @Test
 public void testSharedBufferAccessor() throws Exception {
 TestSharedBuffer sharedBuffer =
-TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+TestSharedBuffer.createTestBuffer(Event.createTypeSerializer(), BIG_CACHE_CONFIG);

Review comment:
   This should probably use the parameterized `jobParameters` field rather than
   the hard-coded `BIG_CACHE_CONFIG`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the

2021-12-07 Thread GitBox


AHeise commented on a change in pull request #17692:
URL: https://github.com/apache/flink/pull/17692#discussion_r764120930



##
File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/configuration/SharedBufferCacheConfig.java
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.configuration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.cep.configuration.CEPCacheOptions.CEP_CACHE_STATISTICS_INTERVAL;
+import static org.apache.flink.cep.configuration.CEPCacheOptions.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS;
+import static org.apache.flink.cep.configuration.CEPCacheOptions.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS;
+
+/** Configuration immutable class. */
+public final class SharedBufferCacheConfig implements Serializable {
+private final int eventsBufferCacheSlots;
+private final int entryCacheSlots;
+private final Duration cacheStatisticsInterval;
+
+public int getEventsBufferCacheSlots() {
+return eventsBufferCacheSlots;
+}
+
+public int getEntryCacheSlots() {
+return entryCacheSlots;
+}
+
+public Duration getCacheStatisticsInterval() {
+return cacheStatisticsInterval;
+}
+
+public SharedBufferCacheConfig() {
+this.cacheStatisticsInterval = CEP_CACHE_STATISTICS_INTERVAL.defaultValue();
+this.entryCacheSlots = CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS.defaultValue();
+this.eventsBufferCacheSlots = CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS.defaultValue();
+}
+
+public SharedBufferCacheConfig(
+final int eventsBufferCacheSlots,
+final int entryCacheSlots,
+final Duration cacheStatisticsInterval) {
+this.cacheStatisticsInterval = cacheStatisticsInterval;
+this.entryCacheSlots = entryCacheSlots;
+this.eventsBufferCacheSlots = eventsBufferCacheSlots;
+}
+
+public static SharedBufferCacheConfig of(
+ExecutionConfig.GlobalJobParameters globalJobParameters) {

Review comment:
   Please use `ReadableConfig` here. `GlobalJobParameters` is meant only for
   user code.
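
   To illustrate the shape of that suggestion, here is a minimal plain-Java sketch of a factory that reads typed options with defaults from a read-only configuration view. `SketchOption`, `ReadOnlyConfig`, `MapConfig` and `CacheConfigSketch` are hypothetical stand-ins written for this example, not Flink's `ConfigOption` or `ReadableConfig`, and the keys and default values are made up.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical typed option with a default value; NOT Flink's ConfigOption. */
final class SketchOption<T> {
    final String key;
    final T defaultValue;

    SketchOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

/** Hypothetical read-only view of a configuration; NOT Flink's ReadableConfig. */
interface ReadOnlyConfig {
    <T> T get(SketchOption<T> option);
}

/** Map-backed implementation that exists only for this sketch. */
final class MapConfig implements ReadOnlyConfig {
    private final Map<String, Object> values = new HashMap<>();

    <T> MapConfig set(SketchOption<T> option, T value) {
        values.put(option.key, value);
        return this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T get(SketchOption<T> option) {
        Object value = values.get(option.key);
        // Fall back to the option's default when nothing was set explicitly.
        return value == null ? option.defaultValue : (T) value;
    }
}

/** Immutable settings object built from the read-only view. */
final class CacheConfigSketch {
    // Keys and defaults below are illustrative assumptions.
    static final SketchOption<Integer> EVENT_SLOTS =
            new SketchOption<>("sketch.event-cache-slots", 1024);
    static final SketchOption<Integer> ENTRY_SLOTS =
            new SketchOption<>("sketch.entry-cache-slots", 1024);
    static final SketchOption<Duration> STATS_INTERVAL =
            new SketchOption<>("sketch.statistics-interval", Duration.ofMinutes(30));

    final int eventSlots;
    final int entrySlots;
    final Duration statsInterval;

    private CacheConfigSketch(int eventSlots, int entrySlots, Duration statsInterval) {
        this.eventSlots = eventSlots;
        this.entrySlots = entrySlots;
        this.statsInterval = statsInterval;
    }

    /** The factory depends only on the read-only interface, not on job parameters. */
    static CacheConfigSketch of(ReadOnlyConfig config) {
        return new CacheConfigSketch(
                config.get(EVENT_SLOTS), config.get(ENTRY_SLOTS), config.get(STATS_INTERVAL));
    }
}
```

   With this shape, a caller can pass an empty configuration to get the defaults, or override a single option without touching any job-level object.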

##
File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
##
@@ -18,31 +18,47 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.DeweyNumber;
 import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cep.utils.TestSharedBuffer.BIG_CACHE_CONFIG;
+import static org.apache.flink.cep.utils.TestSharedBuffer.MINI_CACHE_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /** Tests for {@link SharedBuffer}. */
+@RunWith(Parameterized.class)
 public class SharedBufferTest extends TestLogger {
 
+@Parameterized.Parameter public ExecutionConfig.GlobalJobParameters jobParameters;

Review comment:
   Can we directly use `SharedBufferCacheConfig` here?

##
File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
##
@@ -180,7 +181,12 @@ public void initializeState(StateInitializationContext context) throws Exception
 new ValueStateDescriptor<>(
 NFA_STATE_NAME, new NFAStateSerializer()));
 
-partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer);
+partialMatches =
+new SharedBuffer<>(
+  

[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the

2021-12-06 Thread GitBox


AHeise commented on a change in pull request #17692:
URL: https://github.com/apache/flink/pull/17692#discussion_r754103757



##
File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
##
@@ -73,33 +88,105 @@
 private final MapState> entries;
 
 /** The cache of eventsBuffer State. */
-private final Map> eventsBufferCache = new HashMap<>();
+private final Cache> eventsBufferCache;
 
 /** The cache of sharedBufferNode. */
-private final Map> entryCache = new HashMap<>();
+private final Cache> entryCache;
+
+private final Timer cacheStatisticsTimer;
 
+@VisibleForTesting
 public SharedBuffer(KeyedStateStore stateStore, TypeSerializer valueSerializer) {
+this(stateStore, valueSerializer, new Configuration());
+}
+
+public SharedBuffer(
+KeyedStateStore stateStore,
+TypeSerializer valueSerializer,
+ExecutionConfig.GlobalJobParameters globalJobParameters) {

Review comment:
   Instead of passing `GlobalJobParameters`, pass only the setting values
   directly.
   Alternatively, extract a small immutable class that captures only these
   values. Using `GlobalJobParameters` makes testing harder and couples the
   classes more tightly than necessary.
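
   A minimal sketch of the second option, in plain Java. `CacheSettings` and `BufferSketch` are hypothetical names used only for illustration; they are not the classes in this PR, and the validation rules are an assumption.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical immutable holder for the cache settings; not the class from this PR. */
final class CacheSettings {
    private final int eventCacheSlots;
    private final int entryCacheSlots;
    private final Duration statisticsInterval;

    CacheSettings(int eventCacheSlots, int entryCacheSlots, Duration statisticsInterval) {
        // Assumed validation: a cache needs at least one slot.
        if (eventCacheSlots <= 0 || entryCacheSlots <= 0) {
            throw new IllegalArgumentException("cache slots must be positive");
        }
        this.eventCacheSlots = eventCacheSlots;
        this.entryCacheSlots = entryCacheSlots;
        this.statisticsInterval = Objects.requireNonNull(statisticsInterval);
    }

    int getEventCacheSlots() {
        return eventCacheSlots;
    }

    int getEntryCacheSlots() {
        return entryCacheSlots;
    }

    Duration getStatisticsInterval() {
        return statisticsInterval;
    }
}

/** Stand-in for a component such as SharedBuffer: it sees only the values it needs. */
final class BufferSketch {
    private final CacheSettings settings;

    BufferSketch(CacheSettings settings) {
        this.settings = Objects.requireNonNull(settings);
    }

    /** Total number of cache slots this component would allocate. */
    int totalCacheSlots() {
        return settings.getEventCacheSlots() + settings.getEntryCacheSlots();
    }
}
```

   A test can then build the component with tiny or large values, for example `new CacheSettings(1, 1, Duration.ofSeconds(1L))`, without constructing any job-level configuration.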








[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the

2021-11-10 Thread GitBox


AHeise commented on a change in pull request #17692:
URL: https://github.com/apache/flink/pull/17692#discussion_r747261841



##
File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/configuration/CEPCacheOptions.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+
+/** CEP Cache Options. */
+public class CEPCacheOptions {
+
+private CEPCacheOptions() {}
+
+public static final ConfigOption CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS =
+ConfigOptions.key("pipeline.global-job-parameters.cep.sharedbuffer.event-cache-slots")

Review comment:
   The cache options seem to be described in overly technical terms. How
   should an end user understand what is going on?



