Re: [Discuss] FLIP-26 - SSL Mutual Authentication

2018-05-13 Thread Stephan Ewen
Throwing in some more food for thought:

An alternative to the above proposed separation of internal and external
SSL would be the following:

  - We separate channel encryption and authentication
  - We use one common SSL layer (internal and external) that is in both
cases only responsible for establishing an encrypted connection
  - Authentication / authorization internally is done by SASL with
username/password or shared secret.
  - Authentication externally must be through a proxy, and authorization
based on validating HTTP headers set by the proxy, as discussed above.

Advantages:
  - There is only one certificate needed, which could also be shared across
applications
  - One or two lines in the config authenticate and authorize internal
communication
  - One could possibly still fall back to the other mode by skipping

Open Questions / Disadvantages
  - Given that hostname verification during the SSL handshake is not possible
in many setups, the encrypted channel is vulnerable to man-in-the-middle
attacks without mutual authentication. Not sure how serious that is,
because it would require an attacker to have already compromised network
nodes of the cluster. Is that not a universal issue in the K8s world?

This is anyway a bit hypothetical, because as long as we have Akka beneath
the RPC layer, we cannot go with that approach.

However, if we want to at least keep the door open towards something like
that in the future, we would need to set up the configuration in such a way
that we have a "common SSL" configuration (keystore, truststore, etc.) and
internal/external options that override it. That would anyway be helpful
for backwards compatibility.
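
To make that concrete, a resolution along these lines could look as follows.
This is only a sketch; the key names ("security.ssl.keystore",
"security.ssl.internal.keystore", "security.ssl.rest.keystore") are
placeholders, not settled configuration options.

import org.apache.flink.configuration.Configuration;

public class SslConfigFallback {

    // Internal / external SSL settings fall back to the shared "common"
    // setting when they are not set explicitly.
    static String resolveKeystore(Configuration config, String scope) {
        // scope would be something like "internal" or "rest"
        String scoped = config.getString("security.ssl." + scope + ".keystore", null);
        return scoped != null ? scoped : config.getString("security.ssl.keystore", null);
    }
}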

@Eron - what are your thoughts on that?

On Sun, May 13, 2018 at 1:40 AM, Stephan Ewen  wrote:

> Thank you for bringing this proposal up. It looks very good and we seem to
> be thinking along very similar lines.
>
> Below are some comments and thoughts on the FLIP.
>
> *Internal vs. External Connectivity*
>
> That is a very helpful distinction, let's build on that.
>
>   - I would suggest eventually treating all communication that potentially
> comes from users as external, meaning Client-to-Dispatcher,
> Client-to-JobManager (trigger savepoint, change parallelism, ...), Web UI,
> and Queryable State.
>
>   - That leaves communication that is only between JobManager/TaskManager/
> ResourceManager/Dispatcher/HistoryServer as internal.
>
>   - I am somewhat operating under the assumption that all external
> communication will eventually be HTTP/REST. That works best with many
> setups and is the basis for using service proxies that
> handle  authentication/authorization.
>
>
> In Flink 1.5 and future versions, we have the following update there:
>
>   - Akka is now strictly internal connectivity; the clients (except the
> legacy client) do not use it any more.
>
>   - The Blob Server will move to purely internal connectivity in Flink
> 1.6, where a POST of a job to the Dispatcher has the jars and the JobGraph.
> That is important for Kubernetes setups, where exposing the BlobServer and
> querying the blob port causes quite some friction.
>
>   - Treating queryable state as "internal connectivity" is fine for now.
> We should treat it as "external" connectivity in the future if we move it
> to HTTP/REST.
>
>
> *Internal Connectivity and SSL Mutual Authentication*
>
> Simply activating SSL mutual authentication for the internal communication
> is really low-hanging fruit.
>
> Activating client authentication for Akka, the Netty network stack (and the
> Blob Server/Client in Flink 1.6) should require no change in the configurations
> with respect to Flink 1.4. All processes are, with respect to internal
> communication, simultaneously server and client endpoints. Because of that,
> they already need KeyStore and TrustStore files for SSL handshakes, where
> the TrustStore needs to trust the KeyStore Certificate.
>
> I personally favor the suggestion made to have a script that generates a
> self-signed certificate and adds it to "conf" and updates the
> configuration. That should be picked up by the Yarn and Mesos clients
> anyways.
>
>
> *External Connectivity*
>
> There is a huge surface area and I think we need to give users a way to
> plug in their own tools.
> From what I see (and after some discussions with Patrick and Gary) I think
> it makes sense to look at proxies in a broad way, similar to the approach
> Eron outlined.
>
> The basic approach could be like that:
>
>   - Everything goes through HTTPS, so the proxy can work with HTTP headers.
>   - The proxy handles authentication and possibly authorization. The proxy
> adds some header, for example a user name, a group id, an authorization
> token.
>   - Flink can configure an implementation of an 'authorizer' or validator
> on the headers to decide whether the request is valid (see the sketch at the
> end of this message).
>
>   - Example 1: The proxy does authentication and adds the user name /
> group as a header. The Flink-side authorizer simply checks whe
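>
> Picking up the 'authorizer' idea, here is a minimal sketch of what such a
> pluggable header validator could look like. The interface name and the
> "X-Forwarded-Group" header are made up for illustration; this is not an
> existing Flink API.
>
> import java.util.Map;
>
> // Hypothetical plug-in point for validating proxy-set headers.
> interface RestRequestAuthorizer {
>     boolean isAuthorized(Map<String, String> httpHeaders);
> }
>
> // Example 1 from above: trust a group header added by the authenticating proxy.
> class GroupHeaderAuthorizer implements RestRequestAuthorizer {
>
>     private final String requiredGroup;
>
>     GroupHeaderAuthorizer(String requiredGroup) {
>         this.requiredGroup = requiredGroup;
>     }
>
>     @Override
>     public boolean isAuthorized(Map<String, String> httpHeaders) {
>         return requiredGroup.equals(httpHeaders.get("X-Forwarded-Group"));
>     }
> }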

[jira] [Created] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-13 Thread Vishal Santoshi (JIRA)
Vishal Santoshi created FLINK-9349:
--

 Summary: KafkaConnector Exception  while fetching from multiple 
kafka topics
 Key: FLINK-9349
 URL: https://issues.apache.org/jira/browse/FLINK-9349
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.0
Reporter: Vishal Santoshi


./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
It seems the subscribedPartitionStates List was being modified while
runFetchLoop was iterating over it.
This can happen if, e.g., FlinkKafkaConsumer runs the following code 
concurrently:
                kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
 
{code:java}
 java.util.ConcurrentModificationException
at 
java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
at java.util.LinkedList$ListItr.next(LinkedList.java:888)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
{code}
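
For illustration only (this is not Flink code), here is a minimal snippet that
triggers the same exception by structurally modifying a {{LinkedList}} while it
is being iterated. In the fetcher the modification comes from a different
thread, but the failure mechanism (the iterator's modCount check) is the same:

{code:java}
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class ComodificationDemo {
    public static void main(String[] args) {
        List<Integer> partitionStates = new LinkedList<>(Arrays.asList(1, 2, 3));
        for (Integer p : partitionStates) {   // corresponds to the runFetchLoop iteration
            if (p == 2) {
                partitionStates.add(4);       // corresponds to addDiscoveredPartitions(...)
            }
        }
        // -> java.util.ConcurrentModificationException on the next iterator step
    }
}
{code}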



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


request for JIRA contributor permissions

2018-05-13 Thread Yaz Sh
I would appreciate it if someone could grant me JIRA contributor permissions so
that I can assign the tickets I am working on to myself.
FLINK-9348 
FLINK-9343 
FLINK-9317 

Here is my jira id: 
Username:   yazdanjs
Cheers,
Yazdan

Re: request for JIRA contributor permissions

2018-05-13 Thread Chesnay Schepler

I've given you contributor permissions.

On 13.05.2018 18:25, Yaz Sh wrote:

I would appreciate if someone grant me the JIRA contributor permissions to be 
able to assign tickets to myself that I am working on them.
FLINK-9348 
FLINK-9343 
FLINK-9317 

Here is my jira id:
Username:   yazdanjs
Cheers,
Yazdan





Re: request for JIRA contributor permissions

2018-05-13 Thread Yaz Sh
Thanks! I have it now.

Cheers,
Yaz

On Sun, May 13, 2018 at 12:39 PM Chesnay Schepler 
wrote:

> I've given you contributor permissions.
>
> On 13.05.2018 18:25, Yaz Sh wrote:
> > I would appreciate if someone grant me the JIRA contributor permissions
> to be able to assign tickets to myself that I am working on them.
> > FLINK-9348 
> > FLINK-9343 
> > FLINK-9317 
> >
> > Here is my jira id:
> > Username: yazdanjs
> > Cheers,
> > Yazdan
>
>
>


Re: Elasticsearch Sink

2018-05-13 Thread Christophe Jolif
Hi Gordon,

Thanks for your feedback (and Flavio for your support!)

About your remarks/questions:

> - Maybe we can consider removing support for ES 1.x and 2.x starting from
1.6. Those are very old ES versions (considering that ES 6.x has already
been out for a while). Do you think this would simplify how our base module
APIs are designed?

I would tend to say it should not change drastically the picture but would
have to look into it.

> - Wouldn't it be possible to have a REST implementation of the
`ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If so,
once we remove ES 1.x and 2.x, it might actually be possible to completely
replace the current `elasticsearch-base` module.

The High Level REST API was introduced in Elasticsearch 5.6, so it is not
possible to cover 5.5 and below with it.

If all the necessary APIs are already there (to be double-checked), it should
be able to cover 5.6. What I noticed when working on the PRs is that the 6.2
High Level REST client API was improved to be closer to the original APIs; if
we want to support 5.6 with it, we might have to rely on APIs they have since
improved. Not dramatic. But is it worth it, knowing this would only give us
5.6, and not 5.2, 5.3, 5.4 and 5.5?
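
For reference, these are the kind of calls a REST-based bridge would build on.
This is a rough sketch against the 6.x High Level REST client; method
signatures differ slightly between 5.6 and the 6.x minors, so take it as
illustrative only, not as the proposed connector code.

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class RestBridgeSketch {
    public static void main(String[] args) throws Exception {
        // connect through the REST layer instead of the native TransportClient
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        BulkRequest bulk = new BulkRequest();
        bulk.add(new IndexRequest("my-index", "my-type")
                .source("{\"user\":\"flink\"}", XContentType.JSON));

        BulkResponse response = client.bulk(bulk);
        System.out.println("bulk had failures: " + response.hasFailures());

        client.close();
    }
}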

Now on moving forward I read:

> I'm definitely a +1 to try to move this forward with a proper fix.

and

> Working around that would require introducing a new base module
specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
nice way to go.

So if I read you correctly, you are OK with moving forward with a proper fix,
but it should not introduce a new (REST-based) base module? Then, to be honest,
I'm not sure how to proceed :) Any more specific feedback on the direction to
follow would be great!

Thanks,
--
Christophe

On Sun, May 13, 2018 at 5:39 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> Thanks for bringing this up.
>
> Yes, the main issue with the existing PRs and preventing it from moving
> forward is how it currently breaks initial assumptions of APIs in the
> `elasticsearch-base` module.
> Working around that would require introducing a new base module
> specifically for 5.3+ and 6.x+, which we've also agreed on the PR isn't a
> nice way to go.
>
> I had a quick stab at the REST API, and it seems to be promising,
> especially given that you mentioned that starting from next versions, the
> current API we use will be fully removed.
> I'm definitely a +1 to try to move this forward with a proper fix.
>
> Some other remarks / questions I have:
> - Maybe we can consider removing support for ES 1.x and 2.x starting from
> 1.6. Those are very old ES versions (considering that ES 6.x has already
> been out for a while). Do you think this would simplify how our base module
> APIs are designed?
> - Wouldn't it be possible to have a REST implementation of the
> `ElasticsearchSinkCallBridge` for 5.x (covering both 5.2 and 5.3+)? If so,
> once we remove ES 1.x and 2.x, it might actually be possible to completely
> replace the current `elasticsearch-base` module.
>
> Cheers,
> Gordon
>
>
> On Sun, May 13, 2018 at 12:36 AM, Flavio Pompermaier  >
> wrote:
>
> > +1. Totally agree
> >
> > On Sat, 12 May 2018, 18:14 Christophe Jolif,  wrote:
> >
> > > Hi all,
> > >
> > > There is quite some time Flink Elasticsearch sink is broken for
> > > Elastisearch 5.x  (nearly a year):
> > >
> > > https://issues.apache.org/jira/browse/FLINK-7386
> > >
> > > And there is no support for Elasticsearch 6.x:
> > >
> > > https://issues.apache.org/jira/browse/FLINK-8101
> > >
> > > However several PRs were issued:
> > >
> > > https://github.com/apache/flink/pull/4675
> > > https://github.com/apache/flink/pull/5374
> > >
> > > I also raised the issue on the mailing list in the 1.5 timeframe:
> > >
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-5-0-td20867.html#a20905
> > >
> > > But things are still not really moving. However, this seems to be something
> > > people are looking for, so I would really like the community to consider that
> > > for 1.6.
> > >
> > > The problems I see from comments on the PRs:
> > >
> > > - getting something that follows the Flink APIs initially created is a
> > > nightmare, because Elastic is pretty good at breaking compatibility the hard
> > > way (see in particular, in the last PR, the casts that have to be made to get
> > > an API that works in all cases)
> > > - Elasticsearch is moving away from their "native" API, which Flink is using,
> > > to the REST APIs, so there is little common ground between pre-6 and post-6,
> > > even if Elasticsearch tried to keep some level of compatibility in the APIs.
> > >
> > > My fear is that by trying to kill two birds with one stone, we actually
> > > get nothing done.
> > >
> > > In the hope of moving that forward, I would like to propose for 1.6 a new
> > > Elasticsearch 6.x+ sink that would follow the design of the previous ones
> > > BUT only leverage the new REST

Re: [DISCUSS] Configuration for local recovery

2018-05-13 Thread Till Rohrmann
I agree with Stephan that a simple on/off configuration option for local
recovery would be easier to understand and gives more flexibility wrt
future changes.
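
To illustrate the shape being discussed, the configuration could be read
roughly like this. The key names below are placeholders, not agreed-upon
options; this is just a sketch of the "simple boolean plus expert mode"
split.

import org.apache.flink.configuration.Configuration;

public class LocalRecoverySettings {

    // the simple on/off toggle most users would see
    static boolean isLocalRecoveryEnabled(Configuration config) {
        return config.getBoolean("state.backend.local-recovery", false);
    }

    // a separate "expert" option for the mode, which most users never touch
    static String localRecoveryMode(Configuration config) {
        return config.getString("state.backend.local-recovery.mode", "file-based");
    }
}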

Cheers,
Till

On Sun, May 13, 2018 at 4:00 PM, sihua zhou  wrote:

> +1 for @Stephan's proposal, it makes the out of the box experience better
> and also leaves some space for the expert.
>
> Best,
> Sihua
>
>
>
> On 05/12/2018 02:41,Stephan Ewen 
> wrote:
>
> Hi!
>
> The configuration option (in flink-conf.yaml) for local recovery is
> currently an enumeration with the values "DISABLED" and
> "ENABLE_FILE_BASED".
>
> I would suggest to change that, for a few reasons:
>
> - Having values like "ENABLE_FILE_BASED" breaks with the style of the
> other config options. Having a homogeneous feel for the configuration of
> the system is important for ease of use.
>
> - Do we need to require users to understand what file-based local
> recovery means? It might be easier for users to have an option to
> activate/deactivate the mode (on by default in the future), and if we need to have
> different modes in the future, then we can have a "mode" option as an
> "expert option". That way we expose the simple fact of whether to use local
> recovery or not in a simple boolean, and hide the complex tuning part
> (which hopefully few users ever need to touch) in a separate option.
>
> - Are we sure already whether options beyond "on/off" are shared across
> state backends? For example, memory snapshot based local recovery would be
> specific to the Memory/FsStateBackend. Persistent-volume based local
> recovery may behave differently for RocksDB and FsStateBackend.
>
>
> ==>  This config option looks like it sets things up in a tricky direction.
> We can still change it, now that we have not yet released it.
>
> Best,
> Stephan
>
>


[jira] [Created] (FLINK-9350) Parameter baseInterval has wrong check message in CheckpointCoordinator constructor

2018-05-13 Thread vinoyang (JIRA)
vinoyang created FLINK-9350:
---

 Summary: Parameter baseInterval has wrong check message in 
CheckpointCoordinator constructor
 Key: FLINK-9350
 URL: https://issues.apache.org/jira/browse/FLINK-9350
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.0, 1.3.0, 1.5.0, 1.6.0
Reporter: vinoyang
Assignee: vinoyang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9351) RM stop assigning slot to Job because the TM killed before connecting to JM successfully

2018-05-13 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9351:
-

 Summary: RM stop assigning slot to Job because the TM killed 
before connecting to JM successfully
 Key: FLINK-9351
 URL: https://issues.apache.org/jira/browse/FLINK-9351
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Sihua Zhou


The steps are the following (copied from Stephan's comments in [PR 5931|https://github.com/apache/flink/pull/5931]):

1. JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager.
2. The ResourceManager starts a container with a TaskManager.
3. The TaskManager registers at the ResourceManager, which tells the TaskManager to 
push a slot to the JobManager.
4. The TaskManager container is killed.
5. The ResourceManager does not queue back the slot requests (AllocationIDs) that 
it sent to the previous TaskManager, so the requests are lost and need to time 
out before another attempt is tried.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

2018-05-13 Thread Shuyi Chen
Hi Flink devs,

In an effort to support loading external libraries and creating UDFs from
external libraries using DDL in Flink SQL, we want to use Flink’s Blob
Server to distribute the external libraries in runtime and load those
libraries into the user code classloader automatically.

However, the current [Stream]ExecutionEnvironment.registerCachedFile
interface is limited to registering executable or non-executable blobs.
It's not possible to tell at runtime whether the blob files are libraries
that should be loaded into the user code classloader in RuntimeContext.
Therefore, I want to propose adding an enum called BlobType to explicitly
indicate the type of the Blob file being distributed, and the following
interface in [Stream]ExecutionEnvironment to support it. In general, I
think the new BlobType information can be used by the Flink runtime to
preprocess the Blob files if needed.

/**
 * Registers a file at the distributed cache under the given name. The file
 * will be accessible from any user-defined function in the (distributed)
 * runtime under a local path. Files may be local files (as long as all
 * relevant workers have access to it), or files in a distributed file system.
 * The runtime will copy the files temporarily to a local cache, if needed.
 *
 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can
 * be obtained inside UDFs via
 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
 * and provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
 *                 or "hdfs://host:port/and/path")
 * @param name The name under which the file is registered.
 * @param blobType indicating the type of the Blob file
 */
public void registerCachedFile(String filePath, String name,
        DistributedCache.BlobType blobType) {...}

Optionally, we can add another interface to register UDF jars, which would
be implemented using the interface above.

public void registerJarFile(String filePath, String name) {...}

The following existing interface will be marked deprecated:

public void registerCachedFile(String filePath, String name, boolean
executable) {...}

And the following interface will be implemented using the new interface
proposed above with an EXECUTABLE BlobType:

public void registerCachedFile(String filePath, String name) { ... }
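
To illustrate the intended usage (note that the LIBRARY constant and
registerJarFile are part of this proposal, i.e. they do not exist yet, and
the jar path is just an example):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// distribute a UDF library jar via the Blob Server and mark it as a library
env.registerCachedFile("hdfs:///libs/my-udfs.jar", "my-udfs",
        DistributedCache.BlobType.LIBRARY);

// or, with the optional convenience method
env.registerJarFile("hdfs:///libs/my-udfs.jar", "my-udfs");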

Thanks a lot.
Shuyi

"So you have to trust that the dots will somehow connect in your future."


Re: [DISCUSS] Configuration for local recovery

2018-05-13 Thread Shuyi Chen
+1 to the proposal. IMO, the current option "ENABLE_FILE_BASED" contains
too many implementation details and might confuse simple users. Having
a simple on/off toggle for the majority of users and an advanced option for
experts to do the tuning will definitely make for a better user experience.

On Sun, May 13, 2018 at 12:33 PM, Till Rohrmann 
wrote:

> I agree with Stephan that a simple on/off configuration option for local
> recovery would be easier to understand and gives more flexibility wrt
> future changes.
>
> Cheers,
> Till
>
> On Sun, May 13, 2018 at 4:00 PM, sihua zhou  wrote:
>
> > +1 for @Stephan's proposal, it makes the out of the box experience better
> > and also leaves some space for the expert.
> >
> > Best,
> > Sihua
> >
> >
> >
> > On 05/12/2018 02:41,Stephan Ewen 
> > wrote:
> >
> > Hi!
> >
> > The configuration option (in flink-conf.yaml) for local recovery is
> > currently an enumeration with the values "DISABLED" and
> > "ENABLE_FILE_BASED".
> >
> > I would suggest to change that, for a few reasons:
> >
> > - Having values like "ENABLE_FILE_BASED" breaks with the style of the
> > other config options. Having a homogeneous feel for the configuration of
> > the system is important for ease of use.
> >
> > - Do we need to require users to understand what file-based local
> > recovery means? It might be easier for users to have an option to
> > activate/deactivate the mode (on by default in the future), and if we need to have
> > different modes in the future, then we can have a "mode" option as an
> > "expert option". That way we expose the simple fact of whether to use
> local
> > recovery or not in a simple boolean, and hide the complex tuning part
> > (which hopefully few users ever need to touch) in a separate option.
> >
> > - Are we sure already whether options beyond "on/off" are shared across
> > state backends? For example, memory snapshot based local recovery would be
> > specific to the Memory/FsStateBackend. Persistent-volume based local
> > recovery may behave differently for RocksDB and FsStateBackend.
> >
> >
> > ==>  This config option looks like it sets things up in a tricky direction.
> > We can still change it, now that we have not yet released it.
> >
> > Best,
> > Stephan
> >
> >
>



-- 
"So you have to trust that the dots will somehow connect in your future."


Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

2018-05-13 Thread Bowen Li
Thank you, Fabian! I've created the FLIP-25 page.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently is this: let InternalTimerService
maintain user timers and TTL timers separately. Implementation classes of
InternalTimerService would add two new collections of timers, e.g.
TtlProcessingTimeTimersQueue and TtlEventTimeTimersQueue for
HeapInternalTimerService. Upon InternalTimerService#onProcessingTime() and
advanceWatermark(), they would first iterate through ProcessingTimeTimers
and EventTimeTimers (user timers) and then through TtlProcessingTimeTimers
and TtlEventTimeTimers (TTL timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to option 1, is that it doesn't impact
existing timer-related checkpoint/savepoint, restore and migrations.

What do you think? And would any other Flink committers like to chime in
with ideas? I've also documented the above two discussion points on the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske  wrote:

> Hi Bowen,
>
> 1. The motivation to keep the TTL logic outside of the state backend was
> mainly to avoid state backend custom implementations. If we have a generic
> approach that would work for all state backends, we could try to put the
> logic into a base class like AbstractStateBackend. After all, state cleanup
> is tightly related to the responsibilities of state backends.
> 2. -
> 3. You're right. We should first call the user code before cleaning up.
> The main problem that I see right now is that we have to distinguish
> between user and TTL timers. AFAIK, the timer service does not support
> timer tags (or another method) to distinguish timers.
>
> I've given you the permissions to create and edit wiki pages.
>
> Best, Fabian
>
> 2018-04-30 7:47 GMT+02:00 Bowen Li :
>
>> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
>>
>> 1. Where should the TTL code reside? In the state backend or in the
>> operator?
>>
>> I believe TTL code should not reside in state backend, because a critical
>> design is that TTL is independent of and transparent to state backends.
>>
>> According to my current knowledge, I think it probably should live with
>> operators in flink-streaming-java.
>>
>>
>> 2. How to get notified about state accesses? I guess this depends on 1.
>>
>> You previously suggested using callbacks. I believe that's the right way
>> to do decoupling.
>>
>>
>> 3. How to avoid conflicts of TTL timers and user timers?
>>
>> User timers might always be invoked first? This is not urgent, shall we
>> bake it for more time and discuss it along the way?
>>
>>
>>
>> Besides, I don't have access to create a FLIP page under
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
>> Can you grant me the proper access?
>>
>> Thanks,
>>
>> Bowen
>>
>>
>>
>>
>> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske  wrote:
>>
>>> Hi Bowen,
>>>
>>> Thanks for updating the proposal. This looks pretty good (as I said
>>> before).
>>> There are a few areas, that are not yet fully fleshed out:
>>>
>>> 1. Where should the TTL code reside? In the state backend or in the
>>> operator?
>>> 2. How to get notified about state accesses? I guess this depends on 1.
>>> 3. How to avoid conflicts of TTL timers and user timers?
>>>
>>> @Stefan (in CC) might have some ideas on these issues as well.
>>>
>>> Cheers, Fabian
>>>
>>> 2018-04-22 21:14 GMT+02:00 Bowen :
>>>
 Hello community,

 We've come up with a completely new design for Flink state TTL, documented
 here, and have run it by a few Flink PMC/committers.

 What do you think? We'd love to hear feedbacks from you

 Thanks,
 Bowen


 On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske 
 wrote:

> Hi Bowen,
>
> Thanks for the proposal! I think state TTL would be a great feature!
> Actually, we have implemented this for SQL / Table API [1].
> I've added a couple of comments to the design doc.
>
> In principle, I'm not sure if this functionality should be added to the
> state backends.
> We could also use the existing timer service which would have a few
> nice
> benefits (see my comments in the docs).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming