This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9aa7a3cdbb6bca1746c046dd5536026e43065cec Author: Zakelly <zakelly....@gmail.com> AuthorDate: Fri Dec 15 12:37:01 2023 +0800 [FLINK-20772][State] Tests for null value in TtlValueState#update --- .../flink/runtime/state/ttl/TtlStateTestBase.java | 19 +++++++++++++++++++ .../runtime/state/ttl/TtlValueStateTestContext.java | 16 ++++++++-------- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java index 5eb5c6c2f8f..bd658dbea44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java @@ -187,6 +187,25 @@ public abstract class TtlStateTestBase { .isEqualTo(ctx().emptyValue); } + @TestTemplate + void testValueSetNull() throws Exception { + // Only test this on value state + assumeThat(ctx()).isInstanceOf(TtlValueStateTestContext.class); + + initTest( + StateTtlConfig.UpdateType.OnCreateAndWrite, + StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp); + + ctx().update(ctx().updateUnexpired); + assertThat(ctx().get()) + .withFailMessage(UPDATED_UNEXPIRED_AVAIL) + .isEqualTo(ctx().getUnexpired); + + // Update null and we get empty. + ctx().update(null); + assertThat(ctx().get()).withFailMessage(EXPIRED_UNAVAIL).isEqualTo(ctx().emptyValue); + } + @TestTemplate void testExactExpirationOnWrite() throws Exception { initTest( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java index 1e22d404957..1dfdf339852 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java @@ -21,14 +21,14 @@ package org.apache.flink.runtime.state.ttl; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; /** Test suite for {@link TtlValueState}. */ class TtlValueStateTestContext - extends TtlStateTestContextBase<TtlValueState<?, String, String>, String, String> { - private static final String TEST_VAL1 = "test value1"; - private static final String TEST_VAL2 = "test value2"; - private static final String TEST_VAL3 = "test value3"; + extends TtlStateTestContextBase<TtlValueState<?, String, Long>, Long, Long> { + private static final Long TEST_VAL1 = 11L; + private static final Long TEST_VAL2 = 21L; + private static final Long TEST_VAL3 = 31L; @Override void initTestValues() { @@ -45,16 +45,16 @@ class TtlValueStateTestContext @Override public <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() { return (StateDescriptor<US, SV>) - new ValueStateDescriptor<>(getName(), StringSerializer.INSTANCE); + new ValueStateDescriptor<>(getName(), LongSerializer.INSTANCE); } @Override - public void update(String value) throws Exception { + public void update(Long value) throws Exception { ttlState.update(value); } @Override - public String get() throws Exception { + public Long get() throws Exception { return ttlState.value(); }