rkhachatryan commented on code in PR #27070:
URL: https://github.com/apache/flink/pull/27070#discussion_r2438586811


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,68 @@ public class ExecutionConfigOptions {
                                                     + "or force materialization(FORCE).")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .text("The option takes effect during job (re)starting")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .text("The option takes effect during job (re)starting")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")
+                                            .linebreak()
+                                            .text(
+                                                    "LEGACY: Simple implementation based on ValueState<List> (the original implementation).")
+                                            .linebreak()
+                                            .text(
+                                                    "MAP: OrderedMultiSetState-based implementation based on a combination of several MapState maintaining ordering and fast lookup properties.")
+                                            .linebreak()
+                                            .text(
+                                                    "VALUE: Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.")
+                                            .linebreak()
+                                            .text(
+                                                    "ADAPTIVE: Alternate between MAP and VALUE depending on the number of entries for the given key starting with VALUE and switching to MAP upon reaching threshold.high value (and back to VALUE, when reaching low).")
+                                            .linebreak()
+                                            .text("The default is LEGACY")

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to