You're probably mixing Flink versions.
From the stack trace we can see that Flink classes are being loaded
from 2 different jars
(rocketmq-flink-1.0.0-SNAPSHOT.jar/flink-dist_2.12-1.13.5.jar); I'd
suggest to resolve that first and see if the error persists.
On 23/05/2022 14:32, 李诗君 wrote:
flink version: 1.13.5
java code:
StreamExecutionEnvironment env
=StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings
settings =EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build(); StreamTableEnvironmenttableEnv =
StreamTableEnvironment.create(env,
settings);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); //
env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");
env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true)); tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); Configuration configuration =tableEnv.getConfig().getConfiguration(); //
configuration.setString("table.exec.resource.default-parallelism","16");
configuration.setString("table.exec.state.ttl","7200000");
and when I submit this job , I got this:
Sink:
Sink(table=[default_catalog.default_database.rts_board_trans_compute],
fields=[mchnt_id, time_hour, channel, trans_count, trans_amount,
average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched
from INITIALIZING to FAILED on container_1647420330066_0473_01_000002
@ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError:
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
at
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native
Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13)
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at
org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107)
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133)
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119)
~[flink-table-blink_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
~[flink-dist_2.12-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
It will be quite normal if I take away this line of code:
configuration.setString("table.exec.state.ttl","7200000");
so ,what’s wrong with this setting?
any suggestion will be appreciated.