[ 
https://issues.apache.org/jira/browse/FLINK-35537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852688#comment-17852688
 ] 

Zakelly Lan commented on FLINK-35537:
-------------------------------------

I have confirmed that the problem comes from
{{EmbeddedRocksDBStateBackend#mergeConfigurableOptions}}, which merges two
configuration maps into one. It calls {{toString}} on each already-parsed value
and inserts the resulting string into a new raw map. For a list of enums,
{{toString}} yields the string form of the list (e.g. '[NO_COMPRESSION]'), not
the legacy form of the value. The new raw map is parsed later, and that is when
the error occurs.
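
The round trip described above can be reproduced without Flink. The following is only a minimal sketch: the {{Compression}} enum is a hypothetical stand-in for {{org.rocksdb.CompressionType}}, {{parseLegacy}} is a simplified parser that assumes the legacy list form is a semicolon-separated string, and {{toRawViaToString}} / {{toRawLegacy}} are illustrative helper names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class EnumListRoundTrip {
    // Hypothetical stand-in for org.rocksdb.CompressionType.
    public enum Compression { NO_COMPRESSION, SNAPPY_COMPRESSION, LZ4_COMPRESSION }

    // Simplified legacy-style parser (assumption: semicolon-separated enum names).
    public static List<Compression> parseLegacy(String raw) {
        List<Compression> result = new ArrayList<>();
        for (String part : raw.split(";")) {
            // Enum.valueOf throws IllegalArgumentException for unknown names.
            result.add(Compression.valueOf(part.trim()));
        }
        return result;
    }

    // Mimics the problematic merge step: the parsed value is written back via toString.
    public static String toRawViaToString(List<Compression> parsed) {
        return parsed.toString();
    }

    // One possible alternative: write the value back in the form the parser accepts.
    public static String toRawLegacy(List<Compression> parsed) {
        return parsed.stream().map(Enum::name).collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        List<Compression> parsed = parseLegacy("NO_COMPRESSION");
        String raw = toRawViaToString(parsed);
        System.out.println("raw value after merge: " + raw);
        try {
            parseLegacy(raw);
        } catch (IllegalArgumentException e) {
            System.out.println("re-parse failed: " + e.getMessage());
        }
        System.out.println("legacy round trip: " + parseLegacy(toRawLegacy(parsed)));
    }
}
```

In this sketch the brackets added by {{toString}} become part of the enum name on the second parse, which is why the lookup fails. Writing the value back in the parser's own format avoids the mismatch.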

> Error parsing list of enum in legacy yaml configuration 
> --------------------------------------------------------
>
>                 Key: FLINK-35537
>                 URL: https://issues.apache.org/jira/browse/FLINK-35537
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration
>    Affects Versions: 1.19.0
>            Reporter: Zakelly Lan
>            Priority: Major
>
> In Flink 1.19.0, when I submit a job to a standalone cluster, the TM throws:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Could not parse value '[NO_COMPRESSION]' for key 'state.backend.rocksdb.compression.per.level'.
>       at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:827)
>       at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.internalGetOption(RocksDBResourceContainer.java:312)
>       at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.setColumnFamilyOptionsFromConfigurableOptions(RocksDBResourceContainer.java:361)
>       at org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getColumnOptions(RocksDBResourceContainer.java:181)
>       at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.lambda$createKeyedStateBackend$0(EmbeddedRocksDBStateBackend.java:449)
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyOptions(RocksDBOperationUtils.java:219)
>       at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:138)
>       at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113)
>       at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
>       ... 19 more
> Caused by: java.lang.IllegalArgumentException: Could not parse value for enum class org.rocksdb.CompressionType. Expected one of: [[NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZLIB2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, XPRESS_COMPRESSION, ZSTD_COMPRESSION, DISABLE_COMPRESSION_OPTION]]
>       at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToEnum$12(ConfigurationUtils.java:502)
>       at java.util.Optional.orElseThrow(Optional.java:290)
>       at org.apache.flink.configuration.ConfigurationUtils.convertToEnum(ConfigurationUtils.java:499)
>       at org.apache.flink.configuration.ConfigurationUtils.convertValue(ConfigurationUtils.java:392)
>       at org.apache.flink.configuration.ConfigurationUtils.lambda$convertToListWithLegacyProperties$4(ConfigurationUtils.java:440)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>       at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>       at org.apache.flink.configuration.ConfigurationUtils.convertToListWithLegacyProperties(ConfigurationUtils.java:441)
>       at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:432)
>       at org.apache.flink.configuration.Configuration.lambda$getOptional$3(Configuration.java:819)
>       at java.util.Optional.map(Optional.java:215)
>       at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:819)
>       ... 28 more
> {code}
> I configured 'state.backend.rocksdb.compression.per.level: NO_COMPRESSION' in 
> flink-conf.yaml. I also tried Flink 1.18.1, where the same setup runs fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
