Flink Kafka consumer with low latency requirement

2019-06-20 Thread wang xuchen
Dear Flink experts,

I am experimenting with Flink for a use case that has tight latency
requirements.

A Stack Overflow article suggests that I can use setParallelism(n) to process
a Kafka partition in a multi-threaded way. My understanding is that there is
still one Kafka consumer per partition, but by using setParallelism, I can
spin up multiple worker threads to process the messages read from the
consumer.

And according to Fabian's comments in this link:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-Flink-Kafka-connector-has-max-pending-offsets-concept-td28119.html
Flink is able to manage the offsets correctly (commit them in the right order).

Here is my question: let's say there is a Kafka topic with only one
partition, and I set up a consumer with setParallelism(2). Hypothetically, the
worker threads call out to a REST service which may get slow or stuck
periodically. I want to make sure that the consumer overall keeps making
progress even in the face of a 'slow worker'. In other words, I'd like the
fast worker to keep accumulating pending but uncommitted offsets even while the
other worker is stuck. Is there such a knob to tune in Flink?

From my own experiment, using the Kafka consumer group tool to monitor the
offset lag, as soon as one worker thread gets stuck, the other cannot make any
progress either. I really want the fast worker to still make progress to a
certain extent. For this use case, exactly-once processing is not required.
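
For reference, one direction that might be relevant here is Flink's async I/O
(AsyncDataStream): the source keeps emitting while slow calls are still in
flight, up to a configurable number of pending requests. A rough Java sketch
(the record types and the non-blocking REST client are hypothetical
placeholders) would be:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class RestEnrichment extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
        // Hypothetical non-blocking REST call returning a CompletableFuture;
        // the operator is not blocked while the call is in flight.
        callRestAsync(record).whenComplete((result, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(result));
            }
        });
    }

    // Placeholder: plug in an async HTTP client here.
    private CompletableFuture<String> callRestAsync(String record) {
        return CompletableFuture.completedFuture(record);
    }
}

// Wiring (sketch): allow up to 100 in-flight requests, 30 s timeout each.
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         kafkaStream, new RestEnrichment(), 30, TimeUnit.SECONDS, 100);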

Thanks for helping.
Ben


[External] Using scala parallel collection with ForkJoinPool

2019-06-20 Thread Vishal Sharma
Hi,

We are doing a computationally expensive window aggregation in our Flink
job. As expected, the aggregation takes a lot of time. We are experimenting
with Scala parallel collections in order to speed up our computation. Are
there any guidelines regarding this?

As per my understanding, ForkJoinPool by default uses one thread per core;
can it cause Flink task slots to starve for resources?
Also, are there any potential concurrency issues with this approach?
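
For reference, a rough Java sketch of bounding the pool explicitly inside an
operator, instead of relying on the default common pool, is below (the mapper
and the expensive computation are hypothetical placeholders):

import java.util.concurrent.ForkJoinPool;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ExpensiveComputationMapper extends RichMapFunction<Long, Long> {

    private transient ForkJoinPool pool;

    @Override
    public void open(Configuration parameters) {
        // Explicitly sized pool instead of the common pool (one thread per core),
        // so parallel work in this operator does not starve the other task slots
        // running in the same task manager JVM.
        pool = new ForkJoinPool(2);
    }

    @Override
    public Long map(Long value) throws Exception {
        // Hypothetical CPU-heavy computation submitted to the bounded pool.
        return pool.submit(() -> value * value).get();
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdown();
        }
    }
}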

Thanks,
Vishal Sharma




Re: dynamic metric

2019-06-20 Thread David Morin
Thanks Till.
Ok, I've got it.
So, to prevent registering the metric twice, do I have to keep an index
(a HashMap, for example) to check whether the metric already exists?
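
Something like the following is what I have in mind — a rough sketch where the
mapper, the parse logic, and the "errorType" label key are hypothetical — using
a per-label guard map plus addGroup for the contextual label. Would that be the
right approach?

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class ErrorCountingMapper extends RichMapFunction<String, String> {

    // One counter per label value, each registered exactly once.
    private transient Map<String, Counter> errorCounters;

    @Override
    public void open(Configuration parameters) {
        errorCounters = new HashMap<>();
    }

    @Override
    public String map(String value) {
        try {
            return parse(value);
        } catch (Exception e) {
            // The label value is only known at runtime, so the counter is
            // created lazily and cached to avoid registering it twice.
            String errorType = e.getClass().getSimpleName();
            errorCounters
                    .computeIfAbsent(errorType, type ->
                            getRuntimeContext().getMetricGroup()
                                    .addGroup("errorType", type)
                                    .counter("errors"))
                    .inc();
            return value;
        }
    }

    // Hypothetical parsing logic.
    private String parse(String value) throws Exception {
        return value;
    }
}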

On Fri, Jun 21, 2019 at 01:27, Till Rohrmann wrote:

> Hi David,
>
> I think it is not strictly required that you register the metric in the
> open method. It is just convenient because otherwise you have to make sure
> that you register the metric only once (e.g. when doing it in the map
> function).
>
> What you need in order to register a metric is the runtime context which
> you get if you implement a RichFunction:
>
> getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>()
> {
> @Override
> public Integer getValue() {
>   return valueToExpose;
> }
>   });
>
> Cheers,
> Till
>
> On Fri, Jun 21, 2019 at 12:36 AM David Morin 
> wrote:
>
>> Hi,
>>
>> I want to create one metric related to the number of errors but in fact I
>> would like to add some contextual labels ?
>> What is the best way to do that ? gauge ?
>> How to create this kind of metric dynamically during the run of the task
>> (because open method is not possible because too early) ?
>> Thanks in advance
>>
>> David
>>
>


Re: dynamic metric

2019-06-20 Thread Till Rohrmann
Hi David,

I think it is not strictly required that you register the metric in the
open method. It is just convenient because otherwise you have to make sure
that you register the metric only once (e.g. when doing it in the map
function).

What you need in order to register a metric is the runtime context which
you get if you implement a RichFunction:

getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
  return valueToExpose;
}
  });

Cheers,
Till

On Fri, Jun 21, 2019 at 12:36 AM David Morin 
wrote:

> Hi,
>
> I want to create one metric related to the number of errors but in fact I
> would like to add some contextual labels ?
> What is the best way to do that ? gauge ?
> How to create this kind of metric dynamically during the run of the task
> (because open method is not possible because too early) ?
> Thanks in advance
>
> David
>


dynamic metric

2019-06-20 Thread David Morin
Hi,

I want to create one metric related to the number of errors, but I would
also like to add some contextual labels.
What is the best way to do that? A gauge?
How can I create this kind of metric dynamically during the run of the task
(the open method is not an option because it is too early)?
Thanks in advance

David


Re: Unable to set S3 like object storage for state backend.

2019-06-20 Thread Ken Krugler
Hi Vishwas,

It might be that you’ve got a legacy bucket name (“aip_featuretoolkit”), as AWS
no longer allows bucket names to contain an underscore.

I’m guessing that the Hadoop S3 code is trying to treat your path as a valid
URI, but the bucket name doesn’t conform, and thus you get the “null uri host”
issue.

Could you try with a compliant bucket name?
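(For example, recreating the bucket as "aip-featuretoolkit" and pointing the
state backend at s3://aip-featuretoolkit/checkpoints/ instead.)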

— Ken

> On Jun 20, 2019, at 2:46 PM, Vishwas Siravara  wrote:
> 
> Hi,
> I am using flink version 1.7.2 , I am trying to use S3 like object
> storage EMC ECS(
> https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm) .
> 
> I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for
> s3 filesystem and I have placed it under the lib folder and is
> available to flink in its class path.
> 
> My flink-conf.yaml looks like this :
> 
> s3.endpoint: SU73ECSG1P1d.***.COM:9021
> s3.access-key: vdna_np_user
> security.ssl.rest.enabled: false
> web.timeout: 1
> s3.secret-key: J***
> 
> And my code for statebackend is like this :
> 
> env.setStateBackend(new 
> FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))
> 
> I have a bucket called aip_featuretoolkit in my s3 instance. I can
> connect to s3 form s3 command line utilities. However I cannot
> checkpoint with this configuration in flink. I get the following error
> message
> 
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Could not
> retrieve JobResult.
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
> at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
> at com.visa.flink.cli.Main$.main(Main.scala:22)
> at com.visa.flink.cli.Main.main(Main.scala)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> Failed to submit job.
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set
> up JobManager
> at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Could not set up JobManager
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.la

Unable to set S3 like object storage for state backend.

2019-06-20 Thread Vishwas Siravara
Hi,
I am using Flink version 1.7.2, and I am trying to use S3-like object
storage, EMC ECS (
https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm).

I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for
the s3 filesystem; I have placed it under the lib folder so it is
available to Flink on its classpath.

My flink-conf.yaml looks like this :

s3.endpoint: SU73ECSG1P1d.***.COM:9021
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

And my code for statebackend is like this :

env.setStateBackend(new FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))

I have a bucket called aip_featuretoolkit in my s3 instance. I can
connect to s3 from the s3 command line utilities. However, I cannot
checkpoint with this configuration in Flink. I get the following error
message:

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Could not
retrieve JobResult.
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
at com.visa.flink.cli.Main$.main(Main.scala:22)
at com.visa.flink.cli.Main.main(Main.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit job.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set
up JobManager
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException:
Could not set up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID
counter: null uri host. This can be caused by unencoded / in the
password string
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:255)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionG

Re: Updated Flink Documentation and tutorials

2019-06-20 Thread Pankaj Chand
Thank you!

On Thu, Jun 20, 2019, 5:49 AM Chesnay Schepler  wrote:

> There is no version of the documentation that is more up-to-date. The
> documentation was simply not updated yet for the new architecture.
>
> On 20/06/2019 11:45, Pankaj Chand wrote:
>
> Based on the below conversation (reverse chronological order) regarding my
> previous question on the role of Job Manager in Flink:
>
>
> Hi Biao,
>
> Thank you for your reply!
>
> Please let me know the url of the updated Flink documentation.
>
> The url of the outdated document is:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html
>
>
> Another page which (tacitly) supports the outdated concept is:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
>
>
> The website that hosts these pages is also the first result that comes up
> when you Google Search for "Flink documentation", and it claims it is a
> stable version. The url is:
> https://ci.apache.org/projects/flink/flink-docs-stable/
>
> Again, please let me know the url of the updated Flink documentation.
>
> Thank you Biao and Eduardo!
>
> Pankaj
>
> On Tue, Jun 18, 2019 at 11:49 PM Biao Liu  wrote:
>
> Hi Pankaj,
>
> That's really a good question. There was a refactor of architecture
> before[1]. So there might be some descriptions used the outdated concept.
>
> Before refactoring, Job Manager is a centralized role. It controls whole
> cluster and all jobs which is described in your interpretation 1.
>
> After refactoring, the old Job Manager is separated into several roles,
> Resource Manager, Dispatcher, new Job Manager, etc. The new Job Manager is
> responsible for only one job, which is described in your interpretation 2.
>
> So the document you refer to is outdated. Would you mind telling us the
> URL of this document? I think we should update it to avoid misleading more
> people.
>
> 1.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor wrote:
>
> Hi Pankaj,
>
> I have no experience with Hadoop but from the book I gathered there's one
> Job Manager per application i.e. per jar (as in the example in the first
> chapter). This is not to say there's one Job Manager per job. Actually I
> don't think the word Job is defined in the book, I've seen Task defined,
> and those do have Task Managers
>
> Hope this is along the right lines
>
> Regards,
> Eduardo
>
> On Tue, 18 Jun 2019, 08:42 Pankaj Chand, 
> wrote:
>
> I am trying to understand the role of Job Manager in Flink, and have come
> across two possibly distinct interpretations.
>
> 1. The online documentation v1.8 signifies that there is at least one Job
> Manager in a cluster, and it is closely tied to the cluster of machines, by
> managing all jobs in that cluster of machines.
>
> This signifies that Flink's Job Manager is much like Hadoop's Application
> Manager.
>
> 2. The book, "Stream Processing with Apache Flink", writes that, "The Job
> Manager is the master process that controls the execution of a single
> application—each application is controlled by a different Job Manager."
>
> This signifies that Flink defaults to one Job Manager per job, and the Job
> Manager is closely tied to that single job, much like Hadoop's Application
> Master for each job.
>
> Please let me know which one is correct.
>
> Pankaj
>
>
> On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler  wrote:
>
>> What makes you believe that they are out-dated?
>>
>> On 19/06/2019 19:17, Pankaj Chand wrote:
>> > Hello,
>> >
>> > Please let me know how to get the updated documentation and tutorials
>> > of Apache Flink.
>> > The stable v1.8 and v1.9-snapshot release of the documentation seems
>> > to be outdated.
>> >
>> > Thanks!
>> >
>> > Pankaj
>>
>>
>>
>>
>


metrics for checking whether a broker throttles requests based on its quota limits?

2019-06-20 Thread Yu Yang
Hi,

Recently we enabled Kafka quota management for our Kafka clusters. We are
looking for Kafka metrics that can be used for alerting on whether a Kafka
broker throttles requests based on quota.

There are a few throttle-related metrics on Kafka, but none of them can
accurately tell whether the broker is throttling the requests. Could
anyone share insights on this?

kafka.network.produce.throttletimems.requestmetrics.95thPercentile

kafka.network.produce.throttletimems.requestmetrics.Count

Thanks!

Regards,
-Yu


Re: Apache Flink - Question about metric registry and reporter and context information

2019-06-20 Thread Chesnay Schepler
You cannot access context information in the metric reporter itself /in 
a nice way/. You can wait for the first metric to be registered, and 
then extract arbitrary metric variables from the metric group.
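
For illustration, a rough sketch of that pattern, assuming the standard
MetricReporter interface (the variable keys such as "<tm_id>" are taken from
the default scope variables and may differ by version):

import java.util.Map;

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

public class ContextAwareReporter implements MetricReporter {

    // Filled in lazily, once the first metric arrives.
    private volatile String taskManagerId;

    @Override
    public void open(MetricConfig config) {
        // No context information is available here yet.
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // The group's variables carry the context, e.g. "<host>", "<tm_id>", "<job_name>".
        Map<String, String> variables = group.getAllVariables();
        if (taskManagerId == null && variables.containsKey("<tm_id>")) {
            taskManagerId = variables.get("<tm_id>");
        }
        // ... register / report the metric ...
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        // ... stop reporting the metric ...
    }

    @Override
    public void close() {
    }
}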


On 15/06/2019 19:31, Yun Tang wrote:

Hi

1) Yes, the metrics reporter is instantiated per task manager; you
can refer to [1] to confirm.


2) You can get your runtime context by calling #getRuntimeContext()
in a RichFunction, and you can then get your metric group from the
runtime context. The task manager name can be found via
#getAllVariables() on the MetricGroup.



[1] 
https://github.com/apache/flink/blob/8558548a37437ab4f8049b82eb07d1b3aa6ed1f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L139


Best
Yun Tang


*From:* M Singh 
*Sent:* Saturday, June 15, 2019 2:13
*To:* User
*Subject:* Apache Flink - Question about metric registry and reporter 
and context information

Hi:

I wanted to find out whether the metric reporter and registry are instantiated
per task manager (which is a single JVM process) or per slot. I
believe it is per task manager (JVM process) but just wanted to confirm.


Also, is there a way to access context information (eg: task manager 
name etc) in the metric reporter or registry just like in the rich 
function open method ?


Thanks





Re: [ANNOUNCEMENT] March 2019 Bay Area Apache Flink Meetup

2019-06-20 Thread Robert Metzger
Hey Xuefu,
thanks a lot for organizing the Bay Area Flink Meetup.

For others following this email thread, here is the (deep) link to the
event: https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/262216929/


I see that only 19 people have RSVPed so far. In my experience, it's
possible to attract much larger audiences in the Bay Area.

I noticed a few things with that meetup
a) It is not present on Twitter at all. If you or one other meetup
organizer could tweet about it, I would retweet it from @ApacheFlink (which
has a lot of reach) (ideally we tweet about it after we fix b) )
b) the description page doesn't look very inviting. Do you have talk
abstracts of the speakers? Will there be food or drinks provided?

Best,
Robert


On Mon, Jun 17, 2019 at 7:33 PM Xuefu Zhang  wrote:

> Hi all,
>
> The scheduled meetup is only about a week away. Please note that RSVP at
> meetup.com is required.  In order for us to get the actual headcount to
> prepare for the event, please sign up as soon as possible if you plan to
> join. Thank you very much for your cooperation.
>
> Regards,
> Xuefu
>
> On Thu, Feb 14, 2019 at 4:32 PM Xuefu Zhang  wrote:
>
> > Hi all,
> >
> > I'm very excited to announce that the community is planning the next
> > meetup in Bay Area on March 25, 2019. The event is just announced on
> > Meetup.com [1].
> >
> > To make the event successful, your participation and help will be needed.
> > Currently, we are looking for an organization that can host the event.
> > Please let me know if you have any leads.
> >
> > Secondly, we encourage Flink users and developers to take this as an
> > opportunity to share experience or development. Thus, please let me know
> if
> > you like to give a short talk.
> >
> > I look forward to meeting you all in the Meetup.
> >
> > Regards,
> > Xuefu
> >
> > [1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/258975465
> >
>


Re: [DISCUSS] Deprecate previous Python APIs

2019-06-20 Thread Chesnay Schepler
I have created a JIRA and PR for removing the Python APIs.


On 11/06/2019 15:30, Stephan Ewen wrote:

Hi all!

I would suggest deprecating the existing Python APIs for the DataSet
and DataStream API with the 1.9 release.


Background is that there is a new Python API under development.
The new Python API is initially against the Table API. Flink 1.9 will 
support Table API programs without UDFs, 1.10 is planned to support 
UDFs. Future versions would support also the DataStream API.


In the long term, Flink should have one Python API for DataStream and 
Table APIs. We should not maintain multiple different implementations 
and confuse users that way.
Given that the existing Python APIs are a bit limited and not under
active development, I would suggest deprecating them in favor of the
new API.


Best,
Stephan





CoFlatMapFunction vs BroadcastProcessFunction

2019-06-20 Thread Andy Hoang
Hi guys,

I read about
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-Rule-Evaluation-in-Flink-td21125.html#a21241
 

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/BroadcastStream-vs-Broadcasted-DataStream-td23712.html
 


I tried to use those 2 classes for my problem: One stream as config stream to 
change behavior on another event stream 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Weird-behavior-with-CoFlatMapFunction-td28140.html
 

 (solved, thanks to Fabian)

The two implementations are basically the same; for each class we have to
implement two methods:
flatMap1 vs processElement: process the “event” stream
flatMap2 vs processBroadcastElement: process the “config” stream

While those implementations are quite similar, I’m not sure which one I should
pick.
My gut tells me I haven’t covered all the angles of
BroadcastProcessFunction yet. I’m curious in which situations we should use this
class, because even with the example in the doc page:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#important-considerations
we can still use CoFlatMapFunction to do it.
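
For reference, a minimal broadcast-state sketch of the BroadcastProcessFunction
variant as I understand it (hypothetical String types and a made-up "mode" key):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConfigDrivenFilter extends BroadcastProcessFunction<String, String, String> {

    // Must match the descriptor used when calling .broadcast(...) on the config stream.
    static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("config",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // The event side only gets a read-only view of the broadcast (config) state.
        String mode = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("mode");
        if (!"drop".equals(mode)) {
            out.collect(event);
        }
    }

    @Override
    public void processBroadcastElement(String configUpdate, Context ctx, Collector<String> out)
            throws Exception {
        // Config updates are applied on every parallel instance, and the
        // broadcast state is included in checkpoints.
        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("mode", configUpdate);
    }
}

// Wiring (sketch):
// BroadcastStream<String> configBroadcast = configStream.broadcast(ConfigDrivenFilter.CONFIG_DESCRIPTOR);
// DataStream<String> result = eventStream.connect(configBroadcast).process(new ConfigDrivenFilter());

As far as I can tell, the main difference from keeping the config in a
CoFlatMapFunction field is that the broadcast state is checkpointed and the
config updates are guaranteed to reach every parallel instance.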

There is another sample I got from Google which uses both:
https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed#file-twostreamingjoining-scala-L178
I haven’t got the idea of what it wants to do:

rule.broadcast.connect…
and connect again
.connect(broadcastStream)

Maybe this is the missing piece that I haven’t understood about
BroadcastProcessFunction.

I hope you guys can point me in some direction on how/when I would choose which
class.

Thanks,

Andy



Re: Updated Flink Documentation and tutorials

2019-06-20 Thread Chesnay Schepler
There is no version of the documentation that is more up-to-date. The 
documentation was simply not updated yet for the new architecture.


On 20/06/2019 11:45, Pankaj Chand wrote:
Based on the below conversation (reverse chronological order) 
regarding my previous question on the role of Job Manager in Flink:



Hi Biao,

Thank you for your reply!

Please let me know the url of the updated Flink documentation.

The url of the outdated document is:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html 



Another page which (tacitly) supports the outdated concept is:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html 



The website that hosts these pages is also the first result that comes 
up when you Google Search for "Flink documentation", and it claims it 
is a stable version. The url is:

https://ci.apache.org/projects/flink/flink-docs-stable/

Again, please let me know the url of the updated Flink documentation.

Thank you Biao and Eduardo!

Pankaj

On Tue, Jun 18, 2019 at 11:49 PM Biao Liu wrote:


Hi Pankaj,

That's really a good question. There was a refactor of
architecture before[1]. So there might be some descriptions used
the outdated concept.

Before refactoring, Job Manager is a centralized role. It controls
whole cluster and all jobs which is described in your
interpretation 1.

After refactoring, the old Job Manager is separated into several
roles, Resource Manager, Dispatcher, new Job Manager, etc. The new
Job Manager is responsible for only one job, which is described in
your interpretation 2.

So the document you refer to is outdated. Would you mind telling
us the URL of this document? I think we should update it to avoid
misleading more people.

1.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor wrote:

Hi Pankaj,

I have no experience with Hadoop but from the book I gathered
there's one Job Manager per application i.e. per jar (as in
the example in the first chapter). This is not to say there's
one Job Manager per job. Actually I don't think the word Job
is defined in the book, I've seen Task defined, and those do
have Task Managers

Hope this is along the right lines

Regards,
Eduardo

On Tue, 18 Jun 2019, 08:42 Pankaj Chand wrote:

I am trying to understand the role of Job Manager in
Flink, and have come across two possibly distinct
interpretations.

1. The online documentation v1.8 signifies that there is
at least one Job Manager in a cluster, and it is closely
tied to the cluster of machines, by managing all jobs in
that cluster of machines.

This signifies that Flink's Job Manager is much like
Hadoop's Application Manager.

2. The book, "Stream Processing with Apache Flink", writes
that, "The Job Manager is the master process that controls
the execution of a single application—each application is
controlled by a different Job Manager."

This signifies that Flink defaults to one Job Manager per
job, and the Job Manager is closely tied to that single
job, much like Hadoop's Application Master for each job.

Please let me know which one is correct.

Pankaj


On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler wrote:


What makes you believe that they are out-dated?

On 19/06/2019 19:17, Pankaj Chand wrote:
> Hello,
>
> Please let me know how to get the updated documentation and
tutorials
> of Apache Flink.
> The stable v1.8 and v1.9-snapshot release of the documentation
seems
> to be outdated.
>
> Thanks!
>
> Pankaj







Re: Updated Flink Documentation and tutorials

2019-06-20 Thread Pankaj Chand
Based on the below conversation (reverse chronological order) regarding my
previous question on the role of Job Manager in Flink:


Hi Biao,

Thank you for your reply!

Please let me know the url of the updated Flink documentation.

The url of the outdated document is:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html


Another page which (tacitly) supports the outdated concept is:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html


The website that hosts these pages is also the first result that comes up
when you Google Search for "Flink documentation", and it claims it is a
stable version. The url is:
https://ci.apache.org/projects/flink/flink-docs-stable/

Again, please let me know the url of the updated Flink documentation.

Thank you Biao and Eduardo!

Pankaj

On Tue, Jun 18, 2019 at 11:49 PM Biao Liu  wrote:

Hi Pankaj,

That's really a good question. There was a refactor of architecture
before[1]. So there might be some descriptions used the outdated concept.

Before refactoring, Job Manager is a centralized role. It controls whole
cluster and all jobs which is described in your interpretation 1.

After refactoring, the old Job Manager is separated into several roles,
Resource Manager, Dispatcher, new Job Manager, etc. The new Job Manager is
responsible for only one job, which is described in your interpretation 2.

So the document you refer to is outdated. Would you mind telling us the URL
of this document? I think we should update it to avoid misleading more
people.

1. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Wed, Jun 19, 2019 at 1:12 AM, Eduardo Winpenny Tejedor wrote:

Hi Pankaj,

I have no experience with Hadoop but from the book I gathered there's one
Job Manager per application i.e. per jar (as in the example in the first
chapter). This is not to say there's one Job Manager per job. Actually I
don't think the word Job is defined in the book, I've seen Task defined,
and those do have Task Managers

Hope this is along the right lines

Regards,
Eduardo

On Tue, 18 Jun 2019, 08:42 Pankaj Chand,  wrote:

I am trying to understand the role of Job Manager in Flink, and have come
across two possibly distinct interpretations.

1. The online documentation v1.8 signifies that there is at least one Job
Manager in a cluster, and it is closely tied to the cluster of machines, by
managing all jobs in that cluster of machines.

This signifies that Flink's Job Manager is much like Hadoop's Application
Manager.

2. The book, "Stream Processing with Apache Flink", writes that, "The Job
Manager is the master process that controls the execution of a single
application—each application is controlled by a different Job Manager."

This signifies that Flink defaults to one Job Manager per job, and the Job
Manager is closely tied to that single job, much like Hadoop's Application
Master for each job.

Please let me know which one is correct.

Pankaj


On Thu, Jun 20, 2019, 4:54 AM Chesnay Schepler  wrote:

> What makes you believe that they are out-dated?
>
> On 19/06/2019 19:17, Pankaj Chand wrote:
> > Hello,
> >
> > Please let me know how to get the updated documentation and tutorials
> > of Apache Flink.
> > The stable v1.8 and v1.9-snapshot release of the documentation seems
> > to be outdated.
> >
> > Thanks!
> >
> > Pankaj
>
>
>
>


Re: [External] checkpoint metadata not in configured directory state.checkpoints.dir

2019-06-20 Thread Vishal Sharma
Hi Congxian,

I am not sure how I can track the checkpoint path. Do you have a suggestion
regarding this?

Thanks,
Vishal Sharma

On Thu, Jun 20, 2019 at 11:17 AM Congxian Qiu 
wrote:

> Hi, Vishal
> If you want to restart from the last completed external checkpoint of the
> previously stopped job, you need to track the checkpoint path and restart from
> it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
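> For example (assuming the layout from the state backend above), the retained
> checkpoints land under s3a://test-bucket/checkpoints/<job-id>/chk-<n>/, so you
> could list that prefix, take the highest chk-<n> that still contains a _metadata
> file, and resume with something like:
> bin/flink run -s s3a://test-bucket/checkpoints/<job-id>/chk-<n> ...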
>
> Best,
> Congxian
>
>
> Vishal Sharma  于2019年6月19日周三 下午11:38写道:
>
>> Hi Chesnay,
>>
>> Can you suggest how I should go about automating job restart from the last
>> completed externalised checkpoint in case of failure? I am not sure about
>> the path for the latest completed checkpoint.
>>
>> Thanks,
>> Vishal Sharma
>>
>> On Wed, Jun 19, 2019 at 11:11 PM Chesnay Schepler 
>> wrote:
>>
>>> The _metadata is always stored in the same directory as the checkpoint
>>> data.
>>>
>>> As outlined here
>>> 
>>> "state.checkpoints.dir" serves as a cluster-wide configuration that _can_
>>> be overwritten with a job-specific setting when creating the state-backend.
>>>
>>> If you want the state-backend to use the configured directory you must
>>> configure the state-backend in the configuration as well, as outlined
>>> here
>>> 
>>> .
>>>
>>> On 19/06/2019 16:26, Vishal Sharma wrote:
>>>
>>> Hi Folks,
>>>
>>> I am using flink 1.8 with externalised checkpointing enabled and saving
>>> the checkpoints to aws S3.
>>>
>>> My configuration is as follows :
>>>
>>> flink-conf.yaml :
>>> state.checkpoints.dir: s3a://test-bucket/checkpoint-metadata
>>>
>>> In application code :
>>> env.setStateBackend(new
>>> RocksDBStateBackend("s3a://test-bucket/checkpoints", true))
>>>
>>> As per my understanding, the externalized checkpoint’s meta data is
>>> determined from the configuration key "state.checkpoints.dir" and
>>> checkpoint data is stored in state backend path.
>>>
>>> However, In my case, I don't see anything in the metadata directory. The
>>> _metadata file is present inside each of the checkpoint directory (chk-6043
>>> ...).
>>>
>>> Is this the expected behavior ? If yes, what is the use of
>>> "state.checkpoints.dir" configuration ?
>>>
>>> My goal is to establish a process to automatically restart the job from
>>> last completed externalised checkpoint in case of failure. For this
>>> to happen, I need to able to figure out path for the metadata of latest
>>> checkpoint.
>>>
>>> Thanks,
>>> Vishal Sharma
>>>

Re: Updated Flink Documentation and tutorials

2019-06-20 Thread Chesnay Schepler

What makes you believe that they are out-dated?

On 19/06/2019 19:17, Pankaj Chand wrote:

Hello,

Please let me know how to get the updated documentation and tutorials 
of Apache Flink.
The stable v1.8 and v1.9-snapshot release of the documentation seems 
to be outdated.


Thanks!

Pankaj