ableegoldman commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r618914490



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1010,18 +1026,51 @@ public StreamsConfig(final Map<?, ?> props) {
     protected StreamsConfig(final Map<?, ?> props,
                             final boolean doLog) {
         super(CONFIG, props, doLog);
-        eosEnabled = StreamThread.eosEnabled(this);
+        eosEnabled = eosEnabled();
+
+        final String processingModeConfig = 
getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+        if (processingModeConfig.equals(EXACTLY_ONCE)) {
+            log.warn("Configuration parameter `{}` is deprecated and will be 
removed in the 4.0.0 release. " +
+                         "Please use `{}` instead. Note that this requires 
broker version 2.5+ so you should prepare "
+                         + "to upgrade your brokers if necessary.", 
EXACTLY_ONCE, EXACTLY_ONCE_V2);
+        }
+        if (processingModeConfig.equals(EXACTLY_ONCE_BETA)) {
+            log.warn("Configuration parameter `{}` is deprecated and will be 
removed in the 4.0.0 release. " +
+                         "Please use `{}` instead.", EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2);
+        }
+
         if (props.containsKey(RETRIES_CONFIG)) {
-            log.warn("Configuration parameter `{}` is deprecated and will be 
removed in 3.0.0 release.", RETRIES_CONFIG);
+            log.warn("Configuration parameter `{}` is deprecated and will be 
removed in the 4.0.0 release.", RETRIES_CONFIG);
+        }
+    }
+
+    public ProcessingMode processingMode() {
+        if 
(EXACTLY_ONCE.equals(getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
+            return StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
+        } else if 
(EXACTLY_ONCE_BETA.equals(getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)))
 {
+            return StreamThread.ProcessingMode.EXACTLY_ONCE_V2;
+        } else if 
(EXACTLY_ONCE_V2.equals(getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
+            return StreamThread.ProcessingMode.EXACTLY_ONCE_V2;
+        } else {
+            return StreamThread.ProcessingMode.AT_LEAST_ONCE;
         }
     }
 
+    public boolean eosEnabled() {

Review comment:
       Ohh, I forgot this was public. Now the current code makes much more 
sense; I only moved it because I thought it was so awkward. I'll just put it 
back and leave a comment so the next person doesn't fall into the same trap. 
Thanks for the explanation 😅 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to