Re: Re: flink sql api, exception when setting "table.exec.state.ttl"
I have figured this out. It was because I had earlier put a flink-connector-tidb-cdc.jar into my Flink lib folder: that jar ships with Scala 2.11, while my Flink distribution ships with Scala 2.12. Somehow, when I submit a job with a GroupAggregate operator, it needs to load keyed RocksDB state, and that is where the two versions conflict. I will look into it further and follow up with a solution.

At 2022-05-23 20:55:39, "Chesnay Schepler" wrote:

You're probably mixing Flink versions. From the stack trace we can see that Flink classes are being loaded from 2 different jars (rocketmq-flink-1.0.0-SNAPSHOT.jar / flink-dist_2.12-1.13.5.jar); I'd suggest resolving that first and seeing whether the error persists.

On 23/05/2022 14:32, 李诗君 wrote:

flink version: 1.13.5

java code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(6);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
    env.getCheckpointConfig().setCheckpointTimeout(6);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    // env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");
    env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true));
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    // configuration.setString("table.exec.resource.default-parallelism","16");
    configuration.setString("table.exec.state.ttl","720");

and when I submit this job, I got this:

Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched from INITIALIZING to FAILED on container_1647420330066_0473_01_02 @ test-wh-hadoop-1 (dataPort=38604).

java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
	at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
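[Editor's note] The diagnosis in the reply above ("Flink classes are being loaded from 2 different jars") can be confirmed from inside any JVM. The stand-alone class below is not from the thread; it is a generic sketch that prints which jar or directory a given class was actually loaded from. On the cluster one could compare, for example, `org.apache.flink.runtime.state.ttl.TtlStateFactory` against `org.rocksdb.FlinkCompactionFilter` (the class names are taken from the stack trace; the helper class itself is hypothetical):

```java
// WhichJar.java - diagnostic sketch: reports the code source (jar/directory)
// a class was loaded from, or notes that it came from the JDK itself.
public class WhichJar {

    // Returns the URL of the jar/directory the class came from; classes loaded
    // by the bootstrap classloader (core JDK classes) have no code source.
    static String locationOf(Class<?> c) {
        java.security.CodeSource cs = c.getProtectionDomain().getCodeSource();
        return cs == null ? "bootstrap classloader (JDK)" : cs.getLocation().toString();
    }

    public static void main(String[] args) {
        // On the user's cluster one would instead call, e.g.:
        //   locationOf(org.apache.flink.runtime.state.ttl.TtlStateFactory.class)
        //   locationOf(org.rocksdb.FlinkCompactionFilter.class)
        // and check whether both resolve to the same flink-dist jar.
        System.out.println(locationOf(WhichJar.class));  // this compiled class's location
        System.out.println(locationOf(String.class));    // a JDK class: no code source
    }
}
```

If the two Flink classes resolve to different jars (as the `~[...]` markers in the stack trace already suggest), the duplicated Flink runtime classes should be removed from the user jar, e.g. by marking the flink-dist dependencies as `provided` in the build.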
Re: flink sql api, exception when setting "table.exec.state.ttl"
You're probably mixing Flink versions. From the stack trace we can see that Flink classes are being loaded from 2 different jars (rocketmq-flink-1.0.0-SNAPSHOT.jar / flink-dist_2.12-1.13.5.jar); I'd suggest resolving that first and seeing whether the error persists.

On 23/05/2022 14:32, 李诗君 wrote:

flink version: 1.13.5

java code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(6);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
    env.getCheckpointConfig().setCheckpointTimeout(6);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    // env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");
    env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true));
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    // configuration.setString("table.exec.resource.default-parallelism","16");
    configuration.setString("table.exec.state.ttl","720");

and when I submit this job, I got this:

Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched from INITIALIZING to FAILED on container_1647420330066_0473_01_02 @ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
	at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
flink sql api, exception when setting "table.exec.state.ttl"
flink version: 1.13.5

java code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(6);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
    env.getCheckpointConfig().setCheckpointTimeout(6);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    // env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");
    env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true));
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    // configuration.setString("table.exec.resource.default-parallelism","16");
    configuration.setString("table.exec.state.ttl","720");

and when I submit this job, I got this:

Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched from INITIALIZING to FAILED on container_1647420330066_0473_01_02 @ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
	at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) ~[flink-table-blink_2.12-1
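[Editor's note] Independent of the classloading problem, `table.exec.state.ttl` is a duration option, and, if I recall Flink's duration parsing correctly, a bare number such as "720" is interpreted as milliseconds, which is probably not the intended retention. A configuration sketch, not from the thread (assumes a Flink 1.13 classpath, and "720 min" is a hypothetical intended value):

```
// Configuration sketch only - requires Flink 1.13 on the classpath.
Configuration configuration = tableEnv.getConfig().getConfiguration();
// Spell the unit out: a bare "720" would likely be parsed as 720 ms.
configuration.setString("table.exec.state.ttl", "720 min");
// Type-safe alternative on TableConfig, if available in your 1.13.x version:
tableEnv.getConfig().setIdleStateRetention(java.time.Duration.ofMinutes(720));
```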