[ 
https://issues.apache.org/jira/browse/KAFKA-7182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549542#comment-16549542
 ] 

ASF GitHub Bot commented on KAFKA-7182:
---------------------------------------

rajinisivaram closed pull request #5391: KAFKA-7182: SASL/OAUTHBEARER client 
response missing %x01 seps
URL: https://github.com/apache/kafka/pull/5391
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
new file mode 100644
index 00000000000..8d4b18aede6
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals;
+
+import org.apache.kafka.common.utils.Utils;
+
+import javax.security.sasl.SaslException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class OAuthBearerClientInitialResponse {
+    static final String SEPARATOR = "\u0001";
+
+    private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
+    private static final String KEY = "[A-Za-z]+";
+    private static final String VALUE = "[\\x21-\\x7E \t\r\n]+";
+    private static final String KVPAIRS = String.format("(%s=%s%s)*", KEY, 
VALUE, SEPARATOR);
+    private static final Pattern AUTH_PATTERN = 
Pattern.compile("(?<scheme>[\\w]+)[ ]+(?<token>[-_\\.a-zA-Z0-9]+)");
+    private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = 
Pattern.compile(
+            String.format("n,(a=(?<authzid>%s))?,%s(?<kvpairs>%s)%s", 
SASLNAME, SEPARATOR, KVPAIRS, SEPARATOR));
+    private static final String AUTH_KEY = "auth";
+
+    private final String tokenValue;
+    private final String authorizationId;
+    private final Map<String, String> properties;
+
+    public OAuthBearerClientInitialResponse(byte[] response) throws 
SaslException {
+        String responseMsg = new String(response, StandardCharsets.UTF_8);
+        Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg);
+        if (!matcher.matches())
+            throw new SaslException("Invalid OAUTHBEARER client first 
message");
+        String authzid = matcher.group("authzid");
+        this.authorizationId = authzid == null ? "" : authzid;
+        String kvPairs = matcher.group("kvpairs");
+        this.properties = Utils.parseMap(kvPairs, "=", SEPARATOR);
+        String auth = properties.get(AUTH_KEY);
+        if (auth == null)
+            throw new SaslException("Invalid OAUTHBEARER client first message: 
'auth' not specified");
+
+        Matcher authMatcher = AUTH_PATTERN.matcher(auth);
+        if (!authMatcher.matches())
+            throw new SaslException("Invalid OAUTHBEARER client first message: 
invalid 'auth' format");
+        if (!"bearer".equalsIgnoreCase(authMatcher.group("scheme"))) {
+            String msg = String.format("Invalid scheme in OAUTHBEARER client 
first message: %s",
+                    matcher.group("scheme"));
+            throw new SaslException(msg);
+        }
+        this.tokenValue = authMatcher.group("token");
+    }
+
+    public OAuthBearerClientInitialResponse(String tokenValue) {
+        this(tokenValue, "", new HashMap<>());
+    }
+
+    public OAuthBearerClientInitialResponse(String tokenValue, String 
authorizationId, Map<String, String> props) {
+        this.tokenValue = tokenValue;
+        this.authorizationId = authorizationId == null ? "" : authorizationId;
+        this.properties = new HashMap<>(props);
+    }
+
+    public byte[] toBytes() {
+        String authzid = authorizationId.isEmpty() ? "" : "a=" + 
authorizationId;
+        String message = String.format("n,%s,%sauth=Bearer %s%s%s", authzid,
+                SEPARATOR, tokenValue, SEPARATOR, SEPARATOR);
+        return message.getBytes(StandardCharsets.UTF_8);
+    }
+
+    public String tokenValue() {
+        return tokenValue;
+    }
+
+    public String authorizationId() {
+        return authorizationId;
+    }
+
+    public String propertyValue(String name) {
+        return properties.get(name);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
index 66942ba96bd..4d4ee57b3a8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
@@ -88,8 +88,7 @@ public boolean hasInitialResponse() {
                         throw new SaslException("Expected empty challenge");
                     callbackHandler().handle(new Callback[] {callback});
                     setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
-                    return String.format("n,,auth=Bearer %s", 
callback.token().value())
-                            .getBytes(StandardCharsets.UTF_8);
+                    return new 
OAuthBearerClientInitialResponse(callback.token().value()).toBytes();
                 case RECEIVE_SERVER_FIRST_MESSAGE:
                     if (challenge != null && challenge.length != 0) {
                         String jsonErrorResponse = new String(challenge, 
StandardCharsets.UTF_8);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
index 5d1f224883b..aacc8fa3cbb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
@@ -21,8 +21,6 @@
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -48,13 +46,9 @@
  * for example).
  */
 public class OAuthBearerSaslServer implements SaslServer {
-    private static final String INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE = 
"Invalid OAUTHBEARER client first message";
     private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerSaslServer.class);
     private static final String NEGOTIATED_PROPERTY_KEY_TOKEN = 
OAuthBearerLoginModule.OAUTHBEARER_MECHANISM + ".token";
     private static final String INTERNAL_ERROR_ON_SERVER = "Authentication 
could not be performed due to an internal error on the server";
-    private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
-    private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = 
Pattern.compile(
-            String.format("n,(a=(?<authzid>%s))?,auth=(?<scheme>[\\w]+)[ 
]+(?<token>[-_\\.a-zA-Z0-9]+)", SASLNAME));
 
     private final AuthenticateCallbackHandler callbackHandler;
 
@@ -90,24 +84,14 @@ public OAuthBearerSaslServer(CallbackHandler 
callbackHandler) {
             throw new SaslAuthenticationException(errorMessage);
         }
         errorMessage = null;
-        String responseMsg = new String(response, StandardCharsets.UTF_8);
-        Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg);
-        if (!matcher.matches()) {
-            if (log.isDebugEnabled())
-                log.debug(INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE);
-            throw new SaslException(INVALID_OAUTHBEARER_CLIENT_FIRST_MESSAGE);
-        }
-        String authzid = matcher.group("authzid");
-        String authorizationId = authzid != null ? authzid : "";
-        if (!"bearer".equalsIgnoreCase(matcher.group("scheme"))) {
-            String msg = String.format("Invalid scheme in OAUTHBEARER client 
first message: %s",
-                    matcher.group("scheme"));
-            if (log.isDebugEnabled())
-                log.debug(msg);
-            throw new SaslException(msg);
+        OAuthBearerClientInitialResponse clientResponse;
+        try {
+            clientResponse = new OAuthBearerClientInitialResponse(response);
+        } catch (SaslException e) {
+            log.debug(e.getMessage());
+            throw e;
         }
-        String tokenValue = matcher.group("token");
-        return process(tokenValue, authorizationId);
+        return process(clientResponse.tokenValue(), 
clientResponse.authorizationId());
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
index cbfca13dbf2..5028329feb1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.common.security.scram.internals;
 
 import org.apache.kafka.common.security.scram.ScramLoginModule;
+import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -31,7 +31,7 @@ public ScramExtensions() {
     }
 
     public ScramExtensions(String extensions) {
-        this(stringToMap(extensions));
+        this(Utils.parseMap(extensions, "=", ","));
     }
 
     public ScramExtensions(Map<String, String> extensionMap) {
@@ -52,29 +52,6 @@ public boolean tokenAuthenticated() {
 
     @Override
     public String toString() {
-        return mapToString(extensionMap);
-    }
-
-    private static Map<String, String> stringToMap(String extensions) {
-        Map<String, String> extensionMap = new HashMap<>();
-
-        if (!extensions.isEmpty()) {
-            String[] attrvals = extensions.split(",");
-            for (String attrval : attrvals) {
-                String[] array = attrval.split("=", 2);
-                extensionMap.put(array[0], array[1]);
-            }
-        }
-        return extensionMap;
-    }
-
-    private static String mapToString(Map<String, String> extensionMap) {
-        StringBuilder builder = new StringBuilder();
-        for (Map.Entry<String, String> entry : extensionMap.entrySet()) {
-            builder.append(entry.getKey());
-            builder.append('=');
-            builder.append(entry.getValue());
-        }
-        return builder.toString();
+        return Utils.mkString(extensionMap, "", "", "=", ",");
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 31fa01cfc8f..330f968e409 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -512,6 +512,19 @@ public static String formatBytes(long bytes) {
         return bld.toString();
     }
 
+    public static Map<String, String> parseMap(String mapStr, String 
keyValueSeparator, String elementSeparator) {
+        Map<String, String> map = new HashMap<>();
+
+        if (!mapStr.isEmpty()) {
+            String[] attrvals = mapStr.split(elementSeparator);
+            for (String attrval : attrvals) {
+                String[] array = attrval.split(keyValueSeparator, 2);
+                map.put(array[0], array[1]);
+            }
+        }
+        return map;
+    }
+
     /**
      * Read a properties file from the given path
      * @param filename The path of the file to read
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
new file mode 100644
index 00000000000..eccf2dd2ed4
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class OAuthBearerClientInitialResponseTest {
+
+    @Test
+    public void testToken() throws Exception {
+        String message = "n,,\u0001auth=Bearer 123.345.567\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new 
OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("123.345.567", response.tokenValue());
+        assertEquals("", response.authorizationId());
+    }
+
+    @Test
+    public void testAuthorizationId() throws Exception {
+        String message = "n,a=myuser,\u0001auth=Bearer 345\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new 
OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("345", response.tokenValue());
+        assertEquals("myuser", response.authorizationId());
+    }
+
+    @Test
+    public void testProperties() throws Exception {
+        String message = "n,,\u0001propA=valueA1, valueA2\u0001auth=Bearer 
567\u0001propB=valueB\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new 
OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("567", response.tokenValue());
+        assertEquals("", response.authorizationId());
+        assertEquals("valueA1, valueA2", response.propertyValue("propA"));
+        assertEquals("valueB", response.propertyValue("propB"));
+    }
+
+    // The example in the RFC uses 
`vF9dft4qmTc2Nvb3RlckBhbHRhdmlzdGEuY29tCg==` as the token
+    // But since we use Base64Url encoding, padding is omitted. Hence this 
test verifies without '='.
+    @Test
+    public void testRfc7688Example() throws Exception {
+        String message = 
"n,a=u...@example.com,\u0001host=server.example.com\u0001port=143\u0001" +
+                "auth=Bearer 
vF9dft4qmTc2Nvb3RlckBhbHRhdmlzdGEuY29tCg\u0001\u0001";
+        OAuthBearerClientInitialResponse response = new 
OAuthBearerClientInitialResponse(message.getBytes(StandardCharsets.UTF_8));
+        assertEquals("vF9dft4qmTc2Nvb3RlckBhbHRhdmlzdGEuY29tCg", 
response.tokenValue());
+        assertEquals("u...@example.com", response.authorizationId());
+        assertEquals("server.example.com", response.propertyValue("host"));
+        assertEquals("143", response.propertyValue("port"));
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index bf21f2b3430..6b53e963af7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -75,39 +75,41 @@ public void setUp() throws Exception {
     @Test
     public void noAuthorizationIdSpecified() throws Exception {
         byte[] nextChallenge = saslServer
-                
.evaluateResponse(clientInitialResponseText(null).getBytes(StandardCharsets.UTF_8));
+                .evaluateResponse(clientInitialResponse(null));
         assertTrue("Next challenge is not empty", nextChallenge.length == 0);
     }
 
     @Test
     public void authorizatonIdEqualsAuthenticationId() throws Exception {
         byte[] nextChallenge = saslServer
-                
.evaluateResponse(clientInitialResponseText(USER).getBytes(StandardCharsets.UTF_8));
+                .evaluateResponse(clientInitialResponse(USER));
         assertTrue("Next challenge is not empty", nextChallenge.length == 0);
     }
 
     @Test(expected = SaslAuthenticationException.class)
     public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
-        saslServer.evaluateResponse(clientInitialResponseText(USER + 
"x").getBytes(StandardCharsets.UTF_8));
+        saslServer.evaluateResponse(clientInitialResponse(USER + "x"));
     }
 
     @Test
     public void illegalToken() throws Exception {
-        byte[] bytes = saslServer
-                .evaluateResponse((clientInitialResponseText(null) + 
"AB").getBytes(StandardCharsets.UTF_8));
+        byte[] bytes = saslServer.evaluateResponse(clientInitialResponse(null, 
true));
         String challenge = new String(bytes, StandardCharsets.UTF_8);
         assertEquals("{\"status\":\"invalid_token\"}", challenge);
     }
 
-    private String clientInitialResponseText(String authorizationId)
+    private byte[] clientInitialResponse(String authorizationId)
+            throws OAuthBearerConfigException, IOException, 
UnsupportedCallbackException, LoginException {
+        return clientInitialResponse(authorizationId, false);
+    }
+
+    private byte[] clientInitialResponse(String authorizationId, boolean 
illegalToken)
             throws OAuthBearerConfigException, IOException, 
UnsupportedCallbackException, LoginException {
         OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
         LOGIN_CALLBACK_HANDLER.handle(new Callback[] {callback});
         OAuthBearerToken token = callback.token();
         String compactSerialization = token.value();
-        String clientInitialResponseText = "n,"
-                + (authorizationId == null || authorizationId.isEmpty() ? "" : 
"a=" + authorizationId) + ",auth=Bearer "
-                + compactSerialization;
-        return clientInitialResponseText;
+        String tokenValue = compactSerialization + (illegalToken ? "AB" : "");
+        return new OAuthBearerClientInitialResponse(tokenValue, 
authorizationId, Collections.emptyMap()).toBytes();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> SASL/OAUTHBEARER client response is missing %x01 separators
> -----------------------------------------------------------
>
>                 Key: KAFKA-7182
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7182
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.0.0
>            Reporter: Ron Dagostino
>            Assignee: Ron Dagostino
>            Priority: Blocker
>              Labels: pull-request-available
>
> The format of the SASL/OAUTHBEARER client response is defined in [RFC 7628 
> Section 3.1|https://tools.ietf.org/html/rfc7628#section-3.1] as follows:
> {noformat}
>      kvsep          = %x01
>      key            = 1*(ALPHA)
>      value          = *(VCHAR / SP / HTAB / CR / LF )
>      kvpair         = key "=" value kvsep
>      client-resp    = (gs2-header kvsep *kvpair kvsep) / kvsep
> {noformat}
> ;;gs2-header = See [RFC 5801 (Section 
> 4)|https://tools.ietf.org/html/rfc5801#section-4]
> The SASL/OAUTHBEARER client response as currently implemented in 
> OAuthBearerSaslClient sends the valid gs2-header "n,," but then sends the 
> "auth" key and value immediately after it, like this:
> {code:java}
> String.format("n,,auth=Bearer %s", callback.token().value())
> {code}
> This does not conform to the specification because there is no %x01 after the 
> gs2-header, no %x01 after the auth value, and no terminating %x01.  The code 
> should instead be as follows:
> {code:java}
> String.format("n,,\u0001auth=Bearer %s\u0001\u0001", callback.token().value())
> {code}
> Similarly, the parsing of the client response in OAuthBearerSaslServer, which 
> currently allows the malformed text, must also change.
> *This should be fixed prior to the initial release of the SASL/OAUTHBEARER 
> code in 2.0.0 to prevent compatibility problems.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to