[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the
AHeise commented on a change in pull request #17692: URL: https://github.com/apache/flink/pull/17692#discussion_r764695828 ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java ## @@ -37,12 +43,25 @@ import static org.junit.Assert.assertTrue; /** Tests for {@link SharedBuffer}. */ +@RunWith(Parameterized.class) public class SharedBufferTest extends TestLogger { +public static final SharedBufferCacheConfig MINI_CACHE_CONFIG = +new SharedBufferCacheConfig(1, 1, Duration.ofSeconds(1L)); +public static final SharedBufferCacheConfig BIG_CACHE_CONFIG = +new SharedBufferCacheConfig(10240, 10240, Duration.ofSeconds(1L)); + +@Parameterized.Parameter public SharedBufferCacheConfig jobParameters; Review comment: nit: rename to cache config. ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java ## @@ -289,7 +308,7 @@ public void testSharedBufferCountersClearing() throws Exception { @Test public void testSharedBufferAccessor() throws Exception { TestSharedBuffer sharedBuffer = - TestSharedBuffer.createTestBuffer(Event.createTypeSerializer()); + TestSharedBuffer.createTestBuffer(Event.createTypeSerializer(), BIG_CACHE_CONFIG); Review comment: This should probably use `jobParameters`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the
AHeise commented on a change in pull request #17692: URL: https://github.com/apache/flink/pull/17692#discussion_r764120930 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/configuration/SharedBufferCacheConfig.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.configuration; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; + +import java.io.Serializable; +import java.time.Duration; + +import static org.apache.flink.cep.configuration.CEPCacheOptions.CEP_CACHE_STATISTICS_INTERVAL; +import static org.apache.flink.cep.configuration.CEPCacheOptions.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS; +import static org.apache.flink.cep.configuration.CEPCacheOptions.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS; + +/** Configuration immutable class. 
*/ +public final class SharedBufferCacheConfig implements Serializable { +private final int eventsBufferCacheSlots; +private final int entryCacheSlots; +private final Duration cacheStatisticsInterval; + +public int getEventsBufferCacheSlots() { +return eventsBufferCacheSlots; +} + +public int getEntryCacheSlots() { +return entryCacheSlots; +} + +public Duration getCacheStatisticsInterval() { +return cacheStatisticsInterval; +} + +public SharedBufferCacheConfig() { +this.cacheStatisticsInterval = CEP_CACHE_STATISTICS_INTERVAL.defaultValue(); +this.entryCacheSlots = CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS.defaultValue(); +this.eventsBufferCacheSlots = CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS.defaultValue(); +} + +public SharedBufferCacheConfig( +final int eventsBufferCacheSlots, +final int entryCacheSlots, +final Duration cacheStatisticsInterval) { +this.cacheStatisticsInterval = cacheStatisticsInterval; +this.entryCacheSlots = entryCacheSlots; +this.eventsBufferCacheSlots = eventsBufferCacheSlots; +} + +public static SharedBufferCacheConfig of( +ExecutionConfig.GlobalJobParameters globalJobParameters) { Review comment: Please use `ReadableConfig` here. `GlobalJobParameters` is only for pure user code. 
## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java ## @@ -18,31 +18,47 @@ package org.apache.flink.cep.nfa.sharedbuffer; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.cep.Event; import org.apache.flink.cep.nfa.DeweyNumber; import org.apache.flink.cep.utils.TestSharedBuffer; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.cep.utils.TestSharedBuffer.BIG_CACHE_CONFIG; +import static org.apache.flink.cep.utils.TestSharedBuffer.MINI_CACHE_CONFIG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** Tests for {@link SharedBuffer}. */ +@RunWith(Parameterized.class) public class SharedBufferTest extends TestLogger { +@Parameterized.Parameter public ExecutionConfig.GlobalJobParameters jobParameters; Review comment: Can we directly use `SharedBufferCacheConfig` here? ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java ## @@ -180,7 +181,12 @@ public void initializeState(StateInitializationContext context) throws Exception new ValueStateDescriptor<>( NFA_STATE_NAME, new NFAStateSerializer())); -partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer); +partialMatches = +new SharedBuffer<>( +
[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the
AHeise commented on a change in pull request #17692: URL: https://github.com/apache/flink/pull/17692#discussion_r754103757 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -73,33 +88,105 @@ private final MapState> entries; /** The cache of eventsBuffer State. */ -private final Map> eventsBufferCache = new HashMap<>(); +private final Cache> eventsBufferCache; /** The cache of sharedBufferNode. */ -private final Map> entryCache = new HashMap<>(); +private final Cache> entryCache; + +private final Timer cacheStatisticsTimer; +@VisibleForTesting public SharedBuffer(KeyedStateStore stateStore, TypeSerializer valueSerializer) { +this(stateStore, valueSerializer, new Configuration()); +} + +public SharedBuffer( +KeyedStateStore stateStore, +TypeSerializer valueSerializer, +ExecutionConfig.GlobalJobParameters globalJobParameters) { Review comment: Instead of passing `GlobalJobParameters`, you should pass only the setting values directly. Alternatively, you can extract a small immutable class that captures only these values. Using `GlobalJobParameters` makes testing harder and couples the classes more closely than they need to be. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] AHeise commented on a change in pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the
AHeise commented on a change in pull request #17692: URL: https://github.com/apache/flink/pull/17692#discussion_r747261841 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/configuration/CEPCacheOptions.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.configuration; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.util.TimeUtils; + +import java.time.Duration; + +/** CEP Cache Options. */ +public class CEPCacheOptions { + +private CEPCacheOptions() {} + +public static final ConfigOption CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS = + ConfigOptions.key("pipeline.global-job-parameters.cep.sharedbuffer.event-cache-slots") Review comment: The cache options seem to be described in far too technical a way. How should an end-user understand what's going on? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org