Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread Jin Yi
Hi Yang,

regarding your statement below:

Since you are starting the JM/TM with a K8s deployment, a new JM/TM will be
created when they fail. If you do not set the high-availability configuration,
your jobs can recover when a TM fails; however, they cannot recover when the JM
fails. With HA configured, the jobs can always be recovered and you do not need
to re-submit them.

Does this also apply to a Flink job cluster? When the JM pod is restarted by
Kubernetes, the image also contains the application jar, so if the statement
also applies to the Flink job cluster mode, can you please elaborate on why?

Thanks a lot!
Eleanore

On Mon, Feb 24, 2020 at 6:36 PM Yang Wang  wrote:

> Hi M Singh,
>
> > Mans - If we use the session based deployment option for K8 - I thought
>> K8 will automatically restart any failed TM or JM.
>> In the case of failed TM - the job will probably recover, but in the case
>> of failed JM - perhaps we need to resubmit all jobs.
>> Let me know if I have misunderstood anything.
>
>
> Since you are starting the JM/TM with a K8s deployment, a new JM/TM will be
> created when they fail. If you do not set the high-availability
> configuration, your jobs can recover when a TM fails; however, they cannot
> recover when the JM fails. With HA configured, the jobs can always be
> recovered and you do not need to re-submit them.
>
> > Mans - Is there any safe way of passing creds ?
>
>
> Yes, you are right: using a ConfigMap to pass the credentials is not safe.
> On K8s, I think you could use Secrets instead [1].
>
> > Mans - Does a task manager failure cause the job to fail ?  My
>> understanding is that JM failures are catastrophic while TM failures are
>> recoverable.
>
>
> What I mean is that the job fails, and it can then be restarted by your
> configured restart strategy [2].
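For reference, a minimal Scala sketch of configuring such a restart strategy on
the job's execution environment; the attempt count and delay below are
placeholders, not values recommended in this thread:

-
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RestartStrategyExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Retry a failed job up to 3 times, waiting 10 seconds between attempts.
    // The same behaviour can also be configured cluster-wide in flink-conf.yaml
    // via restart-strategy: fixed-delay (see [2] below).
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
    // ... build and execute the pipeline as usual ...
  }
}
-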
>
> > Mans - So if we are saving checkpoint in S3 then there is no need for
>> disks - should we use emptyDir ?
>
>
> Yes, if you are saving the checkpoints in S3 and also set
> `high-availability.storageDir` to S3, then you do not need a persistent
> volume. Since the local directory is only used as a local cache, you can
> directly use the overlay filesystem or emptyDir (better I/O performance).
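For reference, a minimal Scala sketch of pointing checkpoints at S3 from the
job side; the bucket path and interval are placeholders, and the HA storage
directory itself is cluster-side configuration in flink-conf.yaml rather than
job code:

-
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object S3CheckpointExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 60 seconds into S3 (placeholder bucket/path).
    env.enableCheckpointing(60000L)
    env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"))
    // high-availability.storageDir would additionally point to an S3 path in
    // flink-conf.yaml, as discussed above.
    // ... rest of the job ...
  }
}
-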
>
>
> [1].
> https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
>
> M Singh wrote on Tue, Feb 25, 2020 at 5:53 AM:
>
>> Thanks Wang for your detailed answers.
>>
>> From what I understand the native_kubernetes also leans towards creating
>> a session and submitting a job to it.
>>
>> Regarding the other recommendations, please see my inline comments and advice.
>>
>> On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang <
>> danrtsey...@gmail.com> wrote:
>>
>>
>> Hi Singh,
>>
>> Glad to hear that you are looking to run Flink on Kubernetes. I am trying to
>> answer your question based on my limited knowledge, and others could correct
>> me and add some more supplements.
>>
>> I think the biggest difference between a session cluster and a per-job
>> cluster on Kubernetes is isolation. For per-job, a dedicated Flink cluster
>> is started for that single job and no other jobs can be submitted. Once the
>> job finishes, the Flink cluster is destroyed immediately.
>> The second point is one-step submission: you do not need to start a Flink
>> cluster first and then submit a job to the existing session.
>>
>> > Are there any benefits with regards to
>> 1. Configuring the jobs
>> Whether you are using a per-job cluster or submitting to an existing session
>> cluster, they share the same configuration mechanism. You do not have to
>> change any code or configuration.
>>
>> 2. Scaling the taskmanager
>> Since you are using a standalone cluster on Kubernetes, it does not provide
>> an active ResourceManager. You need to use external tools to monitor and
>> scale the TaskManagers. The active integration is still evolving, and you
>> could give it a try [1].
>>
>> Mans - If we use the session based deployment option for K8 - I thought
>> K8 will automatically restarts any failed TM or JM.
>> In the case of failed TM - the job will probably recover, but in the case
>> of failed JM - perhaps we need to resubmit all jobs.
>> Let me know if I have misunderstood anything.
>>
>> 3. Restarting jobs
>> For a session cluster, you can directly cancel the job and re-submit it. For
>> a per-job cluster, when the job is canceled, you need to start a new per-job
>> cluster from the latest savepoint.
>>
>> 4. Managing the flink jobs
>> The REST API and the Flink command line can be used to manage the jobs (e.g.
>> flink cancel, etc.). I think there is no difference between session and
>> per-job here.
>>
>> 5. Passing credentials (in case of AWS, etc)
>> I am not sure how you provide your credentials. If you put them in the
>> config map and then mount it into the jobmanager/taskmanager pod, then b

Artificial streaming benchmarks for Flink

2020-02-26 Thread Robert Harrelson
Hi Flink community,

There are several artificial benchmarks available for Storm, such as
https://github.com/intel-hadoop/storm-benchmark

It has streaming stateful WordCount, Rolling Count, Rolling Sort, streaming
Grep, streaming SOL, etc.

Please tell me if there are similar artificial benchmarks for Flink. I'm
especially interested in streaming benchmarks for Flink such as stateful
WordCount, SOL, Rolling Count, Rolling Sort, streaming Grep, etc.

Thank you in advance,

Bob


Apache Beam Side input vs Flink Broadcast Stream

2020-02-26 Thread Jin Yi
Hi All,

there is a recently published article on the official Flink website about
running Beam on top of Flink:
https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html

In the article:

   - You get additional features like side inputs and cross-language
   pipelines that are not supported natively in Flink but only supported when
   using Beam with Flink

Ultimately, a Beam pipeline is translated into a Flink job. So does Beam's
side input translate into a Flink broadcast stream?

If I look at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators,
it looks like it converts the side input into a broadcast stream, so why does
the article say Flink does not support it natively?

Thanks a lot!
Eleanore


Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread Yang Wang
I think the only limitation is the disk size of your kubelet machine. Please
remember to set the "sizeLimit" of your emptyDir. Otherwise, your pod may be
killed because its ephemeral storage is full.


Best,
Yang

M Singh wrote on Thu, Feb 27, 2020 at 8:34 AM:

> BTW - Is there any limit to the amount of data that can be stored on
> emptyDir in K8 ?
>
> On Wednesday, February 26, 2020, 07:33:54 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
>
>
> Thanks Yang and Arvid for your advice and pointers.  Mans
>
> On Wednesday, February 26, 2020, 09:52:26 AM EST, Arvid Heise <
> ar...@ververica.com> wrote:
>
>
> Creds on AWS are typically resolved through roles assigned to K8s pods
> (for example with KIAM [1]).
>
> [1] https://github.com/uswitch/kiam
>
> On Tue, Feb 25, 2020 at 3:36 AM Yang Wang  wrote:
>
> Hi M Singh,
>
> > Mans - If we use the session based deployment option for K8 - I thought
> K8 will automatically restarts any failed TM or JM.
> In the case of failed TM - the job will probably recover, but in the case
> of failed JM - perhaps we need to resubmit all jobs.
> Let me know if I have misunderstood anything.
>
>
> Since you are starting JM/TM with K8s deployment, when they failed new
> JM/TM will be created. If you do not set the high
> availability configuration, your jobs could recover when TM failed.
> However, they could not recover when JM failed. With HA
> configured, the jobs could always be recovered and you do not need to
> re-submit again.
>
> > Mans - Is there any safe way of a passing creds ?
>
>
> Yes, you are right, Using configmap to pass the credentials is not safe.
> On K8s, i think you could use secrets instead[1].
>
> > Mans - Does a task manager failure cause the job to fail ?  My
> understanding is the JM failure are catastrophic while TM failures are
> recoverable.
>
>
> What i mean is the job failed, and it could be restarted by your
> configured restart strategy[2].
>
> > Mans - So if we are saving checkpoint in S3 then there is no need for
> disks - should we use emptyDir ?
>
>
> Yes, if you are saving the checkpoint in S3 and also set the
> `high-availability.storageDir` to S3. Then you do not need persistent
> volume. Since
> the local directory is only used for local cache, so you could directly
> use the overlay filesystem or emptyDir (better I/O performance).
>
>
> [1].
> https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
>
> M Singh wrote on Tue, Feb 25, 2020 at 5:53 AM:
>
> Thanks Wang for your detailed answers.
>
> From what I understand the native_kubernetes also leans towards creating a
> session and submitting a job to it.
>
> Regarding other recommendations, please my inline comments and advice.
>
> On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang <
> danrtsey...@gmail.com> wrote:
>
>
> Hi Singh,
>
> Glad to hear that you are looking to run Flink on the Kubernetes. I am
> trying to answer your question based on my limited knowledge and
> others could correct me and add some more supplements.
>
> I think the biggest difference between session cluster and per-job cluster
> on Kubernetes is the isolation. Since for per-job, a dedicated Flink cluster
> will be started for the only one job and no any other jobs could be
> submitted.
> Once the job is finished, then the Flink cluster will be
> destroyed immediately.
> The second point is one-step submission. You do not need to start a Flink
> cluster first and then submit a job to the existing session.
>
> > Are there any benefits with regards to
> 1. Configuring the jobs
> No matter you are using the per-job cluster or submitting to the existing
> session cluster, they share the configuration mechanism. You do not have
> to change any codes and configurations.
>
> 2. Scaling the taskmanager
> Since you are using the Standalone cluster on Kubernetes, it do not provide
> an active resourcemanager. You need to use external tools to monitor and
> scale up the taskmanagers. The active integration is still evolving and you
> could have a taste[1].
>
> Mans - If we use the session based deployment option for K8 - I thought K8
> will automatically restarts any failed TM or JM.
> In the case of failed TM - the job will probably recover, but in the case
> of failed JM - perhaps we need to resubmit all jobs.
> Let me know if I have misunderstood anything.
>
> 3. Restarting jobs
> For the session cluster, you could directly cancel the job and re-submit.
> And
> for per-job cluster, when the job is canceled, you need to start a new
> per-job
> cluster from the latest savepoint.
>
> 4. Managing the flink jobs
> The rest api and flink command line could be used to managing the jobs(e.g.
> flink cancel, etc.). I think there is no difference for session and
> per-job here.
>
> 5. Passing credentials (in case of AWS, etc)
> I am not sure how do you provide your credentials. If you put them in the
> config map and then mount in

Re: How JobManager and TaskManager find each other?

2020-02-26 Thread Yang Wang
So do you mean the IP address changes while running, or that the taskmanager
failed and was relaunched with the same hostname but a different IP address?

I am not sure whether the TM registers with the JM using its hostname or its IP
address. We could confirm that with @Zhijiang.

Regarding NAT, Flink currently does not work well with NAT, since the
taskmanager does not provide a bind host/port for others to connect to. There
are some tickets tracking this problem [1].


[1]. https://issues.apache.org/jira/browse/FLINK-15911


Best,
Yang

KristoffSC wrote on Wed, Feb 26, 2020 at 11:34 PM:

> Thank you very much,
> What about if the node IP changes? Does it also support DNS, or "raw" IP
> addresses only?
> I'm thinking about cloud deployments where the actual service/process can be
> rescheduled to a different box but there is a name-resolution mechanism.
>
> Also, what if there is NAT between the Task Manager and the Job Manager?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-26 Thread Jingsong Li
Hi Jark,

The matrix I see is the SQL cast matrix. If we need to bring in another
conversion matrix that is different from SQL cast, I don't understand the
benefits; it makes things harder to understand.
And it seems bad to silently change timestamps from different time zones to the
same value.

I have seen a lot of timestamp formats: SQL, ISO, RFC. I think a
"timestampFormat" option could help users deal with the various formats.
Which approach do you think can solve all these problems?

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 10:45 PM Jark Wu  wrote:

> Hi Jingsong,
>
> I don't think it should follow SQL CAST semantics, because it is outside of
> SQL: it happens in connectors, which convert users'/external formats into
> SQL types.
> I also suspect "timestampFormat" may not work in some cases, because the
> timestamp formats may vary and be mixed within a topic.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 22:20, Jingsong Li  wrote:
>
>> Thanks all for your discussion.
>>
>> Hi Dawid,
>>
>> +1 to apply the logic of parsing a SQL timestamp literal.
>>
>> I don't fully understand the matrix you listed. Should this be the
>> semantics of SQL cast?
>> Do you mean this is an implicit cast in the JSON parser?
>> I doubt that, because these implicit casts are not supported in
>> LogicalTypeCasts. And it is not easy to understand when it occurs silently.
>>
>> How about adding a "timestampFormat" property to the JSON parser? Its
>> default value would be the SQL timestamp literal format, and the user could
>> configure it.
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Feb 26, 2020 at 6:39 PM Jark Wu  wrote:
>>
>>> Hi Dawid,
>>>
>>> I agree with you. If we want to loosen the format constraint, the
>>> important piece is the conversion matrix.
>>>
>>> The conversion matrix you listed makes sense to me. From my understanding,
>>> there should be 6 combinations.
>>> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
>>> TIMEZONE to make the matrix complete.
>>> When the community reaches an agreement on this, we should write it down in
>>> the documentation and follow the matrix in all text-based formats.
>>>
>>> Regarding the RFC 3339 compatibility mode switch, it also sounds good to
>>> me.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz 
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > @NiYanchun Thank you for reporting this. Yes I think we could improve
>>> the
>>> > behaviour of the JSON format.
>>> >
>>> > @Jark First of all I do agree we could/should improve the
>>> > "user-friendliness" of the JSON format (and unify the behavior across
>>> text
>>> > based formats). I am not sure though if it is as simple as just ignore
>>> the
>>> > time zone here.
>>> >
>>> > My suggestion would be rather to apply the logic of parsing a SQL
>>> > timestamp literal (if the expected type is of
>>> LogicalTypeFamily.TIMESTAMP),
>>> > which would actually also derive the "stored" type of the timestamp
>>> (either
>>> > WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql
>>> conversion.
>>> > Therefore if the
>>> >
>>> > parsed type       | requested type       | behaviour
>>> > WITHOUT TIMEZONE  | WITH TIMEZONE        | store the local timezone with the data
>>> > WITHOUT TIMEZONE  | WITH LOCAL TIMEZONE  | do nothing in the data, interpret the time in local timezone
>>> > WITH TIMEZONE     | WITH LOCAL TIMEZONE  | convert the timestamp to local timezone and drop the time zone information
>>> > WITH TIMEZONE     | WITHOUT TIMEZONE     | drop the time zone information
>>> >
>>> > It might just boil down to what you said "being more lenient with
>>> regards
>>> > to parsing the time zone". Nevertheless I think this way it is a bit
>>> better
>>> > defined behaviour, especially as it has a defined behaviour when
>>> converting
>>> > between representation with or without time zone.
>>> >
>>> > An implementation note. I think we should aim to base the
>>> implementation
>>> > on the DataTypes already rather than going back to the TypeInformation.
>>> >
>>> > I would still try to leave the RFC 3339 compatibility mode, but maybe
>>> for
>>> > that mode it would make sense to not support any types WITHOUT
>>> TIMEZONE?
>>> > This would be enabled with a switch (disabled by default). As I
>>> understand
>>> > the RFC, making the time zone mandatory is actually a big part of the
>>> > standard as it makes time types unambiguous.
>>> >
>>> > What do you think?
>>> >
>>> > Ps. I cross posted this on the dev ML.
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> >
>>> > On 26/02/2020 03:45, Jark Wu wrote:
>>> >
>>> > Yes, I'm also in favor of loosening the datetime format constraint.
>>> > I guess most of the users don't know there is a JSON standard which
>>> > follows RFC 3339.
>>> >
>>> > Best,
>>> > Jark
>>> >
>>> > On Wed, 26 Feb 2020 at 10:06, NiYanchun  wrote:
>>> >
>>> >> Yes, these Types definition are general. As a u

Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread M Singh
 BTW - Is there any limit to the amount of data that can be stored on emptyDir 
in K8 ?  
On Wednesday, February 26, 2020, 07:33:54 PM EST, M Singh 
 wrote:  
 
  Thanks Yang and Arvid for your advice and pointers.  Mans
On Wednesday, February 26, 2020, 09:52:26 AM EST, Arvid Heise 
 wrote:  
 
 Creds on AWS are typically resolved through roles assigned to K8s pods (for 
example with KIAM [1]).
[1] https://github.com/uswitch/kiam
On Tue, Feb 25, 2020 at 3:36 AM Yang Wang  wrote:

Hi M Singh,

> Mans - If we use the session based deployment option for K8 - I thought K8 
>will automatically restarts any failed TM or JM. 
In the case of failed TM - the job will probably recover, but in the case of 
failed JM - perhaps we need to resubmit all jobs.
Let me know if I have misunderstood anything.

Since you are starting the JM/TM with a K8s deployment, a new JM/TM will be
created when they fail. If you do not set the high-availability configuration,
your jobs can recover when a TM fails; however, they cannot recover when the JM
fails. With HA configured, the jobs can always be recovered and you do not need
to re-submit them.

> Mans - Is there any safe way of a passing creds ?

Yes, you are right: using a ConfigMap to pass the credentials is not safe. On
K8s, I think you could use Secrets instead [1].

> Mans - Does a task manager failure cause the job to fail ?  My understanding 
> is the JM failure are catastrophic while TM failures are recoverable.

What I mean is that the job fails, and it can then be restarted by your
configured restart strategy [2].

> Mans - So if we are saving checkpoint in S3 then there is no need for disks - 
>should we use emptyDir ?
Yes, if you are saving the checkpoints in S3 and also set
`high-availability.storageDir` to S3, then you do not need a persistent volume.
Since the local directory is only used as a local cache, you can directly use
the overlay filesystem or emptyDir (better I/O performance).

[1].
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
M Singh wrote on Tue, Feb 25, 2020 at 5:53 AM:

Thanks Wang for your detailed answers.
From what I understand, the native_kubernetes also leans towards creating a
session and submitting a job to it.
Regarding the other recommendations, please see my inline comments and advice.
On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang 
 wrote:  
 
Hi Singh,

Glad to hear that you are looking to run Flink on Kubernetes. I am trying to
answer your question based on my limited knowledge, and others could correct me
and add some more supplements.

I think the biggest difference between a session cluster and a per-job cluster
on Kubernetes is isolation. For per-job, a dedicated Flink cluster is started
for that single job and no other jobs can be submitted. Once the job finishes,
the Flink cluster is destroyed immediately. The second point is one-step
submission: you do not need to start a Flink cluster first and then submit a
job to the existing session.

> Are there any benefits with regards to
1. Configuring the jobs
Whether you are using a per-job cluster or submitting to an existing session
cluster, they share the same configuration mechanism. You do not have to change
any code or configuration.

2. Scaling the taskmanager
Since you are using a standalone cluster on Kubernetes, it does not provide an
active ResourceManager. You need to use external tools to monitor and scale the
TaskManagers. The active integration is still evolving and you could give it a
try [1].

Mans - If we use the session based deployment option for K8 - I thought K8 will
automatically restart any failed TM or JM. In the case of a failed TM the job
will probably recover, but in the case of a failed JM perhaps we need to
resubmit all jobs. Let me know if I have misunderstood anything.

3. Restarting jobs
For a session cluster, you can directly cancel the job and re-submit it. For a
per-job cluster, when the job is canceled, you need to start a new per-job
cluster from the latest savepoint.

4. Managing the flink jobs
The REST API and the Flink command line can be used to manage the jobs (e.g.
flink cancel, etc.). I think there is no difference between session and per-job
here.

5. Passing credentials (in case of AWS, etc)
I am not sure how you provide your credentials. If you put them in the config
map and then mount it into the jobmanager/taskmanager pod, then both session
and per-job could support this way.

Mans - Is there any safe way of passing creds ?

6. Fault tolerance and recovery of jobs from failure
For a session cluster, if one taskmanager crashes, then all the jobs which have
tasks on this taskmanager will fail. Both session and per-job can be configured
with high availability and recover from the latest checkpoint.

Mans - Does a task manager failure cause the job to fail ?  My understandin

Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread M Singh
 Thanks Yang and Arvid for your advice and pointers.  Mans
On Wednesday, February 26, 2020, 09:52:26 AM EST, Arvid Heise 
 wrote:  
 
 Creds on AWS are typically resolved through roles assigned to K8s pods (for 
example with KIAM [1]).
[1] https://github.com/uswitch/kiam
On Tue, Feb 25, 2020 at 3:36 AM Yang Wang  wrote:

Hi M Singh,

> Mans - If we use the session based deployment option for K8 - I thought K8 
>will automatically restarts any failed TM or JM. 
In the case of failed TM - the job will probably recover, but in the case of 
failed JM - perhaps we need to resubmit all jobs.
Let me know if I have misunderstood anything.

Since you are starting the JM/TM with a K8s deployment, a new JM/TM will be
created when they fail. If you do not set the high-availability configuration,
your jobs can recover when a TM fails; however, they cannot recover when the JM
fails. With HA configured, the jobs can always be recovered and you do not need
to re-submit them.

> Mans - Is there any safe way of a passing creds ?

Yes, you are right: using a ConfigMap to pass the credentials is not safe. On
K8s, I think you could use Secrets instead [1].

> Mans - Does a task manager failure cause the job to fail ?  My understanding 
> is the JM failure are catastrophic while TM failures are recoverable.

What I mean is that the job fails, and it can then be restarted by your
configured restart strategy [2].

> Mans - So if we are saving checkpoint in S3 then there is no need for disks - 
>should we use emptyDir ?
Yes, if you are saving the checkpoints in S3 and also set
`high-availability.storageDir` to S3, then you do not need a persistent volume.
Since the local directory is only used as a local cache, you can directly use
the overlay filesystem or emptyDir (better I/O performance).

[1].
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
M Singh wrote on Tue, Feb 25, 2020 at 5:53 AM:

Thanks Wang for your detailed answers.
From what I understand, the native_kubernetes also leans towards creating a
session and submitting a job to it.
Regarding the other recommendations, please see my inline comments and advice.
On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang 
 wrote:  
 
Hi Singh,

Glad to hear that you are looking to run Flink on Kubernetes. I am trying to
answer your question based on my limited knowledge, and others could correct me
and add some more supplements.

I think the biggest difference between a session cluster and a per-job cluster
on Kubernetes is isolation. For per-job, a dedicated Flink cluster is started
for that single job and no other jobs can be submitted. Once the job finishes,
the Flink cluster is destroyed immediately. The second point is one-step
submission: you do not need to start a Flink cluster first and then submit a
job to the existing session.

> Are there any benefits with regards to
1. Configuring the jobs
Whether you are using a per-job cluster or submitting to an existing session
cluster, they share the same configuration mechanism. You do not have to change
any code or configuration.

2. Scaling the taskmanager
Since you are using a standalone cluster on Kubernetes, it does not provide an
active ResourceManager. You need to use external tools to monitor and scale the
TaskManagers. The active integration is still evolving and you could give it a
try [1].

Mans - If we use the session based deployment option for K8 - I thought K8 will
automatically restart any failed TM or JM. In the case of a failed TM the job
will probably recover, but in the case of a failed JM perhaps we need to
resubmit all jobs. Let me know if I have misunderstood anything.

3. Restarting jobs
For a session cluster, you can directly cancel the job and re-submit it. For a
per-job cluster, when the job is canceled, you need to start a new per-job
cluster from the latest savepoint.

4. Managing the flink jobs
The REST API and the Flink command line can be used to manage the jobs (e.g.
flink cancel, etc.). I think there is no difference between session and per-job
here.

5. Passing credentials (in case of AWS, etc)
I am not sure how you provide your credentials. If you put them in the config
map and then mount it into the jobmanager/taskmanager pod, then both session
and per-job could support this way.

Mans - Is there any safe way of passing creds ?

6. Fault tolerance and recovery of jobs from failure
For a session cluster, if one taskmanager crashes, then all the jobs which have
tasks on this taskmanager will fail. Both session and per-job can be configured
with high availability and recover from the latest checkpoint.

Mans - Does a task manager failure cause the job to fail ?  My understanding is
that JM failures are catastrophic while TM failures are recoverable.

> Is there any need for specifying volume for the pods?
No, you do not need to specify a vol

Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread sri hari kali charan Tummala
Sorry for being lazy; I could have gone through the Flink source code myself.

On Wed, Feb 26, 2020 at 9:35 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Ok, thanks for the clarification.
>
> On Wed, Feb 26, 2020 at 9:22 AM Arvid Heise  wrote:
>
>> Exactly. We use the hadoop-fs as an indirection on top of that, but Spark
>> probably does the same.
>>
>> On Wed, Feb 26, 2020 at 3:52 PM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Thank you  (the two systems running on Java and using the same set of
>>> libraries), so from my understanding, Flink uses AWS SDK behind the scenes
>>> same as spark.
>>>
>>> On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise  wrote:
>>>
 Fair benchmarks are notoriously difficult to setup.

 Usually, it's easy to find a workload where one system shines and as
 its vendor you report that. Then, the competitor benchmarks a different use
 case where his system outperforms ours. In the end, customers are more
 confused than before.

 You should do your own benchmarks for your own workloads. That is the
 only reliable way.

 In the end, both systems use similar setups and improvements in one
 system are often also incorporated into the other system with some delay,
 such that there should be no ground-breaking differences between the two
 systems running on Java and using the same set of libraries.
 Of course, if one system has a very specific optimization for your use
 case, that could be much faster.


 On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala <
 kali.tumm...@gmail.com> wrote:

> Hi All,
>
> have a question did anyone compared the performance of Flink batch job
> writing to s3 vs spark writing to s3?
>
> --
> Thanks & Regards
> Sri Tummala
>
>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala


Flink 1.10 - Hadoop libraries integration with plugins and class loading

2020-02-26 Thread Ricardo Cardante
Hi!

We're working on a project where data is being written to S3 within a Flink 
application.
Running the integration tests locally / in IntelliJ (using
MiniClusterWithClientResource), all the dependencies are correctly resolved and
the program executes as expected. However, when the fat JAR is submitted to a
Flink setup running on Docker, we get the following exception:

-
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
-

This refers to the usage of that class in a RichSinkFunction while building an
AvroParquetWriter:

-
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

import scala.util.Try

// ...
Try {
  // Build a Parquet writer for Avro records and write out all elements.
  val writer = AvroParquetWriter
    .builder[GenericRecord](new Path(finalFilePath))
    .withSchema(new Schema.Parser().parse(schema))
    .withDataModel(GenericData.get)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  elements.foreach(element => writer.write(element))
  writer.close()
}
// ...
-

Since we have "flink-s3-fs-hadoop" at the plugins folder and therefore being 
dynamically loaded upon task/job manager(s) startup (also, we are keeping 
Flink's default inverted class loading strategy), shouldn't Hadoop dependencies 
be loaded by parent-first? (based on classloader.parent-first-patterns.default)

We also tried to put "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" on Flink's 
/lib folder, but when doing that we got this error instead:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3a.S3AFileSystem not found

The only way we have been able to make the application work as expected is to
include the dependency "hadoop-aws" with compile scope, but then we get a fat
JAR with transitive dependencies on Hadoop libraries that we would like to avoid.

What would be the most appropriate way to take advantage of the cluster's
"flink-s3-fs-hadoop" and avoid delivering any Hadoop-related library in our
application JAR?

The dependencies we're using in the build.sbt file:
-
lazy val dependencies =
  new {
    val flinkVersion = "1.10.0"
    val parquetAvroVersion = "1.10.1"
    val hadoopVersion = "3.2.1"
    val circeVersion = "0.12.3"
    val rogachVersion = "3.3.1"
    val loggingVersion = "3.7.2"
    val scalatestVersion = "3.0.5"
    val mockitoVersion = "1.10.0"
    val kafkaVersion = "2.2.0"
    val scalajVersion = "2.4.2"
    val snakeYamlVersion = "1.25"
    val slf4jVersion = "1.7.30"
    val beanUtilsVersion = "1.9.4"
    val shadedHadoopVersion = "2.8.3-10.0"

    // Core libraries provided at runtime
    val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
    val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
    val flinks3Hadoop = "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % "provided"

    // Application specific dependencies.
    val flinkConnectorKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
    val flinkStateBackendRocksDb = "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion
    val flinkParquet = "org.apache.flink" %% "flink-parquet" % flinkVersion
    val flinkDropwizard = "org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion
    val parquetAvro = "org.apache.parquet" % "parquet-avro" % parquetAvroVersion
    val circeCore = "io.circe" %% "circe-core" % circeVersion
    val circeParser = "io.circe" %% "circe-parser" % circeVersion
    val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
    val scallop = "org.rogach" %% "scallop" % rogachVersion
    val logging = "com.typesafe.scala-logging" %% "scala-logging" % loggingVersion
    val snakeYaml = "org.yaml" % "snakeyaml" % snakeYamlVersion
    val slf4j = "org.slf4j" % "slf4j-log4j12" % slf4jVersion
    val beanUtils = "commons-beanutils" % "commons-beanutils" % beanUtilsVersion

    // Test libraries
    val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion % "test"
    val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion % "test"
    val flinkTestUtils = "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test"
    val kafkaStreams = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test"
    val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test"
    val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion % "test"
    val hadoopClient = "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test"

    // Test classifiers only
    val flinkRuntimeTest = "org.apache.flink" %% "flink-runtime" % flinkVersion % "test" classifier "tests"
    val kafkaTest = "org.apache.kafka" %% "kafka" % kafkaVersion % "test" classifier "test"
    val kafkaStreamsTest = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test" classifier "test"
    val kafkaClientsTest = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test" classifier "test"
  }
-



This is the Dockerfile:
-
FROM flink:1.10.0

Re: Getting javax.management.InstanceAlreadyExistsException when upgraded to 1.10

2020-02-26 Thread John Smith
Just curious: is this also the reason why some jobs in the UI show their
metrics and others do not?

Looking at 2 jobs, one displays how many bytes in and out it has received,
while another one shows all zeros. But I know it's working, though.

On Wed, 26 Feb 2020 at 11:19, John Smith  wrote:

> This is what I got from the logs.
>
> 2020-02-25 00:13:38,124 WARN  org.apache.kafka.common.utils.AppInfoParser
>   - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=app-info,id=consumer-1
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:805)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:659)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:639)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> On Tue, 25 Feb 2020 at 15:50, John Smith  wrote:
>
>> Ok as soon as I can tomorrow.
>>
>> Thanks
>>
>> On Tue, 25 Feb 2020 at 11:51, Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> Seems like this is another instance of
>>> https://issues.apache.org/jira/browse/FLINK-8093
>>> Could you please provide the full stacktrace?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Feb 24, 2020 at 10:48 PM John Smith 
>>> wrote:
>>>
 Hi. I just upgraded to 1.10.0 and am getting the below error when I deploy
 my tasks.

 The first one seems to deploy OK, but subsequent ones seem to throw
 this error. But they seem to work still.

 javax.management.InstanceAlreadyExistsException:
 kafka.consumer:type=app-info,id=consumer-2
 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
 at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
 at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
 at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
 at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
 at
 com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
 at
 org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
 at
 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:805)
 at
 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:659)
 at
 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:639)
 at
 org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:477)
 at
 org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:167)

>>>


Re: Getting javax.management.InstanceAlreadyExistsException when upgraded to 1.10

2020-02-26 Thread John Smith
This is what I got from the logs.

2020-02-25 00:13:38,124 WARN  org.apache.kafka.common.utils.AppInfoParser
- Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:805)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:659)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:639)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)




On Tue, 25 Feb 2020 at 15:50, John Smith  wrote:

> Ok as soon as I can tomorrow.
>
> Thanks
>
> On Tue, 25 Feb 2020 at 11:51, Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi John,
>>
>> Seems like this is another instance of
>> https://issues.apache.org/jira/browse/FLINK-8093
>> Could you please provide the full stacktrace?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 24, 2020 at 10:48 PM John Smith 
>> wrote:
>>
>>> Hi. I just upgraded to 1.10.0 and am getting the below error when I deploy
>>> my tasks.
>>>
>>> The first one seems to deploy OK, but subsequent ones seem to throw
>>> this error. But they seem to work still.
>>>
>>> javax.management.InstanceAlreadyExistsException:
>>> kafka.consumer:type=app-info,id=consumer-2
>>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>> at
>>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>> at
>>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:805)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:659)
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:639)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:477)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:167)
>>>
>>


Re: Question: Determining Total Recovery Time

2020-02-26 Thread Zhijiang
Hi Morgan,

Your idea is great and I am also interested in it.
I think it is valuable for some users to estimate the maximum throughput
capacity based on certain metrics or models.
But I am not quite sure whether it is feasible to do that based on the existing
metrics; at least there are some limitations, as Arvid mentioned.

If I understand correctly, the maximum throughput you want to measure is based
on source emission; that means some data is still buffered mid-topology and not
processed yet. If so, we might refer to the `inputQueueLength` and `inPoolUsage`
metrics together. Note that if `inPoolUsage` reaches 100%, it does not mean all
the buffers are already filled with data, just that all the available buffers
have been requested. So `inputQueueLength` would be more precise for predicting
the available capacity if we are aware of the total buffer amount. In general we
can make use of these two together.

We can find the largest value of the above metrics across all the tasks of the
topology, which probably hints at the bottleneck in the whole view. Then we can
estimate how many available buffers are left to absorb more source throughput.
But there is a limitation if all the metrics are `0` in a light-load situation,
as I mentioned above. So we cannot estimate the saturation unless we increase
the source emission rate.
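As a rough illustration of that buffer-based estimate, a small self-contained
Scala sketch; the buffer count and metric values are invented numbers for
illustration, not read from a live cluster:

-
// Back-of-the-envelope headroom estimate from `inputQueueLength` and `inPoolUsage`.
object BufferHeadroom {
  def main(args: Array[String]): Unit = {
    val totalBuffers     = 2048  // assumed total buffers of the bottleneck input gate
    val inputQueueLength = 512   // sampled buffers.inputQueueLength
    val inPoolUsage      = 0.25  // sampled buffers.inPoolUsage (0.0 to 1.0)

    // Take the worse of the two saturation signals and report the remaining headroom.
    val queueSaturation = inputQueueLength.toDouble / totalBuffers
    val saturation = math.max(queueSaturation, inPoolUsage)
    val headroom = 1.0 - saturation
    println(f"saturation=$saturation%.2f, estimated headroom=$headroom%.2f")
  }
}
-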

Wish good news sharing from you!

Best,
Zhijiang


--
From:Arvid Heise 
Send Time:2020 Feb. 26 (Wed.) 22:29
To:Morgan Geldenhuys 
Cc:Timo Walther ; user 
Subject:Re: Question: Determining Total Recovery Time

Hi Morgan,

doing it in a very general way sure is challenging.

I'd assume that your idea of using the buffer usage has some shortcomings 
(which I don't know), but I also think it's a good starting point.

Have you checked the PoolUsage metrics? [1] You could use them to detect the 
bottleneck and then estimate the max capacity of the whole job.

Btw, I'd be interested in results. We have the idea of adjustable buffer sizes 
and the insights would help us.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service
On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys 
 wrote:

 Hi Arvid, Timo,

 Really appreciate the feedback. I have one final question on this topic and
hope you don't mind me posing it to you directly. I posted the question earlier
to the mailing list, but I am looking at this more from an academic perspective
as opposed to manually optimizing a specific job for a specific production
environment. I do not know the Flink internals well enough to determine if I
can accomplish what I am looking for.

 For an experiment, I need to work out the Total Recovery Time (TRT). I define 
this as the time it takes the system to "catch up" to the current timestamp 
assuming event time processing after a node failure.

 I would like to follow a heuristic approach which is:

- job + environment agnostic,
- does not involve load testing,
- does not involve modifying the job or the Flink codebase, and
- relies solely on the metrics supplied.

As far as I know (and correct me if I'm wrong):
TRT = heartbeat.timeout + recoveryTime + time to reprocess uncheckpointed
messages + lag to catch up to the current timestamp.

 In order to predict TRT, I need some kind of resource utilization model based
on the current processing capacity and the maximum processing limit. Let me
explain:

Backpressure is essentially the point at which utilization has reached 100% for
any particular streaming pipeline, and means that the application has reached
the maximum number of messages that it can process per second.

Let's consider an example: the system is running along perfectly fine under
normal conditions, accessing external sources, and processing at an average of
100,000 messages/sec. Let's assume the maximum capacity is around 130,000
messages/sec before backpressure starts propagating back up the stream.
Therefore, utilization is at 0.76 (100K/130K). Great, but at present we don't
know that 130,000 is the limit without load testing.

For this example, is there a way of finding this maximum capacity (and hence
the utilization) without pushing the system to its limit, based solely on the
average current throughput? Possibly by measuring the saturation of certain
buffers between the operators?

 If this is possible, the unused utilization can be used to predict how fast a
system would get back to the current timestamp. Again, it's a heuristic, so it
doesn't have to be extremely precise. Any hints would be greatly appreciated.
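To make the arithmetic behind that heuristic concrete, a small self-contained
Scala sketch; the rates are the figures from the example above, while the
downtime values and object/method names are assumptions for illustration, not
part of any Flink API:

-
// Illustrative estimate of Total Recovery Time (TRT) following the heuristic
// TRT = heartbeat.timeout + recoveryTime + reprocessing time + catch-up lag.
object TrtEstimate {

  // Time to work off a backlog using the spare capacity (maxRate - currentRate).
  def catchUpSeconds(currentRate: Double, maxRate: Double, backlog: Double): Double = {
    val spareRate = maxRate - currentRate
    require(spareRate > 0, "no spare capacity: the job can never catch up")
    backlog / spareRate
  }

  def main(args: Array[String]): Unit = {
    val currentRate = 100000.0              // observed average throughput (msgs/sec)
    val maxRate     = 130000.0              // assumed maximum capacity (msgs/sec)
    val utilization = currentRate / maxRate // ~0.76, as in the example

    val downtimeSec = 50.0 + 10.0               // assumed heartbeat.timeout + recovery time
    val backlog     = currentRate * downtimeSec // messages accumulated while down

    val trt = downtimeSec + catchUpSeconds(currentRate, maxRate, backlog)
    println(f"utilization=$utilization%.2f, estimated TRT=$trt%.1f s")
  }
}
-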

 Thank you very much!

 Regards,
 Morgan.

On 21.02.20 14:44, Arvid Heise wrote:
Hi Morgan,

sorry for the late reply. In general, that should work. You need to ensure that 
the same task is processing the same record though.

Local copy needs to be state or else the last message would be lost upon 
restart. Performance will take a hit but if that is significant depends on

Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread sri hari kali charan Tummala
Ok, thanks for the clarification.

On Wed, Feb 26, 2020 at 9:22 AM Arvid Heise  wrote:

> Exactly. We use the hadoop-fs as an indirection on top of that, but Spark
> probably does the same.
>
> On Wed, Feb 26, 2020 at 3:52 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Thank you  (the two systems running on Java and using the same set of
>> libraries), so from my understanding, Flink uses AWS SDK behind the scenes
>> same as spark.
>>
>> On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise  wrote:
>>
>>> Fair benchmarks are notoriously difficult to setup.
>>>
>>> Usually, it's easy to find a workload where one system shines and as its
>>> vendor you report that. Then, the competitor benchmarks a different use
>>> case where his system outperforms ours. In the end, customers are more
>>> confused than before.
>>>
>>> You should do your own benchmarks for your own workloads. That is the
>>> only reliable way.
>>>
>>> In the end, both systems use similar setups and improvements in one
>>> system are often also incorporated into the other system with some delay,
>>> such that there should be no ground-breaking differences between the two
>>> systems running on Java and using the same set of libraries.
>>> Of course, if one system has a very specific optimization for your use
>>> case, that could be much faster.
>>>
>>>
>>> On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Hi All,

 have a question did anyone compared the performance of Flink batch job
 writing to s3 vs spark writing to s3?

 --
 Thanks & Regards
 Sri Tummala


>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: How JobManager and TaskManager find each other?

2020-02-26 Thread KristoffSC
Thank you very much.
What about if the node IP changes? Does it also support DNS, or "raw" IP
addresses only?
I'm thinking about cloud deployments where the actual service/process can be
rescheduled to a different box but there is a name-resolution mechanism.

Also, what if there is NAT between the Task Manager and the Job Manager?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread Arvid Heise
Exactly. We use the hadoop-fs as an indirection on top of that, but Spark
probably does the same.

On Wed, Feb 26, 2020 at 3:52 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Thank you  (the two systems running on Java and using the same set of
> libraries), so from my understanding, Flink uses AWS SDK behind the scenes
> same as spark.
>
> On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise  wrote:
>
>> Fair benchmarks are notoriously difficult to setup.
>>
>> Usually, it's easy to find a workload where one system shines and as its
>> vendor you report that. Then, the competitor benchmarks a different use
>> case where his system outperforms ours. In the end, customers are more
>> confused than before.
>>
>> You should do your own benchmarks for your own workloads. That is the
>> only reliable way.
>>
>> In the end, both systems use similar setups and improvements in one
>> system are often also incorporated into the other system with some delay,
>> such that there should be no ground-breaking differences between the two
>> systems running on Java and using the same set of libraries.
>> Of course, if one system has a very specific optimization for your use
>> case, that could be much faster.
>>
>>
>> On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> have a question did anyone compared the performance of Flink batch job
>>> writing to s3 vs spark writing to s3?
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: Async Datastream Checkpointing

2020-02-26 Thread Arvid Heise
Hi Alexandru,

the most likely reason is that you are using *AsyncDataStream* incorrectly.
You have to ensure that all work is done in a separate thread.

AsyncIO will only guarantee that async results are merged back into the
sync stream. The reason is that many libraries have their own thread pool
to send async requests and we didn't want to duplicate that.
In the easiest case, you spawn an executor with
Executors.newFixedThreadPool(10), submit the jobs to it, and then feed the
results back.
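A minimal Scala sketch of that pattern, using the Scala async API mentioned in
the original mail; the web call (fetchFromWebService) and the pool size,
timeout, and capacity values are placeholders:

-
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class EnrichViaWebService extends AsyncFunction[String, String] {

  // Dedicated pool so that asyncInvoke returns immediately and the task thread
  // (and with it the checkpoint barriers) is never blocked on web requests.
  @transient private lazy val executor = Executors.newFixedThreadPool(10)
  @transient private implicit lazy val ec: ExecutionContext =
    ExecutionContext.fromExecutor(executor)

  // Placeholder for the actual blocking HTTP call.
  private def fetchFromWebService(key: String): String = s"result-for-$key"

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    Future(fetchFromWebService(input)).onComplete {
      case Success(value) => resultFuture.complete(Iterable(value))
      case Failure(err)   => resultFuture.completeExceptionally(err)
    }
}

// Wiring it into the pipeline, e.g.:
// val enriched = AsyncDataStream.unorderedWait(input, new EnrichViaWebService, 30, TimeUnit.SECONDS, 100)
-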

On Wed, Feb 26, 2020 at 2:27 PM Alexandru Vasiu <
alexandru.va...@complyadvantage.com> wrote:

> Hi,
>
> We have a pipeline which has, at some point, a step of
> *AsyncDataStream.unorderedWait* where some web requests are executed. The
> pipeline works, but when it tries to make checkpoints it always fails with a
> timeout error (and it stops at the component containing this async data
> stream). We are using Flink 1.10.0 with Scala 2.12.10 and this config for
> checkpoints:
>
> "checkpoints_interval": 18,
> "min_pause_between_checkpoints": 1,
> "checkpoints_timeout": 60,
> "tolerable_checkpoints_failure_number": 20,
> "max_concurrent_checkpoints": 1,
> "checkpoint_mode": CheckpointingMode.EXACTLY_ONCE
>
> Do you know why checkpointing doesn't work in this case?
>
> Thank you,
> Alex Vasiu
>


Re: How JobManager and TaskManager find each other?

2020-02-26 Thread Zhijiang
I guess you are referring to the data shuffle process among different task
managers.

When a task manager (TM) registers itself with the job manager (JM), it also
carries the IP address and data port that it listens on. During task
scheduling, the upstream TM's address info (IP, port) is included in the task
deployment descriptor for the respective downstream tasks. The downstream
tasks can then connect to the remote upstream TM to request data.

In short, the JM knows all the TM addresses via registration, and these
addresses are sent to the required peers during task scheduling and deployment.

Best,
Zhijiang


--
From:KristoffSC 
Send Time:2020 Feb. 26 (Wed.) 19:39
To:user 
Subject:Re: How JobManager and TaskManager find each other?

Thanks all for the answers,

One more question though. In [1] we can see that task managers are talking
with each other - sending data streams. How does each task manager know the
address of the other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread sri hari kali charan Tummala
Thank you (the two systems running on Java and using the same set of
libraries). So, from my understanding, Flink uses the AWS SDK behind the
scenes, the same as Spark.

On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise  wrote:

> Fair benchmarks are notoriously difficult to setup.
>
> Usually, it's easy to find a workload where one system shines and as its
> vendor you report that. Then, the competitor benchmarks a different use
> case where his system outperforms ours. In the end, customers are more
> confused than before.
>
> You should do your own benchmarks for your own workloads. That is the only
> reliable way.
>
> In the end, both systems use similar setups and improvements in one system
> are often also incorporated into the other system with some delay, such
> that there should be no ground-breaking differences between the two systems
> running on Java and using the same set of libraries.
> Of course, if one system has a very specific optimization for your use
> case, that could be much faster.
>
>
> On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have a question: has anyone compared the performance of a Flink batch job
>> writing to S3 vs Spark writing to S3?
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread Arvid Heise
Creds on AWS are typically resolved through roles assigned to K8s pods (for
example with KIAM [1]).

[1] https://github.com/uswitch/kiam

On Tue, Feb 25, 2020 at 3:36 AM Yang Wang  wrote:

> Hi M Singh,
>
> > Mans - If we use the session based deployment option for K8 - I thought
>> K8 will automatically restarts any failed TM or JM.
>> In the case of failed TM - the job will probably recover, but in the case
>> of failed JM - perhaps we need to resubmit all jobs.
>> Let me know if I have misunderstood anything.
>
>
> Since you are starting JM/TM with K8s deployment, when they failed new
> JM/TM will be created. If you do not set the high
> availability configuration, your jobs could recover when TM failed.
> However, they could not recover when JM failed. With HA
> configured, the jobs could always be recovered and you do not need to
> re-submit again.
>
> > Mans - Is there any safe way of a passing creds ?
>
>
> Yes, you are right, Using configmap to pass the credentials is not safe.
> On K8s, i think you could use secrets instead[1].
>
> > Mans - Does a task manager failure cause the job to fail ?  My
>> understanding is the JM failure are catastrophic while TM failures are
>> recoverable.
>
>
> What i mean is the job failed, and it could be restarted by your
> configured restart strategy[2].
>
> > Mans - So if we are saving checkpoint in S3 then there is no need for
>> disks - should we use emptyDir ?
>
>
> Yes, if you are saving the checkpoint in S3 and also set the
> `high-availability.storageDir` to S3. Then you do not need persistent
> volume. Since
> the local directory is only used for local cache, so you could directly
> use the overlay filesystem or empryDir(better io performance).
>
>
> [1].
> https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
>
> M Singh  于2020年2月25日周二 上午5:53写道:
>
>> Thanks Wang for your detailed answers.
>>
>> From what I understand the native_kubernetes also leans towards creating
>> a session and submitting a job to it.
>>
>> Regarding other recommendations, please my inline comments and advice.
>>
>> On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang <
>> danrtsey...@gmail.com> wrote:
>>
>>
>> Hi Singh,
>>
>> Glad to hear that you are looking to run Flink on the Kubernetes. I am
>> trying to answer your question based on my limited knowledge and
>> others could correct me and add some more supplements.
>>
>> I think the biggest difference between session cluster and per-job cluster
>> on Kubernetesis the isolation. Since for per-job, a dedicated Flink
>> cluster
>> will be started for the only one job and no any other jobs could be
>> submitted.
>> Once the job is finished, then the Flink cluster will be
>> destroyed immediately.
>> The second point is one-step submission. You do not need to start a Flink
>> cluster first and then submit a job to the existing session.
>>
>> > Are there any benefits with regards to
>> 1. Configuring the jobs
>> Whether you are using a per-job cluster or submitting to an existing
>> session cluster, they share the same configuration mechanism. You do not
>> have to change any code or configuration.
>>
>> 2. Scaling the taskmanager
>> Since you are using a standalone cluster on Kubernetes, it does not
>> provide
>> an active resourcemanager. You need to use external tools to monitor and
>> scale the taskmanagers. The active integration is still evolving and
>> you
>> could give it a try[1].
>>
>> Mans - If we use the session based deployment option for K8 - I thought
>> K8 will automatically restarts any failed TM or JM.
>> In the case of failed TM - the job will probably recover, but in the case
>> of failed JM - perhaps we need to resubmit all jobs.
>> Let me know if I have misunderstood anything.
>>
>> 3. Restarting jobs
>> For the session cluster, you could directly cancel the job and re-submit.
>> And
>> for per-job cluster, when the job is canceled, you need to start a new
>> per-job
>> cluster from the latest savepoint.
>>
>> 4. Managing the flink jobs
>> The rest api and flink command line can be used to manage the
>> jobs (e.g.
>> flink cancel, etc.). I think there is no difference between session and
>> per-job here.
>>
>> 5. Passing credentials (in case of AWS, etc)
>> I am not sure how you provide your credentials. If you put them in
>> the
>> config map and then mount it into the jobmanager/taskmanager pod, then both
>> session and per-job clusters support this approach.
>>
>> Mans - Is there any safe way of a passing creds ?
>>
>> 6. Fault tolerance and recovery of jobs from failure
>> For a session cluster, if one taskmanager crashes, then all the jobs which
>> have tasks
>> on that taskmanager will fail.
>> Both session and per-job clusters can be configured with high availability
>> and recover
>> from the latest checkpoint.
>>
>> Mans - Does a task manager failure cause the job to fail ?  My
>> und

Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread Arvid Heise
Fair benchmarks are notoriously difficult to set up.

Usually, it's easy to find a workload where one system shines and as its
vendor you report that. Then, the competitor benchmarks a different use
case where his system outperforms ours. In the end, customers are more
confused than before.

You should do your own benchmarks for your own workloads. That is the only
reliable way.

In the end, both systems use similar setups and improvements in one system
are often also incorporated into the other system with some delay, such
that there should be no ground-breaking differences between the two systems
running on Java and using the same set of libraries.
Of course, if one system has a very specific optimization for your use
case, that could be much faster.


On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I have a question: has anyone compared the performance of a Flink batch job
> writing to S3 vs Spark writing to S3?
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-26 Thread godfrey he
Hi Kant, if you want to store the catalog data in a local filesystem/HDFS,
you can implement a user-defined catalog (you just need to implement the
Catalog interface).
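
For the session-scoped alternative mentioned later in this thread
(TableEnvironment.createTemporaryView in Flink 1.10), a minimal sketch could
look like the following; the table names are taken from the example further
down, everything else is an assumption:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Register a query result as a temporary (non-persisted) view and reuse it.
public class TemporaryViewExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // assume "sample1" and "sample2" were registered earlier (e.g. via connectors)
        Table joined = tEnv.sqlQuery(
                "SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0");

        // lives only for this session; persisting it requires a catalog as described above
        tEnv.createTemporaryView("my_view", joined);

        Table result = tEnv.sqlQuery("SELECT * FROM my_view");
        result.printSchema();
    }
}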

Bests,
Godfrey

kant kodali  于2020年2月26日周三 下午12:28写道:

> Hi Jingsong,
>
> Can I store it in Local Filesystem/HDFS?
>
> Thanks!
>
> On Mon, Jan 20, 2020 at 6:59 PM Jingsong Li 
> wrote:
>
>> Hi Kant,
>>
>> If you want your view persisted, you must hook up a catalog such as the hive
>> catalog, which stores views in the metastore (e.g. backed by MySQL).
>> - In 1.10, you can store views in catalog through "Catalog.createTable",
>> you can create a "CatalogViewImpl". This is an internal API, which is not
>> easy to use.
>> - In 1.11, we will introduce create view DDL for "TableEnv.sqlUpdate"
>> and "TableEnv.createView". It will be easy to use.
>>
>> Best,
>> Jingsong Lee
>>
>> On Tue, Jan 21, 2020 at 10:03 AM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> The TableEnv#createTemporaryView and CREATE VIEW in SQL Cli both creates
>>> temporary views which is not persisted and will lost after session close.
>>> I think the persisted views will be supported in 1.11.
>>>
>>> Best,
>>> Jark
>>>
>>> 2020年1月20日 18:46,kant kodali  写道:
>>>
>>> Hi Jingsong,
>>>
>>> Thanks a lot, I think I can live with
>>> TableEnvironment.createTemporaryView in Flink 1.10 (which I am expecting to
>>> be released this month) but are these views persisted somewhere? for
>>> example across sessions? or say I stop my application and start again will
>>> it work as expected?
>>>
>>> Thanks!
>>>
>>>
>>> On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li 
>>> wrote:
>>>
 Hi Kant,

 Sorry, 1.10 does not support "CREATE VIEW" in raw SQL either. Workarounds are:
 - Using TableEnvironment.createTemporaryView...
 - Or using "create view" and "drop view" in the sql-client.
 - Or using hive catalog, in 1.10, we support query catalog views.

 FLIP-71 will be finished  in 1.11 soon.

 Best,
 Jingsong Lee

 On Sun, Jan 19, 2020 at 4:10 PM kant kodali  wrote:

> I tried the following.
>
> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 FULL 
> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>
> Table result = bsTableEnv.sqlQuery("select * from my_view");
>
> It looks like
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>  Views
> are not supported. Can I expect them to be supported in Flink 1.10?
>
> Currently, with Spark SQL when the query gets big I break it down into
> views and this is one of the most important features my application relies
> on. is there any workaround for this at the moment?
>
> Thanks!
>
> On Sat, Jan 18, 2020 at 6:24 PM kant kodali 
> wrote:
>
>> Hi All,
>>
>> Does Flink 1.9 support create or replace views syntax in raw SQL?
>> like spark streaming does?
>>
>> Thanks!
>>
>

 --
 Best, Jingsong Lee

>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-26 Thread Jark Wu
Hi Jingsong,

I don't think it should follow SQL CAST semantics, because it is outside of
SQL; it happens in connectors, which convert the users'/external format into
SQL types.
I also doubt that "timestampFormat" would work in some cases, because the
timestamp formats may be various and mixed within a topic.

Best,
Jark

On Wed, 26 Feb 2020 at 22:20, Jingsong Li  wrote:

> Thanks all for your discussion.
>
> Hi Dawid,
>
> +1 to apply the logic of parsing a SQL timestamp literal.
>
> I don't fully understand the matrix your list. Should this be the
> semantics of SQL cast?
> Do you mean this is implicit cast in JSON parser?
> I doubt that because these implicit casts are not support
> in LogicalTypeCasts. And it is not so good to understand when it occur
> silently.
>
> How about add "timestampFormat" property to JSON parser? Its default value
> is SQL timestamp literal format. And user can configure this.
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 26, 2020 at 6:39 PM Jark Wu  wrote:
>
>> Hi Dawid,
>>
>> I agree with you. If we want to loosen the format constraint, the
>> important piece is the conversion matrix.
>>
>> The conversion matrix you listed makes sense to me. From my understanding,
>> there should be 6 combination.
>> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
>> TIMEZONE to make the matrix complete.
>> When the community reach an agreement on this, we should write it down on
>> the documentation and follow the matrix in all text-based formats.
>>
>> Regarding to the RFC 3339 compatibility mode switch, it also sounds good
>> to
>> me.
>>
>> Best,
>> Jark
>>
>> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz 
>> wrote:
>>
>> > Hi all,
>> >
>> > @NiYanchun Thank you for reporting this. Yes I think we could improve
>> the
>> > behaviour of the JSON format.
>> >
>> > @Jark First of all I do agree we could/should improve the
>> > "user-friendliness" of the JSON format (and unify the behavior across
>> text
>> > based formats). I am not sure though if it is as simple as just ignore
>> the
>> > time zone here.
>> >
>> > My suggestion would be rather to apply the logic of parsing a SQL
>> > timestamp literal (if the expected type is of
>> LogicalTypeFamily.TIMESTAMP),
>> > which would actually also derive the "stored" type of the timestamp
>> (either
>> > WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql
>> conversion.
>> > Therefore if the
>> >
>> > parsed type      | requested type       | behaviour
>> > -----------------+----------------------+----------------------------------------
>> > WITHOUT TIMEZONE | WITH TIMEZONE        | store the local timezone with the data
>> > WITHOUT TIMEZONE | WITH LOCAL TIMEZONE  | do nothing in the data, interpret the
>> >                  |                      | time in local timezone
>> > WITH TIMEZONE    | WITH LOCAL TIMEZONE  | convert the timestamp to local timezone
>> >                  |                      | and drop the time zone information
>> > WITH TIMEZONE    | WITHOUT TIMEZONE     | drop the time zone information
>> >
>> > It might just boil down to what you said "being more lenient with
>> regards
>> > to parsing the time zone". Nevertheless I think this way it is a bit
>> better
>> > defined behaviour, especially as it has a defined behaviour when
>> converting
>> > between representation with or without time zone.
>> >
>> > An implementation note. I think we should aim to base the implementation
>> > on the DataTypes already rather than going back to the TypeInformation.
>> >
>> > I would still try to leave the RFC 3339 compatibility mode, but maybe
>> for
>> > that mode it would make sense to not support any types WITHOUT TIMEZONE?
>> > This would be enabled with a switch (disabled by default). As I
>> understand
>> > the RFC, making the time zone mandatory is actually a big part of the
>> > standard as it makes time types unambiguous.
>> >
>> > What do you think?
>> >
>> > Ps. I cross posted this on the dev ML.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> >
>> > On 26/02/2020 03:45, Jark Wu wrote:
>> >
>> > Yes, I'm also in favor of loosen the datetime format constraint.
>> > I guess most of the users don't know there is a JSON standard which
>> > follows RFC 3339.
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 26 Feb 2020 at 10:06, NiYanchun  wrote:
>> >
>> >> Yes, these Types definition are general. As a user/developer, I would
>> >> support “loosen it for usability”. If not, may add some explanation
>> >> about JSON.
>> >>
>> >>
>> >>
>> >>  Original Message
>> >> *Sender:* Jark Wu
>> >> *Recipient:* Outlook; Dawid Wysakowicz<
>> >> dwysakow...@apache.org>
>> >> *Cc:* godfrey he; Leonard Xu;
>> >> user
>> >> *Date:* Wednesday, Feb 26, 2020 09:55
>> >> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>> >>
>> >> Hi Outlook,
>> >>
>> >> The explanation in DataTypes is correct, it is compliant to SQL
>> standard.
>> >> The problem is that JsonRowDeserializationSchema only support
>> RFC-3339.
>> >> On the other hand, CsvRowDeserializationSchema supports to parse
>> >> "2019-07-09 02:02:0

Re: async io parallelism

2020-02-26 Thread Arvid Heise
Hi Alexey,

No, there are as many instances as configured, but each operates on its
current key group range and maintains the order within it.
So messages with the same key are never reordered with ordered async I/O.
Messages with different keys may be processed independently
and could change their relative order.
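
For reference, a minimal sketch of wiring the same AsyncFunction with the two
variants (the enrichment function, timeout and capacity are placeholders, not
code from this thread):

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;

// Wiring the same async enrichment function with ordered vs. unordered semantics.
public class AsyncWiring {
    public static DataStream<String> wire(DataStream<String> input,
                                           AsyncFunction<String, String> enrich) {
        // ordered: output order equals input order per subtask
        DataStream<String> ordered =
                AsyncDataStream.orderedWait(input, enrich, 1, TimeUnit.SECONDS, 100);

        // unordered: results are emitted as soon as they complete
        DataStream<String> unordered =
                AsyncDataStream.unorderedWait(input, enrich, 1, TimeUnit.SECONDS, 100);

        return ordered; // pick the variant that matches your ordering requirement
    }
}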

Best,

Arvid

On Mon, Feb 24, 2020 at 4:43 PM Alexey Trenikhun  wrote:

> Arvid, thank you.
> So there is single instance of FIFO per async IO operator regardless of
> parallelism of the async IO operator?
> Thanks,
> Alexey
>
> --
> *From:* Arvid Heise 
> *Sent:* Saturday, February 22, 2020 1:23:01 PM
> *To:* Alexey Trenikhun 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: async io parallelism
>
> Hi Alexey,
>
> the short answer is: order is preserved in all cases.
>
> Basically, ordered asyncIO maintains an internal FIFO queue where all
> pending elements reside. All async results are saved into this queue, but
> elements will only be outputted when the head element has a result.
>
> So assume you have three input records i1, i2, i3 and get the outputs
> asynchronously in the order o2, o1, o3 after 100 ms each, then there is no
> output after receiving o2, then o1 and o2 are outputted after 200 ms, and
> then o3 after 300 ms.
>
> Best,
>
> Arvid
>
> On Sat, Feb 22, 2020 at 2:22 AM Alexey Trenikhun  wrote:
>
> Hello,
> Let's say, my elements are simple key-value pairs, elements are coming
> from Kafka, where they were partitioned by "key", then I do processing
> using KeyedProcessFunction (keyed by same "key"), then I enrich elements
> using ordered RichAsyncFunction, then output to another
> KeyedProcessFunction (keyed by same "key") and then write to Kafka topic,
> again partitioned by same "key", something like this:
>
> FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction)
> -> AsyncDataStream.orderedWait() -> keyBy("key")->Output(
> KeyedProcessFunction)->FlinkKafkaProducer
>
> Will it preserve order of events with same "key"?
>
>- Will Output function receive elements with same "key" in same order
>as they were originally in Kafka?
>- Will FlinkKafkaProducer writes elements with same "key" in same
>order as they were originally in Kafka?
>- Does it depend on parallelism of async IO? Documentation says "the
>stream order is preserved", but if there are multiple parallel instances of
>async function, does it mean order relative to each single instance? Or
>total stream order?
>
> Thanks,
> Alexey
>
>


Re: Can Connected Components run on a streaming dataset using iterate delta?

2020-02-26 Thread Arvid Heise
Hi Kant,

there has not been high demand yet and it's always a matter of priority for
the scarce manpower.
I'd probably get inspired by gelly and implement it on DataStream in your
stead.

On Sat, Feb 22, 2020 at 11:23 AM kant kodali  wrote:

> Hi,
>
> Thanks for that but Looks like it is already available
> https://github.com/vasia/gelly-streaming in streaming but I wonder why
> this is not part of Flink? there are no releases either.
>
> Thanks!
>
> On Tue, Feb 18, 2020 at 9:13 AM Yun Gao  wrote:
>
>>Hi Kant,
>>
>>   As far as I know, I think the current example connected
>> components implementation based on DataSet API could not be extended to
>> streaming data or incremental batch directly.
>>
>>   From the algorithm's perspective, if the graph only add
>> edge and never remove edge, I think the connected components should be able
>> to be updated incrementally when the graph changes: When some edges are
>> added, a new search should be started from the sources of the added edges
>> to propagate its component ID. This will trigger a new pass of update of
>> the following vertices, and the updates continues until no vertices'
>> component ID get updated. However, if there are also edge removes, I think
>> the incremental computation should not be easily achieved.
>>
>>   To implement the above logic on Flink, I think currently
>> there should be two possible methods:
>> 1) Use DataSet API and DataSet iteration, maintains
>> the graph structure and the latest computation result in a storage, and
>> whenever there are enough changes to the graph, submits a new DataSet job
>> to recompute the result. The job should load the edges, the latest
>> component id and whether it is the source of the newly added edges for each
>> graph vertex, and then start the above incremental computation logic.
>> 2) Flink also provide DataStream iteration API[1]
>> that enables iterating on the unbounded data. In this case the graph
>> modification should be modeled as a datastream, and some operators inside
>> the iteration should maintain the graph structure and current component id.
>> whenever there are enough changes, it starts a new pass of computation.
>>
>> Best,
>>  Yun
>>
>> [1] Flink DataStream iteration,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#iterations
>>
>> --
>> From:kant kodali 
>> Send Time:2020 Feb. 18 (Tue.) 15:11
>> To:user 
>> Subject:Can Connected Components run on a streaming dataset using iterate
>> delta?
>>
>> Hi All,
>>
>> I am wondering if connected components
>> 
>> can run on a streaming data? or say incremental batch?
>>
>> I see that with delta iteration not all vertices need to participate at
>> every iteration which is great but in my case the graph is evolving over
>> time other words new edges are getting added over time. If so, does the
>> algorithm needs to run on the entire graph or can it simply run on the new
>> batch of edges?
>>
>> Finally, What is the best way to compute connected components on Graphs
>> evolving over time? Should I use streaming or batch or any custom
>> incremental approach? Also, the documentation take maxIterations as an
>> input. How can I come up with a good number for max iterations? and once I
>> come up with a good number for max Iterations is the algorithm guaranteed
>> to converge?
>>
>>
>> Thanks,
>> Kant
>>
>>
>>


Re: state schema evolution for case classes

2020-02-26 Thread Khachatryan Roman
Hi Apoorv,

You can achieve this by implementing custom serializers for your state.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html

Regards,
Roman


On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Hi Roman,
>
> I have successfully migrated to flink 1.8.2 with the savepoint created by
> flink 1.6.2.
> Now I have to modify a few case classes due to a new requirement. I have
> created a savepoint, and when I run the app with the modified classes from the
> savepoint it throws the error "state not compatible".
> Previously no custom serializer was used.
> I now wish to support state schema evolution, hence I need suggestions on how
> I can achieve that.
>
> Regards
>
> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi ApoorvK,
>>
>> I understand that you have a savepoint created by Flink 1.6.2 and you
>> want to use it with Flink 1.8.2. The classes themselves weren't modified.
>> Is that correct?
>> Which serializer did you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Earlier we have developed on flink 1.6.2 , So there are lots of case
>>> classes
>>> which have Map,Nested case class within them for example below :
>>>
>>> case class MyCaseClass(var a: Boolean,
>>>  var b: Boolean,
>>>  var c: Boolean,
>>>  var d: NestedCaseClass,
>>>  var e: Int) {
>>>   def this() { this(false, false, false, new NestedCaseClass(), 0) }
>>> }
>>>
>>>
>>> Now we have migrated to flink 1.8.2 , I need help to figure out how can I
>>> achieve state schema evolution for such classes.
>>>
>>> 1. Is creating avro for these classes now, and implement avro
>>> serialisation
>>> will that work ?
>>> 2. Or if I register kyroserialiser with protobuf serialiser at env?
>>>
>>> Please suggest what can be done here, or redirect for the avros
>>> serialisation example.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Question: Determining Total Recovery Time

2020-02-26 Thread Arvid Heise
Hi Morgan,

doing it in a very general way sure is challenging.

I'd assume that your idea of using the buffer usage has some shortcomings
(which I don't know), but I also think it's a good starting point.

Have you checked the PoolUsage metrics? [1] You could use them to detect
the bottleneck and then estimate the max capacity of the whole job.

Btw, I'd be interested in results. We have the idea of adjustable buffer
sizes and the insights would help us.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service

On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi Arvid, Timo,
>
> Really appreciate the feedback. I have one final question on this topic
> and hope you dont mind me posing it to you directly. I posted the question
> earlier to the mailing list, but am looking at this more from an academic
> perspective as apposed to manually optimizing a specific job for a specific
> production environment. I do not know the flink internals well enough to
> determine if I can accomplish what I am looking for.
>
> For an experiment, I need to work out the Total Recovery Time (TRT). I
> define this as the time it takes the system to "catch up" to the current
> timestamp assuming event time processing after a node failure.
>
> I would like to follow a heuristic approach which is:
>
>1. job+environment agnostic,
>2. does not involve load testing,
>3. does not involve modifying the job or flink codebase, and
>4. relies solely on the metrics supplied.
>
> As far as I know (and correct me if I'm wrong): TRT = heartbeat.timeout +
> recovery time + time to reprocess uncheckpointed messages + lag to catch up
> to the current timestamp.
>
> In order to predict TRT, I need some kind of resource utilization model
> based on the current processing capacity and maximum processing limit, let
> me explain:
>
>- Backpressure is essentially the point at which utilization has
>reached 100% for any particular streaming pipeline and means that the
>application has reached the max limit of messages that it can process per
>second.
>- Lets consider an example: The system is running along perfectly fine
>under normal conditions, accessing external sources, and processing at an
>average of 100,000 messages/sec. Lets assume the maximum capacity is around
>130,000 message/sec before back pressure starts propagating messages back
>up the stream. Therefore, utilization is at 0.76 (100K/130K). Great, but at
>present we dont know that 130,000 is the limit without load testing.
>- For this example, is there a way of finding this maximum capacity
>(and hence the utilization) without pushing the system to its limit based
>solely on the average current throughput? Possibly by measuring the
>saturation of certain buffers between the operators?
>
> If this is possible, the unused utilization can be used to predict how
> fast a system would get back to the current timestamp. Again, its a
> heuristic so it doesn't have to be extremely precise. Any hints would be
> greatly appreciated.
>
> Thank you very much!
>
> Regards,
> Morgan.
>
> On 21.02.20 14:44, Arvid Heise wrote:
>
> Hi Morgan,
>
> sorry for the late reply. In general, that should work. You need to ensure
> that the same task is processing the same record though.
>
> Local copy needs to be state or else the last message would be lost upon
> restart. Performance will take a hit but if that is significant depends on
> the remaining pipeline.
>
> Btw, at least once should be enough for that, since you implicitly
> deduplicating.
>
> Best,
>
> Arvid
>
> On Tue, Feb 11, 2020 at 11:24 AM Morgan Geldenhuys <
> morgan.geldenh...@tu-berlin.de> wrote:
>
>> Thanks for the advice, i will look into it.
>>
>> Had a quick think about another simple solution but we would need a hook
>> into the checkpoint process from the task/operator perspective, which I
>> haven't looked into yet. It would work like this:
>>
>> - The sink operators (?) would keep a local copy of the last message
>> processed (or digest?), the current timestamp, and a boolean value
>> indicating whether or not the system is in recovery or not.
>> - While not in recovery, update the local copy and timestamp with each
>> new event processed.
>> - When a failure is detected and the taskmanagers are notified to
>> rollback, we use the hook into this process to switch the boolean value to
>> true.
>> - While true, it compares each new message with the last one processed
>> before the recovery process was initiated.
>> - When a match is found, the difference between the previous and current
>> timestamp is calculated and outputted as a custom metric and the boolean is
>> reset to false.
>>
>> From here, the mean total recovery time could be calculated across the
>> operators. Not sure how it would impact on performance, but i doubt it
>> would be significant. We would need to ens

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-26 Thread Jingsong Li
Thanks all for your discussion.

Hi Dawid,

+1 to apply the logic of parsing a SQL timestamp literal.

I don't fully understand the matrix you list. Should this be the semantics
of SQL CAST?
Do you mean this is an implicit cast in the JSON parser?
I doubt that, because these implicit casts are not supported
in LogicalTypeCasts, and it is not easy to understand when it occurs
silently.

How about adding a "timestampFormat" property to the JSON parser? Its default
value would be the SQL timestamp literal format, and the user can configure it.

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 6:39 PM Jark Wu  wrote:

> Hi Dawid,
>
> I agree with you. If we want to loosen the format constraint, the
> important piece is the conversion matrix.
>
> The conversion matrix you listed makes sense to me. From my understanding,
> there should be 6 combination.
> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
> TIMEZONE to make the matrix complete.
> When the community reach an agreement on this, we should write it down on
> the documentation and follow the matrix in all text-based formats.
>
> Regarding to the RFC 3339 compatibility mode switch, it also sounds good to
> me.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz 
> wrote:
>
> > Hi all,
> >
> > @NiYanchun Thank you for reporting this. Yes I think we could improve the
> > behaviour of the JSON format.
> >
> > @Jark First of all I do agree we could/should improve the
> > "user-friendliness" of the JSON format (and unify the behavior across
> text
> > based formats). I am not sure though if it is as simple as just ignore
> the
> > time zone here.
> >
> > My suggestion would be rather to apply the logic of parsing a SQL
> > timestamp literal (if the expected type is of
> LogicalTypeFamily.TIMESTAMP),
> > which would actually also derive the "stored" type of the timestamp
> (either
> > WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql
> conversion.
> > Therefore if the
> >
> > parsed type      | requested type       | behaviour
> > -----------------+----------------------+----------------------------------------
> > WITHOUT TIMEZONE | WITH TIMEZONE        | store the local timezone with the data
> > WITHOUT TIMEZONE | WITH LOCAL TIMEZONE  | do nothing in the data, interpret the
> >                  |                      | time in local timezone
> > WITH TIMEZONE    | WITH LOCAL TIMEZONE  | convert the timestamp to local timezone
> >                  |                      | and drop the time zone information
> > WITH TIMEZONE    | WITHOUT TIMEZONE     | drop the time zone information
> >
> > It might just boil down to what you said "being more lenient with regards
> > to parsing the time zone". Nevertheless I think this way it is a bit
> better
> > defined behaviour, especially as it has a defined behaviour when
> converting
> > between representation with or without time zone.
> >
> > An implementation note. I think we should aim to base the implementation
> > on the DataTypes already rather than going back to the TypeInformation.
> >
> > I would still try to leave the RFC 3339 compatibility mode, but maybe for
> > that mode it would make sense to not support any types WITHOUT TIMEZONE?
> > This would be enabled with a switch (disabled by default). As I
> understand
> > the RFC, making the time zone mandatory is actually a big part of the
> > standard as it makes time types unambiguous.
> >
> > What do you think?
> >
> > Ps. I cross posted this on the dev ML.
> >
> > Best,
> >
> > Dawid
> >
> >
> > On 26/02/2020 03:45, Jark Wu wrote:
> >
> > Yes, I'm also in favor of loosen the datetime format constraint.
> > I guess most of the users don't know there is a JSON standard which
> > follows RFC 3339.
> >
> > Best,
> > Jark
> >
> > On Wed, 26 Feb 2020 at 10:06, NiYanchun  wrote:
> >
> >> Yes, these Types definition are general. As a user/developer, I would
> >> support “loosen it for usability”. If not, may add some explanation
> >> about JSON.
> >>
> >>
> >>
> >>  Original Message
> >> *Sender:* Jark Wu
> >> *Recipient:* Outlook; Dawid Wysakowicz<
> >> dwysakow...@apache.org>
> >> *Cc:* godfrey he; Leonard Xu;
> >> user
> >> *Date:* Wednesday, Feb 26, 2020 09:55
> >> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
> >>
> >> Hi Outlook,
> >>
> >> The explanation in DataTypes is correct, it is compliant to SQL
> standard.
> >> The problem is that JsonRowDeserializationSchema only support  RFC-3339.
> >> On the other hand, CsvRowDeserializationSchema supports to parse
> >> "2019-07-09 02:02:00.040".
> >>
> >> So the question is shall we insist on the RFC-3339 "standard"? Shall we
> >> loosen it for usability?
> >> What do you think @Dawid Wysakowicz  ?
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 26 Feb 2020 at 09:29, Outlook  wrote:
> >>
> >>> Thanks Godfrey and Leonard, I tried your answers, result is OK.
> >>>
> >>>
> >>> BTW, I think if only accept such format for a long time, the  TIME and
> >>> TIMESTAMP methods' doc in `org.apache.flink.table.api.DataTypes` may be
> >>> better to update,
> >>>
> >>> be

Re: Kafka consumer group id and Flink

2020-02-26 Thread Benchao Li
Hi Debasish,

AFAIK, the Flink Kafka connector still works like that, and the community will
keep the documentation updated.

Flink Kafka connector has three main modes (and also specific offsets and
specific timestamp):
- earliest-offset: no matter what your offset of "group.id" is currently,
it always consumes from the earliest offset.
- group-offset: consumes from offsets kept in kafka of the "group.id" (or
zookeeper in older kafka versions)
- latest-offset: no matter what your offset of "group.id" is currently, it
always consumes from the latest offset.
And it will automatically commit offsets to the kafka for "group.id" you
specified.


Plus checkpointing: if you enable checkpoints, on restore the job always starts
from the offsets kept in the checkpoint, no matter what mode you set.
By default it will also commit offsets back to Kafka for the "group.id" when a
checkpoint succeeds.

Different jobs won't affect each other's consumption. However, if you
specify the same "group.id" for different jobs, the offsets they commit to
Kafka may get mixed up.
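
For reference, a minimal sketch of the above (topic, group id and bootstrap
servers are placeholders): the startup mode is chosen on the consumer,
independently of checkpointing.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaStartupModes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed on successful checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-job"); // used for committing offsets back to Kafka

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // pick one: setStartFromEarliest(), setStartFromLatest(), setStartFromGroupOffsets()
        consumer.setStartFromGroupOffsets();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("kafka-startup-modes");
    }
}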

Debasish Ghosh  于2020年2月26日周三 下午9:47写道:

>  Hello -
>
> Can someone please point me to some document / code snippet as to how
> Flink uses Kafka consumer group property "group.id". In the message
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html#none
> I see the following ..
>
> Internally, the Flink Kafka connectors don’t use the consumer group
>> management functionality because they are using lower-level APIs
>> (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each
>> parallel instance for more control on individual partition consumption. So,
>> essentially, the “group.id” setting in the Flink Kafka connector is only
>> used for committing offsets back to ZK / Kafka brokers.
>
>
> This message is quite old - has anything changed since then? Looks like
> this property is a mandatory setting though.
>
> If I have multiple flink streaming jobs, since each job tracks the offsets
> individually and saves it by the internal checkpoint mechanism, is there no
> need to specify a different groupd.id for each job ? And in the case when
> there are two jobs reading the same topic but has different business logic
> will that work correctly although the consumers will be in the same
> consumer-group?
>
> Thanks for any help.
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Kafka consumer group id and Flink

2020-02-26 Thread Debasish Ghosh
 Hello -

Can someone please point me to some document / code snippet as to how Flink
uses Kafka consumer group property "group.id". In the message
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html#none
I see the following ..

Internally, the Flink Kafka connectors don’t use the consumer group
> management functionality because they are using lower-level APIs
> (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each
> parallel instance for more control on individual partition consumption. So,
> essentially, the “group.id” setting in the Flink Kafka connector is only
> used for committing offsets back to ZK / Kafka brokers.


This message is quite old - has anything changed since then? Looks like
this property is a mandatory setting though.

If I have multiple flink streaming jobs, since each job tracks the offsets
individually and saves it by the internal checkpoint mechanism, is there no
need to specify a different groupd.id for each job ? And in the case when
there are two jobs reading the same topic but has different business logic
will that work correctly although the consumers will be in the same
consumer-group?

Thanks for any help.
regards.
-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Async Datastream Checkpointing

2020-02-26 Thread Alexandru Vasiu
Hi,

We have a pipeline which has, at one step, an AsyncDataStream.unorderedWait
where some web requests are executed. The pipeline works, but when it
tries to make checkpoints it always fails with a timeout error (and it
stops at the component containing this async data stream). We are using
Flink 1.10.0 with Scala 2.12.10 and this config for checkpoints:

"checkpoints_interval": 18,
"min_pause_between_checkpoints": 1,
"checkpoints_timeout": 60,
"tolerable_checkpoints_failure_number": 20,
"max_concurrent_checkpoints": 1,
"checkpoint_mode": CheckpointingMode.EXACTLY_ONCE

Do you know why checkpointing doesn't work in this case?

Thank you,
Alex Vasiu

-- 
ComplyAdvantage is a trading name of IVXS TECHNOLOGY ROMANIA. This message, 
including any attachments, is intended only for the use of the 
individual(s) to whom it is addressed and may contain information that is 
strictly privileged/confidential. Any other distribution, copying or 
disclosure is strictly prohibited. If you are not the intended recipient or 
have received this message in error, please notify the sender immediately 
by reply email and permanently delete this message including any 
attachments, without reading it or making a copy.


How to set unorderedWait/orderedWait properties in Table API when using Async I/O

2020-02-26 Thread 郑泽辉
Hi all,
I'm using the Blink planner (Flink v1.9) and I created an AsyncJdbcTableSource
class implementing LookupableTableSource, but when I override
getAsyncLookupFunction(), I found the results of the async method (via Vertx)
are in order.

But I don't need the stream order to be preserved and just want the result
records emitted out of order to improve processing speed. In the DataStream API
I can easily set the result order guarantee(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#order-of-results),
and my question is,

how to set this in Table API or SQL API with Blink Planner?

Thanks. Regards


Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-26 Thread David Magalhães
I'm testing a custom sink that uses TwoPhaseCommit with the test harness
provided by flink-streaming-java.

"org.apache.flink" %% "flink-streaming-java" % flinkVersion % "test"
classifier "tests"

Using this, in some tests where I use Scala string interpolation, the string
output has a strange behaviour: it changes the places where the values
go.

Example:

val accountId = "account0"
val partitionDate = "202002"
val fileName = "2020-02-26_11-09-46.parquet"

s"account_id=$accountId/partition_date=$partitionDate/$fileName"

Should be:
account_id=account0/partition_date=202002/2020-02-26_11-09-46.parquet
Actual result:
account_id=account0/partition_date=2020-02-26_11-09-46.parquet/202002

The variables' values end up in the wrong places after the string interpolation.

Concat behaviour is not affected:

"account_id=".concat(accountId).concat("/partition_date=").concat(partitionDate).concat("/").concat(fileName)

If I remove the flink-streaming-java dependency is works as expected.

Any thoughts why is behaving this way ?


Process parquet files in batch mode with blink planner

2020-02-26 Thread olivier_brazet
Hi community,

For a PoC I need to process some parquet files in batch mode.

I managed to implement some processing using the DataSet API. It is working fine. 
Now, I would like to test the SQL API and the blink planner.

If I understand correctly, the ParquetTableSource is not compatible with the Blink planner. So I am wondering whether there is a TableSource compatible with the Blink planner which can be used to read Parquet files, and whether there are some examples available.

Thanks,

Olivier

Re: How JobManager and TaskManager find each other?

2020-02-26 Thread KristoffSC
Thanks all for the answers,

One more question though. In [1] we can see that task managers are talking
with each other - sending data streams. How each task manager knows the
address of other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to write a retract table to Kafka?

2020-02-26 Thread izual
Hi community: SQL containing aggregate functions, GROUP BY, etc. will generate a
RetractStream, whose type is DataStream[(Boolean, Row)].
It cannot be written to Kafka because the Kafka table is based on
AppendStreamTableSink.


If I only need to write ADD messages to Kafka, is it possible to achieve this
with SQL alone?
What is the typical usage for this?

tableEnv.toRetractStream[Row](...).filter(_._1).map(_._2).addSink(new KafkaProducer)

or: how do I convert a RetractStream to a DataStream and then use SQL to output to Kafka?
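
(For reference, a sketch of the DataStream fallback described above, in Java;
tEnv, resultTable, topic and brokers are placeholders.)

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Filter out retractions and write only ADD messages to Kafka.
public class RetractToKafka {
    public static void writeAdds(StreamTableEnvironment tEnv, Table resultTable) {
        DataStream<Tuple2<Boolean, Row>> retract = tEnv.toRetractStream(resultTable, Row.class);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        retract
            .filter(change -> change.f0)           // true = ADD, false = retraction
            .map(change -> change.f1.toString())   // naive serialization for the sketch
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));
    }
}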

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-26 Thread Jark Wu
Hi Dawid,

I agree with you. If we want to loosen the format constraint, the
important piece is the conversion matrix.

The conversion matrix you listed makes sense to me. From my understanding,
there should be 6 combination.
We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
TIMEZONE to make the matrix complete.
When the community reach an agreement on this, we should write it down on
the documentation and follow the matrix in all text-based formats.

Regarding to the RFC 3339 compatibility mode switch, it also sounds good to
me.

Best,
Jark

On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz 
wrote:

> Hi all,
>
> @NiYanchun Thank you for reporting this. Yes I think we could improve the
> behaviour of the JSON format.
>
> @Jark First of all I do agree we could/should improve the
> "user-friendliness" of the JSON format (and unify the behavior across text
> based formats). I am not sure though if it is as simple as just ignore the
> time zone here.
>
> My suggestion would be rather to apply the logic of parsing a SQL
> timestamp literal (if the expected type is of LogicalTypeFamily.TIMESTAMP),
> which would actually also derive the "stored" type of the timestamp (either
> WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql conversion.
> Therefore if the
>
> parsed type      | requested type       | behaviour
> -----------------+----------------------+----------------------------------------
> WITHOUT TIMEZONE | WITH TIMEZONE        | store the local timezone with the data
> WITHOUT TIMEZONE | WITH LOCAL TIMEZONE  | do nothing in the data, interpret the
>                  |                      | time in local timezone
> WITH TIMEZONE    | WITH LOCAL TIMEZONE  | convert the timestamp to local timezone
>                  |                      | and drop the time zone information
> WITH TIMEZONE    | WITHOUT TIMEZONE     | drop the time zone information
>
> It might just boil down to what you said "being more lenient with regards
> to parsing the time zone". Nevertheless I think this way it is a bit better
> defined behaviour, especially as it has a defined behaviour when converting
> between representation with or without time zone.
>
> An implementation note. I think we should aim to base the implementation
> on the DataTypes already rather than going back to the TypeInformation.
>
> I would still try to leave the RFC 3339 compatibility mode, but maybe for
> that mode it would make sense to not support any types WITHOUT TIMEZONE?
> This would be enabled with a switch (disabled by default). As I understand
> the RFC, making the time zone mandatory is actually a big part of the
> standard as it makes time types unambiguous.
>
> What do you think?
>
> Ps. I cross posted this on the dev ML.
>
> Best,
>
> Dawid
>
>
> On 26/02/2020 03:45, Jark Wu wrote:
>
> Yes, I'm also in favor of loosen the datetime format constraint.
> I guess most of the users don't know there is a JSON standard which
> follows RFC 3339.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 10:06, NiYanchun  wrote:
>
>> Yes, these Types definition are general. As a user/developer, I would
>> support “loosen it for usability”. If not, may add some explanation
>> about JSON.
>>
>>
>>
>>  Original Message
>> *Sender:* Jark Wu
>> *Recipient:* Outlook; Dawid Wysakowicz<
>> dwysakow...@apache.org>
>> *Cc:* godfrey he; Leonard Xu;
>> user
>> *Date:* Wednesday, Feb 26, 2020 09:55
>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>
>> Hi Outlook,
>>
>> The explanation in DataTypes is correct, it is compliant to SQL standard.
>> The problem is that JsonRowDeserializationSchema only support  RFC-3339.
>> On the other hand, CsvRowDeserializationSchema supports to parse
>> "2019-07-09 02:02:00.040".
>>
>> So the question is shall we insist on the RFC-3339 "standard"? Shall we
>> loosen it for usability?
>> What do you think @Dawid Wysakowicz  ?
>>
>> Best,
>> Jark
>>
>> On Wed, 26 Feb 2020 at 09:29, Outlook  wrote:
>>
>>> Thanks Godfrey and Leonard, I tried your answers, result is OK.
>>>
>>>
>>> BTW, I think if only accept such format for a long time, the  TIME and
>>> TIMESTAMP methods' doc in `org.apache.flink.table.api.DataTypes` may be
>>> better to update,
>>>
>>> because the document now is not what the method really support. For
>>> example,
>>>
>>>
>>> ```
>>> /**
>>> * Data type of a time WITHOUT time zone {@code TIME} with no fractional
>>> seconds by default.
>>> *
>>> * An instance consists of {@code hour:minute:second} with up to
>>> second precision
>>> * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
>>> *
>>> * Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61)
>>> are not supported as the
>>> * semantics are closer to {@link java.time.LocalTime}. A time WITH time
>>> zone is not provided.
>>> *
>>> * @see #TIME(int)
>>> * @see TimeType
>>> */
>>> public static DataType TIME() {
>>> return new AtomicDataType(new TimeType());
>>>
>>> }```
>>>
>>>
>>> Thanks again.
>>>
>>>  Original Message
>>> *Sender:* Leonard Xu
>>> *Recipient:* godfrey he
>>> *Cc:* Outlook; us

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-26 Thread Dawid Wysakowicz
Hi all,

@NiYanchun Thank you for reporting this. Yes I think we could improve
the behaviour of the JSON format.

@Jark First of all I do agree we could/should improve the
"user-friendliness" of the JSON format (and unify the behavior across
text based formats). I am not sure though if it is as simple as just
ignore the time zone here.

My suggestion would be rather to apply the logic of parsing a SQL
timestamp literal (if the expected type is of
LogicalTypeFamily.TIMESTAMP), which would actually also derive the
"stored" type of the timestamp (either WITHOUT TIMEZONE or WITH
TIMEZONE) and then apply a proper sql conversion. Therefore if the

parsed type      | requested type       | behaviour
-----------------+----------------------+----------------------------------------
WITHOUT TIMEZONE | WITH TIMEZONE        | store the local timezone with the data
WITHOUT TIMEZONE | WITH LOCAL TIMEZONE  | do nothing in the data, interpret the
                 |                      | time in local timezone
WITH TIMEZONE    | WITH LOCAL TIMEZONE  | convert the timestamp to local timezone
                 |                      | and drop the time zone information
WITH TIMEZONE    | WITHOUT TIMEZONE     | drop the time zone information

It might just boil down to what you said "being more lenient with
regards to parsing the time zone". Nevertheless I think this way it is a
bit better defined behaviour, especially as it has a defined behaviour
when converting between representation with or without time zone.

An implementation note. I think we should aim to base the implementation
on the DataTypes already rather than going back to the TypeInformation.

I would still try to leave the RFC 3339 compatibility mode, but maybe
for that mode it would make sense to not support any types WITHOUT
TIMEZONE? This would be enabled with a switch (disabled by default). As
I understand the RFC, making the time zone mandatory is actually a big
part of the standard as it makes time types unambiguous.
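
To make the two literal forms under discussion concrete, a small standalone
java.time illustration (not Flink code; the values are just examples):

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampLiterals {
    public static void main(String[] args) {
        // SQL-style literal: no zone, maps naturally to TIMESTAMP WITHOUT TIME ZONE
        LocalDateTime sqlStyle = LocalDateTime.parse(
                "2019-07-09 02:02:00.040",
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));

        // RFC 3339 literal: the zone offset is mandatory -> TIMESTAMP WITH TIME ZONE
        OffsetDateTime rfc3339 = OffsetDateTime.parse("2019-07-09T02:02:00.040Z");

        System.out.println(sqlStyle + " | " + rfc3339);
    }
}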

What do you think?

Ps. I cross posted this on the dev ML.

Best,

Dawid


On 26/02/2020 03:45, Jark Wu wrote:
> Yes, I'm also in favor of loosen the datetime format constraint. 
> I guess most of the users don't know there is a JSON standard which
> follows RFC 3339.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 10:06, NiYanchun  > wrote:
>
> Yes, these Types definition are general. As a user/developer, I
> would support “loosen it for usability”. If not, may add
> some explanation about JSON.
>
>
>
>  Original Message 
> *Sender:* Jark Wu
> *Recipient:* Outlook; Dawid Wysakowicz <dwysakow...@apache.org>
> *Cc:* godfrey he; Leonard Xu; user
> *Date:* Wednesday, Feb 26, 2020 09:55
> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>
> Hi Outlook,
>
> The explanation in DataTypes is correct, it is compliant to SQL
> standard. The problem is that JsonRowDeserializationSchema only
> support  RFC-3339. 
> On the other hand, CsvRowDeserializationSchema supports to parse
> "2019-07-09 02:02:00.040".
>
> So the question is shall we insist on the RFC-3339 "standard"?
> Shall we loosen it for usability? 
> What do you think @Dawid Wysakowicz  ?
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 09:29, Outlook  > wrote:
>
> Thanks Godfrey and Leonard, I tried your answers, result is OK. 
>
>
> BTW, I think if only accept such format for a long time, the
>  TIME and TIMESTAMP methods' doc in
> `org.apache.flink.table.api.DataTypes` may be better to update,
>
> because the document now is not what the method really
> support. For example, 
>
>
> ```
>
> /**
> * Data type of a time WITHOUT time zone {@code TIME} with no
> fractional seconds by default.
> *
> * An instance consists of {@code hour:minute:second} with
> up to second precision
> * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
> *
> * Compared to the SQL standard, leap seconds (23:59:60 and
> 23:59:61) are not supported as the
> * semantics are closer to {@link java.time.LocalTime}. A time
> WITH time zone is not provided.
> *
> * @see #TIME(int)
> * @see TimeType
> */
> public static DataType TIME() {
> return new AtomicDataType(new TimeType());
>
> }```
>
>
> Thanks again.
>
>
>  Original Message 
> *Sender:* Leonard Xu <xbjt...@gmail.com>
> *Recipient:* godfrey he
> *Cc:* Outlook; user
> *Date:* Tuesday, Feb 25, 2020 22:56
> *Subject:* Re:

Re: Timeseries aggregation with many IoT devices off of one Kafka topic.

2020-02-26 Thread Theo Diefenthal
Hi, 

Ververica has great tutorials online on how to write Flink pipelines, also with 
a small training section with regards to Process functions: 

https://training.ververica.com/lessons/processfunction.html

Best regards 
Theo 


Von: "Khachatryan Roman"  
An: "Avinash Tripathy"  
CC: "Theo Diefenthal" , "hemant singh" 
, "Marco Villalobos" , "user" 
 
Gesendet: Dienstag, 25. Februar 2020 19:08:16 
Betreff: Re: Timeseries aggregation with many IoT devices off of one Kafka 
topic. 

Hi, 
I think conceptually the pipeline could look something like this: 
env
  .addSource(...)
  .keyBy("device_id")
  .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(10)))
  .trigger(new Trigger {
    def onElement(el, timestamp, window, ctx) = {
      if (window.start == TimeWindow.getWindowStartWithOffset(timestamp, 0, 10_000)) {
        ctx.registerEventTimeTimer(window.end)
      }
      TriggerResult.CONTINUE
    }
    def onEventTime(time, window, ctx) = {
      TriggerResult.FIRE
    }
  })
  .aggregate(...)

(slide 10s needs to be adjusted) 

Regards, 
Roman 


On Tue, Feb 25, 2020 at 3:44 PM Avinash Tripathy
<avinash.tripa...@stellapps.com> wrote:



Hi Theo, 

We also have the same scenario. If it would be great if you could provide some 
examples or more details about flink process function. 

Thanks, 
Avinash 

On Tue, Feb 25, 2020 at 12:29 PM theo.diefent...@scoop-software.de
<theo.diefent...@scoop-software.de> wrote:


Hi, 
At last flink forward in Berlin I spoke with some persons about the same 
problem, where they had construction devices as IoT sensors which could even be 
offline for multiple days. 

They told me that the major problem for them was that the watermark in Flink is
maintained per operator/subtask, even if you group by key.

They solved their problem via a Flink process function where they have full
control over state and timers, so you can deal with each device as you like and
can e.g. maintain something similar to a per-device watermark. I also think
that it is the best way to go for this use case.
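
A rough sketch of that per-device approach (not code from this thread; the
tuple layout (deviceId, timestamp, value), the 15-minute window and the sum
aggregation are assumptions):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Per-device 15-minute aggregation driven by per-key event-time timers.
// Input: (deviceId, timestamp, value). Output: (deviceId, windowStart, sum).
public class PerDeviceWindowFunction extends
        KeyedProcessFunction<String, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>> {

    private static final long WINDOW_SIZE = 15 * 60 * 1000L;

    private transient MapState<Long, Double> sums; // window start -> running sum

    @Override
    public void open(Configuration parameters) {
        sums = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("sums", Long.class, Double.class));
    }

    @Override
    public void processElement(Tuple3<String, Long, Double> reading, Context ctx,
                               Collector<Tuple3<String, Long, Double>> out) throws Exception {
        long windowStart = reading.f1 - (reading.f1 % WINDOW_SIZE);
        Double current = sums.get(windowStart);
        sums.put(windowStart, (current == null ? 0.0 : current) + reading.f2);
        // fires once this key's event time passes the window end
        ctx.timerService().registerEventTimeTimer(windowStart + WINDOW_SIZE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<String, Long, Double>> out) throws Exception {
        long windowStart = timestamp - WINDOW_SIZE;
        Double sum = sums.get(windowStart);
        if (sum != null) {
            out.collect(Tuple3.of(ctx.getCurrentKey(), windowStart, sum));
            sums.remove(windowStart);
        }
    }
}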

Best regards 
Theo 




 Original Message 
From: hemant singh <hemant2...@gmail.com>
Date: Tue., Feb 25, 2020, 06:19
To: Marco Villalobos <mvillalo...@beyond.ai>
Cc: user@flink.apache.org
Subject: Re: Timeseries aggregation with many IoT devices off of one Kafka
topic.


Hello, 
I am also working on something similar. Below is the pipeline design I have, 
sharing may be it can be helpful. 

topic -> keyed stream on device-id -> window operation -> sink. 

You can PM me on further details. 

Thanks, 
Hemant 

On Tue, Feb 25, 2020 at 1:54 AM Marco Villalobos <mvillalo...@beyond.ai> wrote:




I need to collect timeseries data from thousands of IoT devices. Each device 
has name, value, and timestamp published to one Kafka topic. The event time 
timestamps are in order only relation with the individual device, but out of 
order with respect to other devices. 



Is there a way to aggregate a 15 minute window of this in which each IoT 
devices gets aggregated with its own event time? 



I would deeply appreciate if somebody could guide me to an approach for solving 
this in Flink. 



I wish there was a group chat for these type of problems. 









-- 
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln 
Theo Diefenthal 

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 
theo.diefent...@scoop-software.de - www.scoop-software.de 
Sitz der Gesellschaft: Köln, Handelsregister: Köln, 
Handelsregisternummer: HRB 36625 
Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen, 
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel