Re: Few question about upgrade from 1.4 to 1.5 flink ( some very basic )

2018-06-28 Thread Christophe Jolif
Chesnay,

Do you have a rough idea of the 1.5.1 timeline?

Thanks,
--
Christophe

On Mon, Jun 25, 2018 at 4:22 PM, Chesnay Schepler 
wrote:

> The watermark issue is known and will be fixed in 1.5.1
>
>
> On 25.06.2018 15:03, Vishal Santoshi wrote:
>
> Thank you
>
> One addition
>
> I do not see WM info on the UI  ( Attached )
>
> Is this a known issue? The same pipe on our production has the WM (in fact
> we never had an issue with watermarks not appearing). Am I missing
> something?
>
> On Mon, Jun 25, 2018 at 4:15 AM, Fabian Hueske  wrote:
>
>> Hi Vishal,
>>
>> 1. I don't think a rolling update is possible. Flink 1.5.0 changed the
>> process orchestration and how they communicate. IMO, the way to go is to
>> start a Flink 1.5.0 cluster, take a savepoint on the running job, start
>> from the savepoint on the new cluster and shut the old job down.
>> 2. Savepoints should be compatible.
>> 3. You can keep the slot configuration as before.
>> 4. As I said before, mixing 1.5 and 1.4 processes does not work (or at
>> least, it was not considered a design goal and nobody paid attention to
>> making it possible).
>>
>> Best, Fabian
>>
>>
>> 2018-06-23 13:38 GMT+02:00 Vishal Santoshi :
>>
>>>
>>> 1.
>>> Has anyone done (or can anyone do) a rolling upgrade from 1.4 to 1.5? I am
>>> not sure we can. It seems that the JM cannot recover jobs, with this exception:
>>>
>>> Caused by: java.io.InvalidClassException: org.apache.flink.runtime.jobgr
>>> aph.tasks.CheckpointCoordinatorConfiguration; local class incompatible:
>>> stream classdesc serialVersionUID = -647384516034982626, local class
>>> serialVersionUID = 2
>>>
>>>
>>>
>>>
>>> 2.
>>> Does a savepoint taken on 1.4 resume on 1.5 (pretty basic, but no harm asking)?
>>>
>>>
>>>
>>> 3.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment
>>> The taskmanager.numberOfTaskSlots setting: what would be the
>>> desired value in a standalone (non-Mesos/YARN) cluster?
>>>
>>>
>>> 4. I suspend all jobs and establish 1.5 on the JM (the TMs are still
>>> running with 1.4). The JM refuses to start with:
>>>
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: 2018-06-23
>>> 11:34:23 ERROR JobManager:116 - Failed to recover job
>>> 454cd84a519f3b50e88bcb378d8a1330.
>>>
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]:
>>> java.lang.InstantiationError: org.apache.flink.runtime.blob.BlobKey
>>>
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
>>> sun.reflect.GeneratedSerializationConstructorAccessor51.newInstance(Unknown
>>> Source)
>>>
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>
>>> Jun 23 07:34:23 flink-ad21ac07.bf2.tumblr.net docker[3395]: at
>>> java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:1079)
>>>
>>> ...
>>>
>>>
>>>
>>> Any feedback would be highly appreciated...
>>>
>>>
>>
>
>


-- 
Christophe


"keyed" aggregation

2018-01-05 Thread Christophe Jolif
Hi all,

I'm sourcing from a Kafka topic, using the key of the Kafka message to key
the stream, then doing some aggregation on the keyed stream.

Now I want to sink back to a different Kafka topic but re-use the same
key. The thing is that my aggregation "lost" the key. Obviously I can make
sure my aggregation function keeps the key, but I find that a bit strange,
as the key does not really relate to the aggregation logic.

Is there a best practice in this area? How should the key be carried when
moving from a Kafka source to a Kafka sink with some aggregation along the way?
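
For illustration, one way to keep the key without baking it into the aggregation
logic is to combine an AggregateFunction with a ProcessWindowFunction: the
aggregation stays key-agnostic, and the window function re-attaches the key to
the result. A minimal sketch, assuming a Tuple2<String, Long> stream (key, value)
and a hypothetical one-minute window (neither is in the original question):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KeyedAggregationSketch {

  // The aggregation itself does not know about the key: it only sums values.
  static class SumValues implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(Tuple2<String, Long> value, Long acc) { return acc + value.f1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
  }

  // The window function re-attaches the key to the aggregated result.
  static class AttachKey
      extends ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {
    @Override
    public void process(String key, Context ctx, Iterable<Long> aggregates,
                        Collector<Tuple2<String, Long>> out) {
      out.collect(Tuple2.of(key, aggregates.iterator().next()));
    }
  }

  static DataStream<Tuple2<String, Long>> aggregateKeepingKey(DataStream<Tuple2<String, Long>> in) {
    return in
        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
          @Override public String getKey(Tuple2<String, Long> t) { return t.f0; }
        })
        .timeWindow(Time.minutes(1))
        .aggregate(new SumValues(), new AttachKey());
  }
}

The resulting Tuple2 can then be handed to the Kafka producer's serialization
schema, which can serialize f0 as the outgoing message key.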

Thanks,
-- 
Christophe


event ordering

2018-01-09 Thread Christophe Jolif
Hi everyone,

Let's imagine I have a stream of events coming a bit like this:

{ id: "1", value: 1, timestamp: 1 }
{ id: "2", value: 2, timestamp: 1 }
{ id: "1", value: 4, timestamp: 3 }
{ id: "1", value: 5, timestamp: 2 }
{ id: "2", value: 5, timestamp: 3 }
...

As you can see from the non-monotonically increasing timestamps, events
can, for various reasons, arrive slightly out of order.

Now I want to use Flink to process this stream, to compute, by id (my key),
the latest value and update it in a DB. But obviously that latest value
must reflect the original event timestamp and not the processing time.

I've seen that Flink can deal with event-time processing, in the sense that
if I need to do a windowed operation I can ensure an event will be assigned
to the "correct" window.

But here the use-case seems slightly different. How would you proceed to do
that in Flink?
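
Not an authoritative answer, but one simple way to express "latest value per id,
by original timestamp" is a keyed reduce that always keeps the event with the
higher event timestamp, so a late-but-older event never overwrites a newer one.
A minimal sketch, assuming a hypothetical Event POJO matching the JSON above
(the DB write would be a separate sink):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;

public class LatestValueSketch {

  // Hypothetical POJO matching the JSON events above.
  public static class Event {
    public String id;
    public long value;
    public long timestamp;
    public Event() {}
  }

  // For each id, keep the event carrying the highest original timestamp,
  // regardless of the order in which events arrive.
  public static DataStream<Event> latestPerId(DataStream<Event> events) {
    return events
        .keyBy(new KeySelector<Event, String>() {
          @Override
          public String getKey(Event e) {
            return e.id;
          }
        })
        .reduce((a, b) -> b.timestamp >= a.timestamp ? b : a);
  }
}

Note that this keeps state per id indefinitely, which matches the "latest value
per key" use-case but should be kept in mind if the key space is unbounded.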

Thanks!
-- 
Christophe


State backend questions

2018-01-16 Thread Christophe Jolif
Hi all,

At first my state should not be "that" big and should fit in memory, so
FsStateBackend could be a solution for me. However, moving forward I
envision more features, more users, and a growing state. With that in mind,
RocksDBStateBackend might be the better fit.

Is there an easy "upgrade" path from one to the other? In other words, is
there an easy way to "move" the state from one backend to another and
restart the job from there if the need arises, or should I plan long ahead
and use RocksDB right away if I don't want to get into trouble?

If yes, how much configuration does Flink expose to tune the memory RocksDB
uses to cache data (and avoid going back to disk), so that I don't penalize
the current use-cases too much?
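
For reference, the backend choice itself is just a configuration/code change;
a minimal sketch of how either backend is typically wired into a job (the
checkpoint path is a placeholder, and this says nothing about whether state
written by one backend can be restored by the other):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Option 1: keep state on the heap, checkpoint to a (hypothetical) distributed path.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

    // Option 2: keep state in embedded RocksDB, same checkpoint location
    // (requires the flink-statebackend-rocksdb dependency):
    // env.setStateBackend(
    //     new org.apache.flink.contrib.streaming.state.RocksDBStateBackend("hdfs:///flink/checkpoints"));

    env.enableCheckpointing(60_000);
    // ... build and execute the job
  }
}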

Thanks,
-- 
Christophe


Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-25 Thread Christophe Jolif
Hi Fabian,

FYI I rebased the branch and tested it and it worked OK on a sample.

--
Christophe

On Mon, Jan 22, 2018 at 2:53 PM, Fabian Hueske  wrote:

> Hi Adrian,
>
> thanks for raising this issue again.
> I agree, we should add support for newer ES versions.
> I've added 1.5.0 as target release for FLINK-7386 and bumped the priority
> up.
>
> In the meantime, you can try Flavio's approach (he responded to the mail
> thread you linked) and fork and fix the connector.
> You could also try the PR for FLINK-7386 [1] and comment on the pull
> request whether it works for you or not.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/pull/4675
>
>
> 2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :
>
>> Hello,
>>
>> With a local run of Flink 1.4.0, ElasticsearchSink fails for me with a
>> local run of Elasticsearch 5.6.4 and 5.2.1, while the same code (with
>> adjusted versions of dependencies) works fine with Elasticsearch 2.x (tried
>> 2.4.6).
>> I get:
>> java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.
>> BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)
>> Lorg/elasticsearch/action/bulk/BulkProcessor
>>
>> (env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)
>>
>> Now, this looks similar to the issue referred in
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Elasticsearch-Sink-Error-td15246.html
>> which points to
>> "Flink Elasticsearch 5 connector is not compatible with Elasticsearch
>> 5.2+ client"
>> https://issues.apache.org/jira/browse/FLINK-7386
>>
>> Side-remark: when trying with Elasticsearch 5.6.4 via a docker container,
>> for some reason the error I get is different: "RuntimeException: Client is
>> not connected to any Elasticsearch nodes!" (while Elasticsearch 2.4.6 works
>> fine via docker too).
>>
>> FLINK-7386  being
>> pending since August 2017, would it mean that there is nowadays still no
>> way to make Flink 1.4.0's sink work with Elasticsearch 5.2+? My use-case
>> involves Compose for Elasticsearch 5.6.3, shared by different apps, and I
>> can't really downgrade its Elasticsearch version.
>> Or would there be signs it will be fixed in Flink 1.5.0?
>>
>> Any lights welcome.
>>
>> Thanks,
>> Adrian
>>
>>
>>
>
>


Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-26 Thread Christophe Jolif
Ok, I got it "done". I have a PR for ES 5.3 (FLINK-7386) that just rebases the
original one that was never merged (#4675), and adds ES 6.x support through
RestHighLevelClient on top (FLINK-8101). This is:
https://github.com/apache/flink/pull/5374. And believe it or not, someone
else submitted a PR for those two as well today! See:
https://github.com/apache/flink/pull/5372. So it looks like there is some
traction to get it done? It would really be good if a committer could look
at those PRs and let us know which one is closer to being merged, so we can
focus on it instead of duplicating work ;)

Thanks,
--
Christophe

On Fri, Jan 26, 2018 at 1:46 PM, Christophe Jolif  wrote:

> Fabian,
>
> Unfortunately I need more than that :) But this PR is definitely a first
> step.
>
> My real need is Elasticsearch 6.x support through the RestHighLevel client.
> FYI Elastic has deprecated the TransportClient that the Flink connector
> leverages, and it will be removed in Elasticsearch 8 (presumably ~1.5 years
> from now at their current release pace). Also, the TransportClient does not
> work with hosted versions of Elasticsearch like Compose.io. So I think it
> makes a lot of sense to start introducing a sink based on the RestHighLevel
> client. I'll be looking at creating a PR for that.
>
> Thanks,
>
> --
> Christophe
>
> On Fri, Jan 26, 2018 at 10:11 AM, Fabian Hueske  wrote:
>
>> Great, thank you!
>> Hopefully, this pushes the PR forward.
>>
>> Thanks, Fabian
>>
>> 2018-01-25 22:30 GMT+01:00 Christophe Jolif :
>>
>>> Hi Fabian,
>>>
>>> FYI I rebased the branch and tested it and it worked OK on a sample.
>>>
>>> --
>>> Christophe
>>>
>>> On Mon, Jan 22, 2018 at 2:53 PM, Fabian Hueske 
>>> wrote:
>>>
>>>> Hi Adrian,
>>>>
>>>> thanks for raising this issue again.
>>>> I agree, we should add support for newer ES versions.
>>>> I've added 1.5.0 as target release for FLINK-7386 and bumped the
>>>> priority up.
>>>>
>>>> In the meantime, you can try Flavio's approach (he responded to the
>>>> mail thread you linked) and fork and fix the connector.
>>>> You could also try the PR for FLINK-7386 [1] and comment on the pull
>>>> request whether it works for you or not.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://github.com/apache/flink/pull/4675
>>>>
>>>>
>>>> 2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :
>>>>
>>>>> Hello,
>>>>>
>>>>> With a local run of Flink 1.4.0, ElasticsearchSink fails for me with
>>>>> a local run of Elasticsearch 5.6.4 and 5.2.1, while the same code
>>>>> (with adjusted versions of dependencies) works fine with Elasticsearch 2.x
>>>>> (tried 2.4.6).
>>>>> I get:
>>>>> java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.
>>>>> BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)L
>>>>> org/elasticsearch/action/bulk/BulkProcessor
>>>>>
>>>>> (env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)
>>>>>
>>>>> Now, this looks similar to the issue referred in
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>>> ble.com/Elasticsearch-Sink-Error-td15246.html
>>>>> which points to
>>>>> "Flink Elasticsearch 5 connector is not compatible with Elasticsearch
>>>>> 5.2+ client"
>>>>> https://issues.apache.org/jira/browse/FLINK-7386
>>>>>
>>>>> Side-remark: when trying with Elasticsearch 5.6.4 via a docker
>>>>> container, for some reason the error I get is different: 
>>>>> "RuntimeException:
>>>>> Client is not connected to any Elasticsearch nodes!" (while Elasticsearch
>>>>> 2.4.6 works fine via docker too).
>>>>>
>>>>> FLINK-7386 <https://issues.apache.org/jira/browse/FLINK-7386> being
>>>>> pending since August 2017, would it mean that there is nowadays still no
>>>>> way to make Flink 1.4.0's sink work with Elasticsearch 5.2+? My use-case
>>>>> involves Compose for Elasticsearch 5.6.3, shared by different apps, and I
>>>>> can't really downgrade its Elasticsearch version.
>>>>> Or would there be signs it will be fixed in Flink 1.5.0?
>>>>>
>>>>> Any lights welcome.
>>>>>
>>>>> Thanks,
>>>>> Adrian
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
> --
> Christophe
>



-- 
Christophe


Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-29 Thread Christophe Jolif
Thanks a lot. Is there any timeline for 1.5 by the way?

--
Christophe

On Mon, Jan 29, 2018 at 11:36 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> Thanks a lot for the contribution! I’ll add reviewing the PR to my backlog.
> I would like / will try to take a look at the PR by the end of this week,
> after some 1.4.1 blockers which I’m still busy with.
>
> Cheers,
> Gordon
>
>
> On 29 January 2018 at 9:25:27 AM, Fabian Hueske (fhue...@gmail.com) wrote:
>
> Hi Christophe,
>
> great! Thanks for your contribution.
> I'm quite busy right now, but I agree that we should have support for ES
> 5.3 and Es 6.x for the next minor release 1.5.
>
> Best,
> Fabian
>
>
> 2018-01-26 23:09 GMT+01:00 Christophe Jolif :
>
>> Ok, I got it "done". I have a PR for ES5.3 (FLINK-7386) just rebasing
>> the original one that was never merged (#4675). And added ES 6.X through
>> RestHighLevelClient on top (FLINK-8101).  This is:
>> https://github.com/apache/flink/pull/5374. And believe it or not but
>> someone else submitted a PR for those two as well today! See:
>> https://github.com/apache/flink/pull/5372. So looks like there is some
>> traction to get it done? If would really be good if a committer could look
>> at those PRs and let us know which one is closer to get merge so we focus
>> on it instead of duplicating work ;)
>>
>> Thanks,
>> --
>> Christophe
>>
>> On Fri, Jan 26, 2018 at 1:46 PM, Christophe Jolif 
>> wrote:
>>
>>> Fabien,
>>>
>>> Unfortunately I need more than that :) But this PR is definitely a first
>>> step.
>>>
>>> My real need is Elasticsearch 6.x support through RestHighLevel client.
>>> FYI Elastic has deprecated the TransportClient that Flink connector
>>> leverages and it will be removed in Elasticsearch 8 (presumably ~1.5 years
>>> from now at their current release pace). Also TransportClient is not
>>> working with hosted version of Elasticsearch like Compose.io. So I think it
>>> makes a lot of sense to start introduce a sink based on RestHighLevel
>>> client. I'll be looking at creating a PR for that.
>>>
>>> Thanks,
>>>
>>> --
>>> Christophe
>>>
>>> On Fri, Jan 26, 2018 at 10:11 AM, Fabian Hueske 
>>> wrote:
>>>
>>>> Great, thank you!
>>>> Hopefully, this pushes the PR forward.
>>>>
>>>> Thanks, Fabian
>>>>
>>>> 2018-01-25 22:30 GMT+01:00 Christophe Jolif :
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> FYI I rebased the branch and tested it and it worked OK on a sample.
>>>>>
>>>>> --
>>>>> Christophe
>>>>>
>>>>> On Mon, Jan 22, 2018 at 2:53 PM, Fabian Hueske 
>>>>> wrote:
>>>>>
>>>>>> Hi Adrian,
>>>>>>
>>>>>> thanks for raising this issue again.
>>>>>> I agree, we should add support for newer ES versions.
>>>>>> I've added 1.5.0 as target release for FLINK-7386 and bumped the
>>>>>> priority up.
>>>>>>
>>>>>> In the meantime, you can try Flavio's approach (he responded to the
>>>>>> mail thread you linked) and fork and fix the connector.
>>>>>> You could also try the PR for FLINK-7386 [1] and comment on the pull
>>>>>> request whether it works for you or not.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/4675
>>>>>>
>>>>>>
>>>>>> 2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> With a local run of Flink 1.4.0, ElasticsearchSink fails for me with
>>>>>>> a local run of Elasticsearch 5.6.4 and 5.2.1, while the same code
>>>>>>> (with adjusted versions of dependencies) works fine with Elasticsearch 
>>>>>>> 2.x
>>>>>>> (tried 2.4.6).
>>>>>>> I get:
>>>>>>> java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.
>>>>>>> BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)L
>>>>>>> org/elasticsearch/action/bulk/BulkProcessor
>>>>>>>
>>>>>>> (env: Mac OSX 10.13.2, oracle jdk 1.8.0_112)
>>>>>>> ...

Re: ElasticsearchSink in Flink 1.4.0 with Elasticsearch 5.2+

2018-01-30 Thread Christophe Jolif
Thanks Chesnay. So if I read it correctly, it shouldn't be too long (at least
less time than between regular 1.x releases).

On Mon, Jan 29, 2018 at 4:24 PM, Chesnay Schepler 
wrote:

> As of right now there is no specific date, see also
> https://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
> .
>
>
> On 29.01.2018 13:41, Christophe Jolif wrote:
>
> Thanks a lot. Is there any timeline for 1.5 by the way?
>
> --
> Christophe
>
> On Mon, Jan 29, 2018 at 11:36 AM, Tzu-Li (Gordon) Tai  > wrote:
>
>> Hi Christophe,
>>
>> Thanks a lot for the contribution! I’ll add reviewing the PR to my
>> backlog.
>> I would like / will try to take a look at the PR by the end of this week,
>> after some 1.4.1 blockers which I’m still busy with.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 29 January 2018 at 9:25:27 AM, Fabian Hueske (fhue...@gmail.com)
>> wrote:
>>
>> Hi Christophe,
>>
>> great! Thanks for your contribution.
>> I'm quite busy right now, but I agree that we should have support for ES
>> 5.3 and Es 6.x for the next minor release 1.5.
>>
>> Best,
>> Fabian
>>
>>
>> 2018-01-26 23:09 GMT+01:00 Christophe Jolif :
>>
>>> Ok, I got it "done". I have a PR for ES5.3 (FLINK-7386) just rebasing
>>> the original one that was never merged (#4675). And added ES 6.X through
>>> RestHighLevelClient on top (FLINK-8101).  This is:
>>> https://github.com/apache/flink/pull/5374. And believe it or not but
>>> someone else submitted a PR for those two as well today! See:
>>> https://github.com/apache/flink/pull/5372. So looks like there is some
>>> traction to get it done? If would really be good if a committer could look
>>> at those PRs and let us know which one is closer to get merge so we focus
>>> on it instead of duplicating work ;)
>>>
>>> Thanks,
>>> --
>>> Christophe
>>>
>>> On Fri, Jan 26, 2018 at 1:46 PM, Christophe Jolif 
>>> wrote:
>>>
>>>> Fabien,
>>>>
>>>> Unfortunately I need more than that :) But this PR is definitely a
>>>> first step.
>>>>
>>>> My real need is Elasticsearch 6.x support through RestHighLevel client.
>>>> FYI Elastic has deprecated the TransportClient that Flink connector
>>>> leverages and it will be removed in Elasticsearch 8 (presumably ~1.5 years
>>>> from now at their current release pace). Also TransportClient is not
>>>> working with hosted version of Elasticsearch like Compose.io. So I think it
>>>> makes a lot of sense to start introduce a sink based on RestHighLevel
>>>> client. I'll be looking at creating a PR for that.
>>>>
>>>> Thanks,
>>>>
>>>> --
>>>> Christophe
>>>>
>>>> On Fri, Jan 26, 2018 at 10:11 AM, Fabian Hueske 
>>>> wrote:
>>>>
>>>>> Great, thank you!
>>>>> Hopefully, this pushes the PR forward.
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>> 2018-01-25 22:30 GMT+01:00 Christophe Jolif :
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> FYI I rebased the branch and tested it and it worked OK on a sample.
>>>>>>
>>>>>> --
>>>>>> Christophe
>>>>>>
>>>>>> On Mon, Jan 22, 2018 at 2:53 PM, Fabian Hueske 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Adrian,
>>>>>>>
>>>>>>> thanks for raising this issue again.
>>>>>>> I agree, we should add support for newer ES versions.
>>>>>>> I've added 1.5.0 as target release for FLINK-7386 and bumped the
>>>>>>> priority up.
>>>>>>>
>>>>>>> In the meantime, you can try Flavio's approach (he responded to the
>>>>>>> mail thread you linked) and fork and fix the connector.
>>>>>>> You could also try the PR for FLINK-7386 [1] and comment on the pull
>>>>>>> request whether it works for you or not.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/4675
>>>>>>>
>>>>>>>
>>>>>>> 2018-01-22 13:54 GMT+01:00 Adrian Vasiliu :
>>>>>>>
>>

Re: Flink on K8s job submission best practices

2018-02-01 Thread Christophe Jolif
Hi Maximilian,

Coming back on this as we have similar challenges.

I was leaning towards option 3. But then I read your message and figured I
might have missed something ;)

We agree option 3 is not idiomatic and creates a "detached job", but in the
absence of a proper solution I can live with that.

We also agree there is no risk of the (Flink) job being resubmitted when it
should not be, because of the nature of the (Kubernetes) Job.

What I missed, and would like to better understand, is the risk of the
(Flink) job not being restarted. If I'm not mistaken (and if the job is
properly configured), in case of a (Flink) job failure Flink will restart the
(Flink) job itself, so you don't have anything else to do. What type of
failures do you have in mind when saying that with option 3 the (Flink) job
will not be restarted?

Thanks!!
--
Christophe


On Fri, Dec 22, 2017 at 2:56 PM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi everyone,
>
> We are beginning to run Flink on K8s and found the basic templates [1] as
> well as the example Helm chart [2] very helpful. Also the discussion about
> JobManager HA [3] and Patrick's talk [4] was very interesting. All in all
> it is delightful how easy everything can be set up and works out of the box.
>
> Now we are looking for some best practices as far as job submission is
> concerned. Having played with a few alternative options, we would like to
> get some input on what other people are using. What we have looked into so
> far:
>
>1. Packaging the job jar into e.g. the JM image and submitting
>manually (either from the UI or via `kubectl exec`). Ideally, we would like
>to establish a more automated setup, preferably using native Kubernetes
>objects.
>2. Building a separate image whose responsibility it is to submit the
>job and keep it running. This could either use the API [5] or share the
>Flink config so that CLI calls connect to the existing cluster. When
>scheduling this as a Kubernetes deployment [6] and e.g. the node running
>this client pod fails, one ends up with duplicate jobs. One could build
>custom logic (poll if job exists, only submit if it does not), but this
>seems fragile and it is conceivable that this could lead to weird timing
>issues like different containers trying to submit at the same time. One
>solution would be to implement an atomic submit-if-not-exists, but I
>suppose this would need to involve some level of locking on the JM.
>3. Schedule the client container from the step above as a Kubernetes
>job [7]. This seems somewhat unidiomatic for streaming jobs that are not
>expected to terminate, but one would not have to deal with duplicate Flink
>jobs. In the failure scenario described above, the (Flink) job would still
>be running on the Flink cluster, there just would not be a client attached
>to it (as the Kubernetes job would not be restarted). On the other hand,
>should the (Flink) job fail for some reason, there is no fashion of
>restarting it automatically.
>
> Are we missing something obvious? Has the Flink community come up with a
> default way of submitting Flink jobs on Kubernetes yet or are there people
> willing to share their experiences?
>
> Best regards and happy holidays,
> Max
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/deployment/kubernetes.html
> [2] https://github.com/docker-flink/examples/tree/master/helm/flink
> [3] http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Flink-HA-with-Kubernetes-without-Zookeeper-td15033.html
> [4] https://www.youtube.com/watch?v=w721NI-mtAA Slides:
> https://www.slideshare.net/FlinkForward/flink-forward-
> berlin-2017-patrick-lucas-flink-in-containerland
> [5] https://ci.apache.org/projects/flink/flink-docs-
> master/monitoring/rest_api.html#submitting-programs
> [6] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
> [7] https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-
> completion/
> --
> Maximilian Bode * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


RocksDB / checkpoint questions

2018-02-02 Thread Christophe Jolif
If I understand correctly, RocksDB uses two disks: the TaskManager local disk
for "local storage" of the state, and the distributed disk for checkpointing.

Two questions:

- if I have 3 TaskManagers, should I expect, more or less (depending on how
the tasks are balanced), to find a third of my overall state stored on disk
on each of these TaskManager nodes?

- if the local node/disk fails, I will get the state back from the
distributed disk, things will start again, and all is fine. However, what
happens if the distributed disk fails? Will Flink continue processing,
waiting for me to mount a new distributed disk? Or will it stop? Might I lose
data or reprocess things under that condition?

-- 
Christophe Jolif


Re: RocksDB / checkpoint questions

2018-02-03 Thread Christophe Jolif
Thanks for sharing Kien. Sounds like the logical behavior but good to hear
it is confirmed by your experience.

--
Christophe

On Sat, Feb 3, 2018 at 7:25 AM, Kien Truong  wrote:

>
>
> Sent from TypeApp <http://www.typeapp.com/r?b=11979>
> On Feb 3, 2018, at 10:48, Kien Truong  wrote:
>>
>> Hi,
>> Speaking from my experience, if the distributed disk fail, the checkpoint
>> will fail as well, but the job will continue running. The checkpoint
>> scheduler will keep running, so the first scheduled checkpoint after you
>> repair your disk should succeed.
>>
>> Of course, if you also write to the distributed disk inside your job,
>> then your job may crash too, but this is unrelated to the checkpoint
>> process.
>>
>> Best regards,
>> Kien
>>
>> Sent from TypeApp <http://www.typeapp.com/r?b=11979>
>> On Feb 2, 2018, at 23:30, Christophe Jolif < cjo...@gmail.com> wrote:
>>>
>>> If I understand well RocksDB is using two disk, the Task Manager local
>>> disk for "local storage" of the state and the distributed disk for
>>> checkpointing.
>>>
>>> Two questions:
>>>
>>> - if I have 3 TaskManager I should expect more or less (depending on how
>>> the tasks are balanced) to find a third of my overall state stored on disk
>>> on each of this TaskManager node?
>>>
>>> - if the local node/disk fails I will get the state back from the
>>> distributed disk and things will start again and all is fine. However what
>>> happens if the distributed disk fails? Will Flink continue processing
>>> waiting for me to mount a new distributed disk? Or will it stop? May I lose
>>> data/reprocess things under that condition?
>>>
>>> --
>>> Christophe Jolif
>>>
>>


Kafka and parallelism

2018-02-03 Thread Christophe Jolif
Hi,

If I'm sourcing from a KafkaConsumer, do I have to explicitly set the Flink
job parallelism to the number of partitions, or will it adjust automatically?
In other words, if I don't call setParallelism, will I get 1 or the number of
partitions?

The reason I'm asking is that I'm listening to a topic pattern, not a single
topic, and the number of actual topics (and so partitions) behind the pattern
can change, so it is not possible to know ahead of time how many partitions
I will get.

Thanks!
-- 
Christophe


Re: Kafka and parallelism

2018-02-05 Thread Christophe Jolif
Thanks. It helps indeed.

I guess the last point it does not explicitly answer is: "does just creating
a Kafka consumer reading from multiple partitions set the parallelism to the
number of partitions?" But reading between the lines I think the answer is
clearly no. You have to set the parallelism yourself, and the partitions will
then be assigned round-robin across the consumer subtasks.
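
To make that concrete, a minimal sketch using the 0.11 consumer with a topic
pattern; the broker address, group id, pattern, and parallelism are all
hypothetical, and the partition-discovery property comes from the documentation
linked in [1] below:

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class TopicPatternConsumerSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");   // hypothetical broker
    props.setProperty("group.id", "my-consumer-group");     // hypothetical group
    // Periodically discover new topics matching the pattern (see [1] below).
    props.setProperty("flink.partition-discovery.interval-millis", "30000");

    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer011<>(
            Pattern.compile("origin-topic-.*"),              // hypothetical pattern
            new SimpleStringSchema(),
            props))
        // Parallelism is not derived from the partition count: set it explicitly,
        // and partitions are assigned round-robin across the source subtasks.
        .setParallelism(4);

    stream.print();
    env.execute("topic-pattern-consumer");
  }
}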

Thanks again,
--
Christophe

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> You can set the parallelism of the FlinkKafkaConsumer independently of the
> total number of Kafka partitions (across all subscribed streams, including
> newly created streams that match a subscribed pattern).
>
> The consumer deterministically assigns each partition to a single consumer
> subtask, in a round-robin fashion.
> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there is 6
> partitions, each consumer subtask will be assigned 3 partitions.
>
> As for topic pattern subscription, FlinkKafkaConsumers starting from
> version 1.4.0 support this feature. You can take a look at [1] on how to do
> that.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html#kafka-consumers-
> topic-and-partition-discovery
>
> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com)
> wrote:
>
> Hi,
>
> If I'm sourcing from a KafkaConsumer do I have to explicitly set the Flink
> job parallelism to the number of partions or will it adjust automatically
> accordingly? In other word if I don't call setParallelism will get 1 or the
> number of partitions?
>
> The reason I'm asking is that I'm listening to a topic pattern not a
> single topic and the number of actual topic (and so partitions) behind the
> pattern can change so it is not possible to know ahead ot time how many
> partitions I will get.
>
> Thanks!
> --
> Christophe
>
>


Re: RocksDB / checkpoint questions

2018-02-05 Thread Christophe Jolif
Thanks a lot for the details, Stefan.

--
Christophe

On Mon, Feb 5, 2018 at 11:31 AM, Stefan Richter  wrote:

> Hi,
>
> you are correct that RocksDB has a „working directory“ on local disk and
> checkpoints + savepoints go to a distributed filesystem.
>
> - if I have 3 TaskManager I should expect more or less (depending on how
> the tasks are balanced) to find a third of my overall state stored on disk
> on each of this TaskManager node?
>
> This question is not so much about RocksDB, but more about Flink’s keyBy
> partitioning, i.e. how work is distributed between the parallel instances
> of an operator, and the answer is that it will apply hash partitioning
> based on your event keys to distribute the keys (and their state) between
> your 3 nodes. If your key space is very skewed or there are heavy hitter
> keys with much larger state than most other keys, this can lead to some
> imbalances. If your keys are not skewed and have similar state size, every
> node should have roughly the same state size.
>
> - if the local node/disk fails I will get the state back from the
> distributed disk and things will start again and all is fine. However what
> happens if the distributed disk fails? Will Flink continue processing
> waiting for me to mount a new distributed disk? Or will it stop? May I lose
> data/reprocess things under that condition?
>
> Starting from Flink 1.5, this is configurable, please see
> https://issues.apache.org/jira/browse/FLINK-4809 and
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html
> in section „*fail/continue task on checkpoint errors*“.
> If you tolerate checkpoint failures, you will not lose data: if your job
> fails, it can recover from the latest successful checkpoint once your DFS
> is again available If the job does not fail, it will eventually make
> another checkpoint once DFS is back. If you do not tolerate checkpoint
> failures, your job will simply fail and restart from the last successful
> checkpoint and recover once DFS is back.
>
> Best,
> Stefan
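
A minimal sketch of the Flink 1.5 option Stefan mentions above (FLINK-4809);
the setter name is taken from the 1.5 CheckpointConfig and tells the job to
keep running when a checkpoint fails, e.g. because the distributed filesystem
is temporarily unavailable:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerateCheckpointFailures {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);
    // Do not fail the job on a declined/failed checkpoint; the next scheduled
    // checkpoint is attempted once the checkpoint storage is reachable again.
    env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
    // ... build the job and call env.execute(...)
  }
}
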
>
> Am 03.02.2018 um 17:45 schrieb Christophe Jolif :
>
> Thanks for sharing Kien. Sounds like the logical behavior but good to hear
> it is confirmed by your experience.
>
> --
> Christophe
>
> On Sat, Feb 3, 2018 at 7:25 AM, Kien Truong 
> wrote:
>
>>
>>
>> Sent from TypeApp <http://www.typeapp.com/r?b=11979>
>> On Feb 3, 2018, at 10:48, Kien Truong  wrote:
>>>
>>> Hi,
>>> Speaking from my experience, if the distributed disk fail, the
>>> checkpoint will fail as well, but the job will continue running. The
>>> checkpoint scheduler will keep running, so the first scheduled checkpoint
>>> after you repair your disk should succeed.
>>>
>>> Of course, if you also write to the distributed disk inside your job,
>>> then your job may crash too, but this is unrelated to the checkpoint
>>> process.
>>>
>>> Best regards,
>>> Kien
>>>
>>> Sent from TypeApp <http://www.typeapp.com/r?b=11979>
>>> On Feb 2, 2018, at 23:30, Christophe Jolif < cjo...@gmail.com> wrote:
>>>>
>>>> If I understand well RocksDB is using two disk, the Task Manager local
>>>> disk for "local storage" of the state and the distributed disk for
>>>> checkpointing.
>>>>
>>>> Two questions:
>>>>
>>>> - if I have 3 TaskManager I should expect more or less (depending on
>>>> how the tasks are balanced) to find a third of my overall state stored on
>>>> disk on each of this TaskManager node?
>>>>
>>>> - if the local node/disk fails I will get the state back from the
>>>> distributed disk and things will start again and all is fine. However what
>>>> happens if the distributed disk fails? Will Flink continue processing
>>>> waiting for me to mount a new distributed disk? Or will it stop? May I lose
>>>> data/reprocess things under that condition?
>>>>
>>>> --
>>>> Christophe Jolif
>>>>
>>>
>


ML and Stream

2018-02-05 Thread Christophe Jolif
Hi all,

Sorry, this is me again with another question.

Maybe I did not search deep enough, but it seems the FlinkML API is still
pure batch.

If I read
https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap
it seems there was the intent to "exploit the streaming nature of Flink,
and provide functionality designed specifically for data streams" but from
my external point of view, I don't see much happening here. Is there work
in progress towards that?

I would personally see two use-cases around streaming: the first one around
updating an existing model that was built in batch, the second one around
triggering predictions not through a batch job but in a streaming job.

Are these things in the works? Or are they maybe already feasible despite
the API looking purely batch-oriented?

Thanks,
-- 
Christophe


Re: ML and Stream

2018-02-05 Thread Christophe Jolif
Fabian,

OK, thanks for the update. Meanwhile I was looking at how I could still
leverage the current FlinkML API, but as far as I can see, it lacks the
ability to persist its own models? So even for pure batch, it prevents
running your (once built) model in several jobs? Or am I missing
something?

I suspect I'm not the only one who would love to apply machine learning as
part of a Flink pipeline? While waiting for FLIP-23, what are the "best"
practices today?

Thanks again for your help,
--
Christophe

On Mon, Feb 5, 2018 at 6:01 PM, Fabian Hueske  wrote:

> Hi Christophe,
>
> it is true that FlinkML only targets batch workloads. Also, there has not
> been any development since a long time.
>
> In March last year, a discussion was started on the dev mailing list about
> different machine learning features for stream processing [1].
> One result of this discussion was FLIP-23 [2] which will add a library for
> model serving to Flink, i.e., it can load (and update) machine learning
> models and evaluate them on a stream.
> If you dig through the mailing list thread, you'll find a link to a Google
> doc that discusses other possible directions.
>
> Best, Fabian
>
> [1] https://lists.apache.org/thread.html/eeb80481f3723c160bc923d689416a
> 352d6df4aad98fe7424bf33132@%3Cdev.flink.apache.org%3E
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 23+-+Model+Serving
>
> 2018-02-05 16:43 GMT+01:00 Christophe Jolif :
>
>> Hi all,
>>
>> Sorry, this is me again with another question.
>>
>> Maybe I did not search deep enough, but it seems the FlinkML API is still
>> pure batch.
>>
>> If I read https://cwiki.apache.org/confluence/display/FLINK/Flink
>> ML%3A+Vision+and+Roadmap it seems there was the intend to "exploit the
>> streaming nature of Flink, and provide functionality designed
>> specifically for data streams" but from my external point of view, I don't
>> see much happening here. Is there work in progress towards that?
>>
>> I would personally see two use-cases around streaming, first one around
>> updating an existing model that was build in batch, second one would be
>> triggering prediction not through a batch job but in a stream job.
>>
>> Are these things that are in the works? or maybe already feasible despite
>> the API looking like purely batch branded?
>>
>>
>>


Re: Kafka and parallelism

2018-02-07 Thread Christophe Jolif
Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern.

I then want to sink the result of the processing into a set of topics
depending on where the original message came from (i.e. if it comes
from origin-topic-1 I will serialize the result into destination-topic-1, if
from topic-2 into topic-2, etc.). However, the KafkaProducer works on a
fixed topic. You can provide a partitioning function
(FlinkKafkaPartitioner) but not a "topic" function that would let you
decide to which topic to send the message, a bit like a BucketingSink
decides the bucket or an ElasticsearchSinkFunction lets you choose the
index.

Am I missing something? The reason I'm asking is that some of the sink
constructors mention a "defaultTopicId" and some a "topicId", as if in some
cases there were an ability to override the topic. Is there a feature that
allows me to do that?

If not do you think this would be a worthwhile addition?

Thanks again,
--
Christophe

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> You can set the parallelism of the FlinkKafkaConsumer independently of the
> total number of Kafka partitions (across all subscribed streams, including
> newly created streams that match a subscribed pattern).
>
> The consumer deterministically assigns each partition to a single consumer
> subtask, in a round-robin fashion.
> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there is 6
> partitions, each consumer subtask will be assigned 3 partitions.
>
> As for topic pattern subscription, FlinkKafkaConsumers starting from
> version 1.4.0 support this feature. You can take a look at [1] on how to do
> that.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html#kafka-consumers-
> topic-and-partition-discovery
>
> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com)
> wrote:
>
> Hi,
>
> If I'm sourcing from a KafkaConsumer do I have to explicitly set the Flink
> job parallelism to the number of partions or will it adjust automatically
> accordingly? In other word if I don't call setParallelism will get 1 or the
> number of partitions?
>
> The reason I'm asking is that I'm listening to a topic pattern not a
> single topic and the number of actual topic (and so partitions) behind the
> pattern can change so it is not possible to know ahead ot time how many
> partitions I will get.
>
> Thanks!
> --
> Christophe
>
>


-- 
Christophe


Re: Kafka and parallelism

2018-02-07 Thread Christophe Jolif
Ok thanks! I should have seen this. Sorry.

--
Christophe

On Wed, Feb 7, 2018 at 10:27 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Christophe,
>
> Yes, you can achieve writing to different topics per-message using the
> `KeyedSerializationSchema` provided to the Kafka producer.
> The schema interface has a `getTargetTopic` method which allows you to
> override the default target topic for a given record.
> I agree that the method is somewhat odd to be part of the serialization
> schema, so I have also been thinking about moving that elsewhere (maybe as
> part of the partitioner).
>
> If you want to route a record to some topic depending on which topic it
> came from on the consumer side, you’ll have to wrap the source topic
> information within the records so that it is available to the producer.
> You can access that in the `KeyedDeserializationSchema#deserialize`
> method, which exposes information about which topic and partition each
> record came from.
>
> Cheers,
> Gordon
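
A minimal sketch of the getTargetTopic approach Gordon describes, assuming a
hypothetical Tuple3 record carrying the source topic, the message key, and the
processed payload (and a hypothetical origin/destination naming convention);
the default topic passed to the producer constructor is then only used as a
fallback when getTargetTopic returns null:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// f0 = source topic, f1 = message key, f2 = processed payload.
public class TopicRoutingSchema implements KeyedSerializationSchema<Tuple3<String, String, String>> {

  @Override
  public byte[] serializeKey(Tuple3<String, String, String> element) {
    return element.f1.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public byte[] serializeValue(Tuple3<String, String, String> element) {
    return element.f2.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String getTargetTopic(Tuple3<String, String, String> element) {
    // Route e.g. origin-topic-1 -> destination-topic-1 (naming convention is hypothetical).
    return element.f0.replace("origin-", "destination-");
  }
}

The source topic (f0) would be filled in on the consumer side, e.g. from the
topic information exposed by KeyedDeserializationSchema#deserialize as
described above.
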
>
> On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjo...@gmail.com)
> wrote:
>
> Hi Gordon, or anyone else reading this,
>
> Still on this idea that I consume a Kafka topic pattern.
>
> I want to then to sink the result of the processing in a set of topics
> depending on from where the original message came from (i.e. if this comes
> from origin-topic-1 I will serialize the result in destination-topic-1, if
> from topic-2 to topic-2 etc...). However the KafkaProducer is working on a
> fixed topic. You can provide a partitioning function
> (FlinkKafkaPartitioner) but not a "topic" function that would allow to
> decide to witch topic sending the message a bit like a BucketingSink would
> decide the bucket or ElasticsearchSinkFunction allows you to choose the
> index.
>
> Am I missing something? The reason I'm asking is that some of the sink
> ctor are talking about "defaultTopicId" and some about "topicId" just like
> if in some case there was some ability to override the topic. Is there
> there a feature that allows me to do that?
>
> If not do you think this would be a worthwhile addition?
>
> Thanks again,
> --
> Christophe
>
> On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Christophe,
>>
>> You can set the parallelism of the FlinkKafkaConsumer independently of
>> the total number of Kafka partitions (across all subscribed streams,
>> including newly created streams that match a subscribed pattern).
>>
>> The consumer deterministically assigns each partition to a single
>> consumer subtask, in a round-robin fashion.
>> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there is 6
>> partitions, each consumer subtask will be assigned 3 partitions.
>>
>> As for topic pattern subscription, FlinkKafkaConsumers starting from
>> version 1.4.0 support this feature. You can take a look at [1] on how to do
>> that.
>>
>> Hope this helps!
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-
>> partition-discovery
>>
>> On 3 February 2018 at 6:53:47 PM, Christophe Jolif (cjo...@gmail.com)
>> wrote:
>>
>> Hi,
>>
>> If I'm sourcing from a KafkaConsumer do I have to explicitly set the
>> Flink job parallelism to the number of partions or will it adjust
>> automatically accordingly? In other word if I don't call setParallelism
>> will get 1 or the number of partitions?
>>
>> The reason I'm asking is that I'm listening to a topic pattern not a
>> single topic and the number of actual topic (and so partitions) behind the
>> pattern can change so it is not possible to know ahead ot time how many
>> partitions I will get.
>>
>> Thanks!
>> --
>> Christophe
>>
>>
>
>
> --
> Christophe
>
>


-- 
Christophe


Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-20 Thread Christophe Jolif
Hmm, I did not realize that.

When upgrading a job (consuming from Kafka), I was planning to cancel it
with a savepoint and then start it back from the savepoint. But this
savepoint mechanism was giving me the apparently false feeling that I would
not lose anything? My understanding was that maybe I would process some
events twice in this case, but certainly not miss events entirely.

Did I misunderstand this thread?

If not, this sounds pretty annoying? Do people have some sort of
workaround for that?

Thanks,
--
Christophe



On Mon, Feb 19, 2018 at 5:50 PM, Till Rohrmann  wrote:

> Hi Bart,
>
> you're right that Flink currently does not support a graceful stop
> mechanism for the Kafka source. The community has already a good idea how
> to solve it in the general case and will hopefully soon add it to Flink.
>
> Concerning the StoppableFunction: This interface was introduced quite some
> time ago and currently only works for some batch sources. In order to make
> it work with streaming, we need to add some more functionality to the
> engine in order to properly stop and take a savepoint.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans 
> wrote:
>
>> In https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> ops/cli.html it is shown that
>> for gracefully stopping a job you need to implement the StoppableFunction
>> interface.  This
>> appears not (yet) implemented for Kafka consumers.  Am I missing
>> something, or is there a
>> different way to gracefully stop a job using a kafka source so we can
>> restart it later without
>> losing any (in flight) events?
>>
>> - bart
>>
>
>


Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-21 Thread Christophe Jolif
OK. Thanks a lot for the clarification. That was my initial understanding,
but then I was confused by the "losing in-flight events" wording.

On Wed, Feb 21, 2018 at 10:26 AM, Till Rohrmann 
wrote:

> Hi Christophe,
>
> yes I think you misunderstood the thread. Cancel with savepoint will never
> cause any data loss. The only problem which might arise if you have an
> operator which writes data to an external system immediately, then you
> might see some data in the external system which originates from after the
> savepoint. By implementing the interaction with the external system, for
> example only flush on notify checkpoint complete, you can solve this
> problem. The bottom line is that if you don't do it like this, then you
> might see some duplicate data. The Kafka exactly once sink, for example, is
> implemented such that it takes care of this problem and gives you exactly
> once guarantees.
>
> Cheers,
> Till
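
A minimal sketch of the "only flush on notify checkpoint complete" pattern Till
describes, with a hypothetical external write call; a production version would
also snapshot the buffer (e.g. via CheckpointedFunction) so it survives
failures, which is omitted here for brevity:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FlushOnCheckpointSink extends RichSinkFunction<String> implements CheckpointListener {

  private final List<String> buffer = new ArrayList<>();

  @Override
  public void invoke(String value) {
    // Buffer records instead of writing them out immediately.
    buffer.add(value);
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Only push records once the checkpoint covering them has completed.
    for (String record : buffer) {
      writeToExternalSystem(record);
    }
    buffer.clear();
  }

  private void writeToExternalSystem(String record) {
    // Hypothetical external write; replace with the real client for your system.
  }
}
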
>
> On Tue, Feb 20, 2018 at 11:51 PM, Christophe Jolif 
> wrote:
>
>> Hmm, I did not realize that.
>>
>> I was planning when upgrading a job (consuming from Kafka) to cancel it
>> with a savepoint and then start it back from the savedpoint. But this
>> savedpoint thing was giving me the apparently false feeling I would not
>> lose anything? My understanding was that maybe I would process some events
>> twice in this case but certainly not miss events entirely.
>>
>> Did I misunderstand this thread?
>>
>> If not this sounds like pretty annoying? Do people have some sort of
>> workaround for that?
>>
>> Thanks,
>> --
>> Christophe
>>
>>
>>
>> On Mon, Feb 19, 2018 at 5:50 PM, Till Rohrmann 
>> wrote:
>>
>>> Hi Bart,
>>>
>>> you're right that Flink currently does not support a graceful stop
>>> mechanism for the Kafka source. The community has already a good idea how
>>> to solve it in the general case and will hopefully soon add it to Flink.
>>>
>>> Concerning the StoppableFunction: This interface was introduced quite
>>> some time ago and currently only works for some batch sources. In order to
>>> make it work with streaming, we need to add some more functionality to the
>>> engine in order to properly stop and take a savepoint.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans 
>>> wrote:
>>>
>>>> In https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>>> ops/cli.html it is shown that
>>>> for gracefully stopping a job you need to implement the
>>>> StoppableFunction interface.  This
>>>> appears not (yet) implemented for Kafka consumers.  Am I missing
>>>> something, or is there a
>>>> different way to gracefully stop a job using a kafka source so we can
>>>> restart it later without
>>>> losing any (in flight) events?
>>>>
>>>> - bart
>>>>
>>>
>>>
>>
>>
>


-- 
Christophe


"dynamic" bucketing sink

2018-03-23 Thread Christophe Jolif
Hi all,

I'm using the nice topic pattern feature on the KafkaConsumer to read from
multiple topics, automatically discovering new topics added into the system.

At the end of the processing I'm sinking the result into a Hadoop
Filesystem using a BucketingSink.

All works great until I get the requirement to sink into a different Hadoop
Filesystem based on the input topic.

One way to do this would obviously be to get rid of the topic pattern and
start a (similar) job per topic, each with its own sink to its own
filesystem, and start new jobs when new topics are added. But that's far
from ideal. It would lead to the usual issues with Flink and a dynamic
number of jobs (requiring new task slots...), and obviously it would
require some external machinery to know when new topics have been added,
create new jobs, etc.

What would be the recommended way to have a "dynamic" BucketingSink that
can not only write to several base paths (not too hard, I guess) but also
dynamically add new base paths when new topics come into the system?
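
For what it's worth, per-topic routing under a single base path is already
doable with a custom Bucketer; a minimal sketch, assuming the 1.4 Bucketer
interface and a hypothetical Tuple2 record whose f0 field is the source topic.
The harder part of the question, writing to entirely different Hadoop
filesystems, is not covered by this:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

// Routes each record into a sub-directory named after its source topic.
public class TopicBucketer implements Bucketer<Tuple2<String, String>> {

  @Override
  public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, String> element) {
    return new Path(basePath, element.f0);
  }
}

// Usage (base path is a placeholder):
// BucketingSink<Tuple2<String, String>> sink =
//     new BucketingSink<Tuple2<String, String>>("hdfs:///output");
// sink.setBucketer(new TopicBucketer());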

Thanks,
-- 
Christophe


Re: "dynamic" bucketing sink

2018-03-26 Thread Christophe Jolif
Thanks Timo & Ashish for your input.

I will definitely have a look at Kite SDK (was not aware of it). Otherwise
I'll try to prototype something and share it with the community through a
JIRA issue.

--
Christophe

On Mon, Mar 26, 2018 at 1:34 PM, ashish pok  wrote:

> Hi Christophe,
>
> Have you looked at Kite SDK? We do something like this but using Gobblin
> and Kite SDK, which is a parallel pipeline to Flink. It feels like if you
> partition by something logical like topic name, you should be able to sink
> using Kite SDK. Kite allows you good ways to handle further partitoning
> like using timestamp and also schema evolution if you are using AVRO.
>
> -- Ashish
>
> On Mon, Mar 26, 2018 at 4:57 AM, Timo Walther
>  wrote:
> Hi Christophe,
>
> I think this will require more effort. As far as I know there is no such
> "dynamic" feature. Have you looked in to the bucketing sink code? Maybe you
> can adapt it to your needs?
>
> Otherwise it might also make sense to open an issue for it to discuss a
> design for it. Maybe other contributors are interested in this feature as
> well.
>
> Regards,
> Timo
>
> Am 23.03.18 um 18:20 schrieb Christophe Jolif:
>
> Hi all,
>
> I'm using the nice topic pattern feature on the KafkaConsumer to read from
> multiple topics, automatically discovering new topics added into the system.
>
> At the end of the processing I'm sinking the result into a Hadoop
> Filesystem using a BucketingSink.
>
> All works great until I get the requirement to sink into a different
> Hadoop Filesystem based on the input topic.
>
> One way to do this would obviously be to get rid of the topic pattern and
> start a (similar) job per topic which would each get its own sink to its
> own filesystem. And start new jobs when new topics are added. But that's
> far from being ideal. This would lead to the usual issues with Flink and a
> dynamic number of jobs (requiring new task slots...) also obviously it
> would require some external machinery to know new topics have been added
> and create new jobs etc...
>
> What would be the recommended way to have a "dynamic" BucketingSink that
> can not only write to several basePath (not too hard I guess) but also
> dynamically add new base path when new topics are coming into the system.
>
> Thanks,
> --
> Christophe
>
>
>


Re: Secure TLS/SSL ElasticSearch connector for current and future connector

2018-03-26 Thread Christophe Jolif
Hi Fritz,

I think the High Level Rest Client implementation in this PR:
https://github.com/apache/flink/pull/5374 should work. If you don't get the
certificate properly available in your Java trust store, you might want to
override the createClient method to do something along these lines, so that
the SSL context is aware of it:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html

We might want to amend the code to make that even easier (and also manage
basic auth:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
)
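
For reference, a minimal sketch of a secured low-level client builder following
the Elastic documentation linked above; the trust-store path, password, host,
and credentials are all placeholders, and the resulting builder (or the
RestClient it builds, depending on the client version) would be used from the
overridden createClient method:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;

import javax.net.ssl.SSLContext;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

public class SecureRestClientSketch {

  public static RestClientBuilder secureBuilder() throws Exception {
    // Trust store holding the cluster certificate (path and password are hypothetical).
    KeyStore trustStore = KeyStore.getInstance("jks");
    try (InputStream is = Files.newInputStream(Paths.get("/path/to/truststore.jks"))) {
      trustStore.load(is, "changeit".toCharArray());
    }
    SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(trustStore, null).build();

    // Basic auth credentials (hypothetical).
    CredentialsProvider credentials = new BasicCredentialsProvider();
    credentials.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "password"));

    return RestClient.builder(new HttpHost("es.example.com", 9243, "https"))
        .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
            .setSSLContext(sslContext)
            .setDefaultCredentialsProvider(credentials));
  }
}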

That said, I'm unsure how the community wants to pursue this next-generation
ES connector, because despite the obvious need (judging from the number
of requests), there does not seem to be a lot of traction to get something
actually merged.

My current thinking would be to build a brand new connector based only on
the Java REST High Level client, possibly breaking some compatibility with
old APIs (that said, my PR above is trying to keep compatibility at the price
of a few casts). This would leave the 5.2-6.0 support undone and start back
from 6.1+. But at least there would be something "correct" for the future.

--
Christophe

On Mon, Mar 26, 2018 at 11:38 PM, Fritz Budiyanto 
wrote:

> Hi All,
>
> Anyone know if Flink has TLS/SSL support for the current ES connector ?
> If yes, any sample configuration/code ?
> If not, would TLS/SSL be support in the upcoming ES connector using Java
> High Level client ?
>
> Thanks,
> Fritz


Re: SSL config on Kubernetes - Dynamic IP

2018-03-27 Thread Christophe Jolif
I suspect this relates to: https://issues.apache.org/jira/browse/FLINK-5030

There was a PR for it at some point, but nothing has been done so far. It
seems the current code explicitly uses the IP rather than the hostname for
the Netty SSL configuration.

Without that, I'm really wondering how people can reasonably use SSL on a
Kubernetes-based Flink cluster, as every time a pod is (re)started it can
theoretically get a different IP? Or am I missing something?

--
Christophe

On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
edward.roja...@gmail.com> wrote:

> Hi all,
>
> Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
> configuration based on https://ci.apache.org/proje
> cts/flink/flink-docs-master/ops/security-ssl.html.
>
> However, as the IP of the nodes are dynamic (from the nature of
> kubernetes), we are using only the DNS which we can control using
> kubernetes services. So we add to the Subject Alternative Name(SAN) the
> flink-jobmanager DNS and also the DNS for the task managers
> *.flink-taskmanager-svc (each task manager has a DNS in the form
> flink-taskmanager-0.flink-taskmanager-svc).
>
> Additionally we set the jobmanager.rpc.address property on all the nodes
> and each task manager sets the taskmanager.host property, all matching the
> ones on the certificate.
>
> This is working well when using Job with Parallelism set to 1. The SSL
> validations are good and the Jobmanager can communicate with Task manager
> and vice versa.
>
> But when we set the parallelism to more than 1 we have exceptions on the
> SSL validation like this:
>
> Caused by: java.security.cert.CertificateException: No subject
> alternative names matching IP address 172.30.247.163 found
> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
> tManagerImpl.java:455)
> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
> tManagerImpl.java:436)
> at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509Trust
> ManagerImpl.java:252)
> at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X50
> 9TrustManagerImpl.java:136)
> at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHa
> ndshaker.java:1601)
> ... 21 more
>
>
> From the logs I see the Jobmanager is correctly registering the
> taskmanagers:
>
> org.apache.flink.runtime.instance.InstanceManager   - Registered
> TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-fl
> ink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
> as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered hosts
> is 3. Current number of alive task slots is 6.
>
> And also each taskmanager is correctly registered to use the hostname for
> communication:
>
> org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager will use
> hostname/address 'flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local'
> (172.30.247.163) for communication.
> ...
> akka.remote.Remoting   - Remoting started; listening on addresses
> :[akka.ssl.tcp://flink@flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122]
> ...
> org.apache.flink.runtime.io.network.netty.NettyConfig   - NettyConfig
> [server address: flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local/172.30.247.163,
> server port: 6121, ssl enabled: true, memory segment size (bytes): 32768,
> transport type: NIO, number of server threads: 2 (manual), number of client
> threads: 2 (manual), server connect backlog: 0 (use Netty's default), client
> connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
> ...
> org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager data
> connection information: bf4a9b50e57c99c17049adb66d65f685 @
> flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local
> (dataPort=6121)
>
>
>
> But even with that, it seems like the taskmanagers are using the IP to
> communicate with each other, and the SSL validation fails.
>
> Do you know if it's possible to make the taskmanagers use the hostname
> to communicate instead of the IP?
> Or do you have any advice on getting the SSL configuration to work in this
> environment?
>
> Thanks in advance.
>
> Regards,
> Edward
>



-- 
Christophe


Re: SSL config on Kubernetes - Dynamic IP

2018-04-09 Thread Christophe Jolif
By the way Fabian, is there any chance this issue will be looked into / the PR
considered for 1.5?

--
Christophe

On Wed, Apr 4, 2018 at 2:41 PM, Fabian Hueske  wrote:

> Thank you Edward and Christophe!
>
> 2018-03-29 17:55 GMT+02:00 Edward Alexander Rojas Clavijo <
> edward.roja...@gmail.com>:
>
>> Hi all,
>>
>> I did some tests based on the PR Christophe mentioned above and by making
>> a change on the NettyClient to use CanonicalHostName instead of
>> HostNameAddress to identify the server, the SSL validation works!!
>>
>> I created a PR with this change: https://github.com/apache/flink/pull/5789
>>
>> Regards,
>> Edward
>>
>> 2018-03-28 17:22 GMT+02:00 Edward Alexander Rojas Clavijo <
>> edward.roja...@gmail.com>:
>>
>>> Hi Till,
>>>
>>> I just created the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-9103
>>>
>>> I added the JobManager and TaskManager logs, Hope this helps to resolve
>>> the issue.
>>>
>>> Regards,
>>> Edward
>>>
>>> 2018-03-27 17:48 GMT+02:00 Till Rohrmann :
>>>
>>>> Hi Edward,
>>>>
>>>> could you please file a JIRA issue for this problem. It might be as
>>>> simple as that the TaskManager's network stack uses the IP instead of the
>>>> hostname as you suggested. But we have to look into this to be sure. Also
>>>> the logs of the JobManager as well as the TaskManagers could be helpful.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 27, 2018 at 5:17 PM, Christophe Jolif 
>>>> wrote:
>>>>
>>>>>
>>>>> I suspect this relates to: https://issues.apache.org/jira/browse/FLINK-5030
>>>>>
>>>>> For which there was a PR at some point but nothing has been done so
>>>>> far. It seems the current code explicitly uses the IP rather than the
>>>>> hostname for the Netty SSL configuration.
>>>>>
>>>>> Without that, I'm really wondering how people are reasonably using SSL
>>>>> on a Kubernetes-based Flink cluster, as every time a pod is (re)started
>>>>> it can theoretically get a different IP. Or am I missing something?
>>>>>
>>>>> --
>>>>> Christophe
>>>>>
>>>>> On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
>>>>> edward.roja...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Currently I have a Flink 1.4 cluster running on kubernetes and with
>>>>>> SSL configuration based on
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html.
>>>>>>
>>>>>> However, as the IPs of the nodes are dynamic (by the nature of
>>>>>> Kubernetes), we are using only the DNS names, which we can control using
>>>>>> Kubernetes services. So we add to the Subject Alternative Name (SAN) the
>>>>>> flink-jobmanager DNS name and also the DNS name for the task managers,
>>>>>> *.flink-taskmanager-svc (each task manager has a DNS name of the form
>>>>>> flink-taskmanager-0.flink-taskmanager-svc).
>>>>>>
>>>>>> Additionally we set the jobmanager.rpc.address property on all the
>>>>>> nodes and each task manager sets the taskmanager.host property, all
>>>>>> matching the ones on the certificate.
>>>>>>
>>>>>> This is working well when running a job with parallelism set to 1. The
>>>>>> SSL validation succeeds and the JobManager can communicate with the
>>>>>> TaskManagers and vice versa.
>>>>>>
>>>>>> But when we set the parallelism to more than 1 we have exceptions on
>>>>>> the SSL validation like this:
>>>>>>
>>>>>> Caused by: java.security.cert.CertificateException: No subject
>>>>>> alternative names matching IP address 172.30.247.163 found
>>>>>> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
>>>>>> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
>>>>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
>>>>>> at sun.security.ssl.X509TrustManagerImpl.checkIden

keyBy and parallelism

2018-04-11 Thread Christophe Jolif
Hi all,

Imagine I have a default parallelism of 16 and I do something like

stream.keyBy("something").flatMap()

Now let's imagine I have less than 16 keys, maybe 8.

How many parallel executions of the flatMap function will I get? 8 because
I have 8 keys, or 16 because I have default parallelism at 16?

(and I will have follow up questions depending on the answer I suspect ;))
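
For concreteness, here is a minimal, self-contained sketch of the setup I mean
(the tuple data, key values and class name are just placeholders I made up):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class KeyByParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(16); // the "default parallelism of 16" from above

        // Pretend this source only ever produces 8 distinct keys.
        env.fromElements(Tuple2.of("key-1", 1), Tuple2.of("key-2", 2), Tuple2.of("key-3", 3))
           .keyBy(0) // key by the first tuple field, i.e. the "something" above
           .flatMap(new FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
               @Override
               public void flatMap(Tuple2<String, Integer> value,
                                   Collector<Tuple2<String, Integer>> out) {
                   // The question: do 16 instances of this function run, or only
                   // as many as there are distinct keys?
                   out.collect(value);
               }
           })
           .print();

        env.execute("keyBy-parallelism-sketch");
    }
}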

Thanks,
-- 
Christophe


Re: keyBy and parallelism

2018-04-12 Thread Christophe Jolif
Thanks Chesnay (and others).

That's what I was figuring out. Now let's go on to the follow-up with my
exact use case.

I have two streams A and B. A basically receives "rules" that the
processing of B should follow.

There is a "key" that lets me know that a rule x coming in on A applies to
events with the same key coming in on B.

I was planning to do (pseudo code):

A.connect(B).keyBy("thekey").flatMap(
   flatMap1()
  -> store in a ValueState the rule
   flatMap2()
  -> use the state to get the rule, transform the element according to
the rule, collect it
)


I think it should work, right, because the ValueState will be "per key" and
contain the rule for this key and so on?
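
For concreteness, a (hedged) sketch of what I have in mind; Rule, Event,
getTheKey() and Rule.apply() are placeholder names I made up, and I'm assuming
both streams can be keyed on the same logical key:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Rule and Event are assumed POJOs; Rule.apply(Event) applies the rule to an event.
public static DataStream<Event> applyRules(DataStream<Rule> rules, DataStream<Event> events) {
    return rules
        .keyBy(new KeySelector<Rule, String>() {
            @Override
            public String getKey(Rule r) { return r.getTheKey(); }
        })
        .connect(events.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) { return e.getTheKey(); }
        }))
        .flatMap(new RichCoFlatMapFunction<Rule, Event, Event>() {

            private transient ValueState<Rule> currentRule;

            @Override
            public void open(Configuration parameters) {
                currentRule = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("rule", Rule.class));
            }

            @Override
            public void flatMap1(Rule rule, Collector<Event> out) throws Exception {
                // stream A: remember the latest rule for this key
                currentRule.update(rule);
            }

            @Override
            public void flatMap2(Event event, Collector<Event> out) throws Exception {
                // stream B: transform the event according to the rule stored for this key
                Rule rule = currentRule.value();
                if (rule != null) {
                    out.collect(rule.apply(event));
                }
                // else: no rule seen yet for this key -- I may need to buffer the event
            }
        });
}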

Now, what I really care about is not having all the elements of key1 on the
same parallel instance; I just want to make sure key1 and key2 are isolated,
so I can use the keyed state to store the corresponding rule and key2's rules
are never used for key1, and conversely.

So ideally, instead of using 8 parallel instances, in order to use the full
power of my system, even with 8 keys I would like to use 16 parallel instances,
as I don't care about all elements of key1 being on the same instance. All I
care about is that the state contains the rule corresponding to the key.

What would be the recommended approach here?

Thanks again for your help,
--
Christophe


On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler 
wrote:

> You will get 16 parallel executions since you specify a parallelism of
> 16; however, 8 of these will not get any data.
>
>
> On 11.04.2018 23:29, Hao Sun wrote:
>
> From what I learnt, you have to control parallelism yourself. You can set
> parallelism on an operator or set a default one through flink-conf.yaml.
> I might be wrong.
>
> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif  wrote:
>
>> Hi all,
>>
>> Imagine I have a default parallelism of 16 and I do something like
>>
>> stream.keyBy("something").flatMap()
>>
>> Now let's imagine I have less than 16 keys, maybe 8.
>>
>> How many parallel executions of the flatMap function will I get? 8
>> because I have 8 keys, or 16 because I have default parallelism at 16?
>>
>> (and I will have follow up questions depending on the answer I suspect ;))
>>
>> Thanks,
>> --
>> Christophe
>>
>
>


Re: keyBy and parallelism

2018-04-12 Thread Christophe Jolif
Sihua,

On Thu, Apr 12, 2018 at 10:04 AM, 周思华  wrote:

> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused:
> if you know there are only 8 keys, then why would you still like to
> use 16 parallel instances? 8 of them will be idle (a waste of CPU). In a
> KeyedStream, the tuples with the same key will be sent to the same
> parallel instance.
>


First, my 8 keys / 16 parallel instances is just an example. Real life is a bit
more complicated. But basically the idea is that I have a certain number of
task slots, and I want to keep them busy so that my processing is as fast as
possible. Even if I have fewer keys than slots, I want each slot to take
its share of the work.


>
> And I'm also a bit confused about the pseudo code: it looks like you assume
> that the tuple with a given key in stream A will always arrive before the
> tuple in stream B? I think that can't be guaranteed... you may need to store
> the tuple from stream B in case it arrives before the one from A, and
> do the "analysis logic" in both flatMap1() and flatMap2().
>


You are right. I just wanted to focus on my issue, which is:

1/ having a co-processing step that considers only elements of the same key
and that can store the "rules" in the keyed state (and, as you said, I might
have to store other things for ordering reasons)
2/ being able to parallelize a given key, to use as much parallelism as my
cluster allows (see the sketch below).
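
One idea on my side for 2/ (just a sketch/assumption, not something I found in
the docs): introduce a synthetic sub-key, fan each rule out to N sub-keys and
route each event to one deterministic sub-key, so a single logical key is spread
over up to N parallel instances while the rule is still available in the keyed
state of every sub-key. Reusing the Rule/Event placeholders from my previous
mail, plus assumed getSubKey()/withSubKey(int) and getEventId() helpers:

// Imports as in the previous sketch, plus:
// import org.apache.flink.api.common.functions.FlatMapFunction;
// import org.apache.flink.streaming.api.datastream.KeyedStream;

final int SUB_KEYS = 16;

// Fan each rule out so it ends up in the keyed state of every sub-key.
DataStream<Rule> fannedRules = rules.flatMap(new FlatMapFunction<Rule, Rule>() {
    @Override
    public void flatMap(Rule rule, Collector<Rule> out) {
        for (int i = 0; i < SUB_KEYS; i++) {
            out.collect(rule.withSubKey(i));
        }
    }
});

// Key both sides by "logical key + sub-key". The event side derives the sub-key
// deterministically from a field it already carries (not from a random number,
// since key selectors must be deterministic).
KeyedStream<Rule, String> keyedRules = fannedRules.keyBy(new KeySelector<Rule, String>() {
    @Override
    public String getKey(Rule r) { return r.getTheKey() + "#" + r.getSubKey(); }
});

KeyedStream<Event, String> keyedEvents = events.keyBy(new KeySelector<Event, String>() {
    @Override
    public String getKey(Event e) {
        return e.getTheKey() + "#" + Math.floorMod(e.getEventId().hashCode(), SUB_KEYS);
    }
});

// RuleCoFlatMap would be the RichCoFlatMapFunction from my previous mail,
// extracted into a named class.
DataStream<Event> result = keyedRules.connect(keyedEvents).flatMap(new RuleCoFlatMap());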


> Regards,
> Sihua Zhou
>
> On 04/12/2018 15:44,Christophe Jolif 
> wrote:
>
> Thanks Chesnay (and others).
>
> That's what I was figuring out. Now let's go onto the follow up with my
> exact use-case.
>
> I have two streams A and B. A basically receives "rules" that the
> processing of B should follow.
>
> There is a "key" that lets me know that a rule x coming in on A applies to
> events with the same key coming in on B.
>
> I was planning to do (pseudo code):
>
> A.connect(B).keyBy("thekey").flatMap(
>flatMap1()
>   -> store in a ValueState the rule
>flatMap2()
>   -> use the state to get the rule, transform the element according to
> the rule, collect it
> )
>
>
> I think it should work, right, because the ValueState will be "per key"
> and contain the rule for this key and so on?
>
> Now, what I really care about is not having all the elements of key1 on the
> same parallel instance; I just want to make sure key1 and key2 are isolated,
> so I can use the keyed state to store the corresponding rule and key2's rules
> are never used for key1, and conversely.
>
> So ideally, instead of using 8 parallel instances, in order to use the full
> power of my system, even with 8 keys I would like to use 16 parallel
> instances, as I don't care about all elements of key1 being on the same
> instance. All I care about is that the state contains the rule corresponding
> to the key.
>
> What would be the recommended approach here?
>
> Thanks again for your help,
> --
> Christophe
>
>
> On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler 
> wrote:
>
>> You will get 16 parallel executions since you specify a parallelism of
>> 16; however, 8 of these will not get any data.
>>
>>
>> On 11.04.2018 23:29, Hao Sun wrote:
>>
>> From what I learnt, you have to control parallelism yourself. You can
>> set parallelism on an operator or set a default one through flink-conf.yaml.
>> I might be wrong.
>>
>> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif 
>> wrote:
>>
>>> Hi all,
>>>
>>> Imagine I have a default parallelism of 16 and I do something like
>>>
>>> stream.keyBy("something").flatMap()
>>>
>>> Now let's imagine I have less than 16 keys, maybe 8.
>>>
>>> How many parallel executions of the flatMap function will I get? 8
>>> because I have 8 keys, or 16 because I have default parallelism at 16?
>>>
>>> (and I will have follow up questions depending on the answer I suspect
>>> ;))
>>>
>>> Thanks,
>>> --
>>> Christophe
>>>
>>
>>


Re: FlinkML

2018-04-18 Thread Christophe Jolif
Szymon,

The short answer is no. See:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3ccaadrtt39ciiec1uzwthzgnbkjxs-_h5yfzowhzph_zbidux...@mail.gmail.com%3E


On Mon, Apr 16, 2018 at 11:25 PM, Szymon Szczypiński 
wrote:

> Hi,
>
> I wonder if there is a possibility to build a FlinkML streaming job rather
> than a batch job. The examples on
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/ml/
> are batch only.
>
> Is there any possibility?
>
>
> Best regards.
>
>


-- 
Christophe