[
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927451#comment-15927451
]
ASF GitHub Bot commented on FLINK-3930:
---------------------------------------
Github user WangTaoTheTonic commented on a diff in the pull request:
https://github.com/apache/flink/pull/2425#discussion_r106335560
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java
---
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CookieHandler {
+
+ public static class ClientCookieHandler extends
ChannelInboundHandlerAdapter {
+
+ private final Logger LOG =
LoggerFactory.getLogger(ClientCookieHandler.class);
+
+ private final String secureCookie;
+
+ final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+ public ClientCookieHandler(String secureCookie) {
+ this.secureCookie = secureCookie;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws
Exception {
+ super.channelActive(ctx);
+ LOG.debug("In channelActive method of
ClientCookieHandler");
+
+ if(this.secureCookie != null &&
this.secureCookie.length() != 0) {
+ LOG.debug("In channelActive method of
ClientCookieHandler -> sending secure cookie");
+ final ByteBuf buffer = Unpooled.buffer(4 +
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
+
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
+
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
+ ctx.writeAndFlush(buffer);
+ }
+ }
+ }
+
+ public static class ServerCookieDecoder extends
MessageToMessageDecoder<ByteBuf> {
+
+ private final String secureCookie;
+
+ private final List<Channel> channelList = new ArrayList<>();
+
+ private final Charset DEFAULT_CHARSET =
Charset.forName("utf-8");
+
+ private final Logger LOG =
LoggerFactory.getLogger(ServerCookieDecoder.class);
+
+ public ServerCookieDecoder(String secureCookie) {
+ this.secureCookie = secureCookie;
+ }
+
+ /**
+ * Decode from one message to an other. This method will be
called for each written message that can be handled
+ * by this encoder.
+ *
+ * @param ctx the {@link ChannelHandlerContext} which this
{@link MessageToMessageDecoder} belongs to
+ * @param msg the message to decode to an other one
+ * @param out the {@link List} to which decoded messages should
be added
+ * @throws Exception is thrown if an error accour
+ */
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
List<Object> out) throws Exception {
+
+ LOG.debug("ChannelHandlerContext name: {}, channel:
{}", ctx.name(), ctx.channel());
+
+ if(secureCookie == null || secureCookie.length() == 0) {
+ LOG.debug("Not validating secure cookie since
the server configuration is not enabled to use cookie");
+ return;
+ }
+
+ LOG.debug("Going to decode the secure cookie passed by
the remote client");
+
+ if(channelList.contains(ctx.channel())) {
+ LOG.debug("Channel: {} already authorized",
ctx.channel());
+ return;
+ }
+
+ //read cookie based on the cookie length passed
+ int cookieLength = msg.readInt();
+ if(cookieLength !=
secureCookie.getBytes(DEFAULT_CHARSET).length) {
+ ctx.channel().close();
+ String message = "Cookie length does not match
with source cookie. Invalid secure cookie passed.";
+ throw new IllegalStateException(message);
+ }
+
+ //read only if cookie length is greater than zero
+ if(cookieLength > 0) {
+
+ final byte[] buffer = new
byte[secureCookie.getBytes(DEFAULT_CHARSET).length];
+ msg.readBytes(buffer, 0, cookieLength);
+
+
if(!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) {
+ LOG.error("Secure cookie from the
client is not matching with the server's identity");
+ throw new
IllegalStateException("Invalid secure cookie passed.");
+ }
+
+ LOG.info("Secure cookie validation passed");
+
+ channelList.add(ctx.channel());
--- End diff --
It seems that channels are only ever added to `channelList` and never removed.
Wouldn't this list grow indefinitely?
> Implement Service-Level Authorization
> -------------------------------------
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
> Issue Type: New Feature
> Components: Security
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
> Labels: security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
> design doc._
> Service-level authorization is the initial authorization mechanism to ensure
> clients (or servers) connecting to the Flink cluster are authorized to do so.
> The purpose is to prevent a cluster from being used by an unauthorized
> user, whether to execute jobs, disrupt cluster functionality, or gain access
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)