Re: Get access to unmatching events in Apache Flink Cep

2024-05-22 Thread Anton Sidorov
In his answer, Biao said "currently there is no such API to access the middle
NFA state". Maybe such an API is planned? Or could I create an issue or a pull
request that adds it?
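
For reference, a rough and untested sketch of the within() + TimedOutPartialMatchHandler
workaround that Biao describes below. The Event type stands for the event class from the
linked example project, and the pattern, within() interval and output tag name are only
illustrative:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TimedOutMatchesExample {

    // side output carrying partial matches that did not complete in time
    static final OutputTag<String> TIMED_OUT =
            new OutputTag<String>("timed-out-partial-matches") {};

    public static SingleOutputStreamOperator<String> detect(DataStream<Event> input,
                                                            Pattern<Event, ?> pattern) {
        // bound the pattern in time so that stale partial matches are timed out
        // instead of being pruned silently
        Pattern<Event, ?> bounded = pattern.within(Time.minutes(5));

        return CEP.pattern(input, bounded)
                  .process(new TimeoutAwarePatternProcessFunction());
    }

    static class TimeoutAwarePatternProcessFunction
            extends PatternProcessFunction<Event, String>
            implements TimedOutPartialMatchHandler<Event> {

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx,
                                 Collector<String> out) {
            out.collect("full match: " + match);
        }

        @Override
        public void processTimedOutMatch(Map<String, List<Event>> partialMatch, Context ctx) {
            // partial matches that exceeded within() end up here
            ctx.output(TIMED_OUT, "timed out partial match: " + partialMatch);
        }
    }
}

The timed-out events are then available via detect(...).getSideOutput(TIMED_OUT). As Biao
notes, this only surfaces partial matches that expire through within(); events discarded
for other reasons are not exposed.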

On Fri, May 17, 2024 at 12:04, Anton Sidorov wrote:

> Ok, thanks for the reply.
>
> On Fri, May 17, 2024 at 09:22, Biao Geng wrote:
>
>> Hi Anton,
>>
>> I am afraid that currently there is no such API to access the middle NFA
>> state in your case. For patterns that contain 'within()' condition, the
>> timeout events could be retrieved via TimedOutPartialMatchHandler
>> interface, but other unmatching events would be pruned immediately once
>> they are considered as unnecessary to keep.
>>
>> Best,
>> Biao Geng
>>
>>
>> On Thu, May 16, 2024 at 16:12, Anton Sidorov wrote:
>>
>>> Hello!
>>>
>>> I have a Flink Job with CEP pattern.
>>>
>>> Pattern example:
>>>
>>> // Strict Contiguity
>>> // a b+ c d e
>>> Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
>>> .next("b").where(...).oneOrMore()
>>> .next("c").where(...)
>>> .next("d").where(...)
>>> .next("e").where(...);
>>>
>>> The input stream contains events in the wrong order:
>>>
>>> a b d c e
>>>
>>> On the output I don't get any match. But I want to have access to the events
>>> that did not match.
>>>
>>> Can I access the middle NFA state of the CEP pattern, or is there some other
>>> way to view the unmatched events?
>>>
>>> Example project with CEP pattern on github
>>> <https://github.com/A-Kinski/apache-flink-cep/tree/main>, and my question
>>> on SO
>>> <https://stackoverflow.com/questions/78483004/get-access-to-unmatching-events-in-apache-flink-cep>
>>>
>>> Thanks in advance
>>>
>>
>
> --
> Best regards, Anton.
>


-- 
Best regards, Anton.


Re: Get access to unmatching events in Apache Flink Cep

2024-05-17 Thread Anton Sidorov
Ok, thanks for the reply.

On Fri, May 17, 2024 at 09:22, Biao Geng wrote:

> Hi Anton,
>
> I am afraid that currently there is no such API to access the middle NFA
> state in your case. For patterns that contain 'within()' condition, the
> timeout events could be retrieved via TimedOutPartialMatchHandler
> interface, but other unmatching events would be pruned immediately once
> they are considered as unnecessary to keep.
>
> Best,
> Biao Geng
>
>
> On Thu, May 16, 2024 at 16:12, Anton Sidorov wrote:
>
>> Hello!
>>
>> I have a Flink Job with CEP pattern.
>>
>> Pattern example:
>>
>> // Strict Contiguity
>> // a b+ c d e
>> Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
>> .next("b").where(...).oneOrMore()
>> .next("c").where(...)
>> .next("d").where(...)
>> .next("e").where(...);
>>
>> The input stream contains events in the wrong order:
>>
>> a b d c e
>>
>> On the output I don't get any match. But I want to have access to the events
>> that did not match.
>>
>> Can I access the middle NFA state of the CEP pattern, or is there some other
>> way to view the unmatched events?
>>
>> Example project with CEP pattern on github
>> <https://github.com/A-Kinski/apache-flink-cep/tree/main>, and my question
>> on SO
>> <https://stackoverflow.com/questions/78483004/get-access-to-unmatching-events-in-apache-flink-cep>
>>
>> Thanks in advance
>>
>

-- 
Best regards, Anton.


RE: monitoring message latency for flink sql app

2024-05-16 Thread Anton Sidorov
Hello mete.

I found this SO article
https://stackoverflow.com/questions/54293808/measuring-event-time-latency-with-flink-cep

If I'm not mistaken, you can use the Flink metrics system for operators and
get the processing time of an event in an operator.
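
If latency markers are enough for your alerting, a minimal, untested sketch of turning
them on (the 30s interval is arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // emit latency markers every 30s; per-operator latency histograms then
        // show up in whatever metrics reporter is configured
        env.getConfig().setLatencyTrackingInterval(30_000L);
        // the same can be set cluster-wide via metrics.latency.interval in flink-conf.yaml

        // ... register tables and run the SQL pipeline on top of this environment ...

        env.execute("latency-tracking-example");
    }
}

Note that latency markers measure time spent inside the Flink pipeline; a delay relative
to an event-time field inside the message would need a custom metric, e.g. comparing that
field against the wall clock in an operator.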

On 2024/05/16 11:54:44 mete wrote:

> Hello,
>
> For an SQL application using Kafka as source (and Kafka as sink) what would
> be the recommended way to monitor for processing delay? For example, I want
> to be able to alert if the app has a certain delay compared to some event
> time field in the message.
>
> Best,
> Mete
>


Get access to unmatching events in Apache Flink Cep

2024-05-16 Thread Anton Sidorov
Hello!

I have a Flink Job with CEP pattern.

Pattern example:

// Strict Contiguity
// a b+ c d e
Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
.next("b").where(...).oneOrMore()
.next("c").where(...)
.next("d").where(...)
.next("e").where(...);

The input stream contains events in the wrong order:

a b d c e

On the output I don't get any match. But I want to have access to the events
that did not match.

Can I access the middle NFA state of the CEP pattern, or is there some other
way to view the unmatched events?

Example project with CEP pattern on github
<https://github.com/A-Kinski/apache-flink-cep/tree/main>, and my question
on SO
<https://stackoverflow.com/questions/78483004/get-access-to-unmatching-events-in-apache-flink-cep>

Thanks in advance


Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Anton Ippolitov via user
Makes sense, thank you!

On Tue, Jan 31, 2023 at 10:48 AM Gyula Fóra  wrote:

> Thanks @Anton Ippolitov 
> At this stage I would highly recommend the native mode if you have the
> liberty to try that.
> I think that has better production characteristics and will work out of
> the box with the autoscaler. (the standalone mode won't)
>
> Gyula
>
> On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <
> anton.ippoli...@datadoghq.com> wrote:
>
>> I am using the Standalone Mode indeed, should've mentioned it right away.
>> This fix looks exactly like what I need, thank you!!
>>
>> On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:
>>
>>> There is also a pending fix for the standalone + k8s HA case :
>>> https://github.com/apache/flink-kubernetes-operator/pull/518
>>>
>>> You could maybe try and review the fix :)
>>>
>>> Gyula
>>>
>>> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang 
>>> wrote:
>>>
>>>> I assume you are using the standalone mode. Right?
>>>>
>>>> For the native K8s mode, the leader address should be 
>>>> *akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
>>>> *when HA enabled.
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> On Tue, Jan 31, 2023 at 00:21, Anton Ippolitov via user wrote:
>>>>
>>>>> This is actually what I'm already doing, I'm only setting 
>>>>> high-availability:
>>>>> kubernetes myself. The other values are either defaults or set by the
>>>>> Operator:
>>>>> - jobmanager.rpc.port: 6123 is the default value (docs
>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>
>>>>> )
>>>>> -  high-availability.jobmanager.port: 6123 is set by the Operator here
>>>>> <https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>
>>>>>
>>>>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
>>>>> the Operator here
>>>>> <https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
>>>>>  (the
>>>>> actual code which gets executed is here
>>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
>>>>> )
>>>>>
>>>>>  Looking at what the Lyft Operator is doing here
>>>>> <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
>>>>>  I thought
>>>>> this would be a common issue but since you've never seen this error 
>>>>> before,
>>>>> not sure what to do.
>>>>>
>>>>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra 
>>>>> wrote:
>>>>>
>>>>>> We never encountered this problem before but also we don't configure
>>>>>> those settings.
>>>>>> Can you simply try:
>>>>>>
>>>>>> high-availability: kubernetes
>>>>>>
>>>>>> And remove the other configs? I think that can only cause problems
>>>>>> and should not achieve anything :)
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>>>>>> user@flink.apache.org> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I've been experimenting with Kubernetes HA and the Kubernetes
>>>>>>> Operator and ran into the following issue which is happening regularly 
>>>>>>> on
>>>>>>> TaskManagers with Flink 1.16:
>>>>>>>
>>>>>>> Error while retrieving the leader gateway. Retrying to connect to 
>>>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>>>>>> o

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Anton Ippolitov via user
I am using the Standalone Mode indeed, should've mentioned it right away.
This fix looks exactly like what I need, thank you!!

On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:

> There is also a pending fix for the standalone + k8s HA case :
> https://github.com/apache/flink-kubernetes-operator/pull/518
>
> You could maybe try and review the fix :)
>
> Gyula
>
> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang  wrote:
>
>> I assume you are using the standalone mode. Right?
>>
>> For the native K8s mode, the leader address should be 
>> *akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
>> *when HA enabled.
>>
>>
>> Best,
>> Yang
>>
>> On Tue, Jan 31, 2023 at 00:21, Anton Ippolitov via user wrote:
>>
>>> This is actually what I'm already doing, I'm only setting high-availability:
>>> kubernetes myself. The other values are either defaults or set by the
>>> Operator:
>>> - jobmanager.rpc.port: 6123 is the default value (docs
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>
>>> )
>>> -  high-availability.jobmanager.port: 6123 is set by the Operator here
>>> <https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>
>>>
>>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
>>> the Operator here
>>> <https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
>>>  (the
>>> actual code which gets executed is here
>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
>>> )
>>>
>>>  Looking at what the Lyft Operator is doing here
>>> <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
>>>  I thought
>>> this would be a common issue but since you've never seen this error before,
>>> not sure what to do.
>>>
>>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra 
>>> wrote:
>>>
>>>> We never encountered this problem before but also we don't configure
>>>> those settings.
>>>> Can you simply try:
>>>>
>>>> high-availability: kubernetes
>>>>
>>>> And remove the other configs? I think that can only cause problems and
>>>> should not achieve anything :)
>>>>
>>>> Gyula
>>>>
>>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>>>> user@flink.apache.org> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I've been experimenting with Kubernetes HA and the Kubernetes Operator
>>>>> and ran into the following issue which is happening regularly on
>>>>> TaskManagers with Flink 1.16:
>>>>>
>>>>> Error while retrieving the leader gateway. Retrying to connect to 
>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>>>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>>>>> complete the operation. Number of retries has been exhausted.
>>>>>
>>>>> (The whole stacktrace is quite long, I put it in a Github Gist here
>>>>> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>.
>>>>> Note that I put placeholder values for the Kubernetes Service name and the
>>>>> Namespace name)
>>>>>
>>>>> The job configuration has the following values which should be
>>>>> relevant:
>>>>> high-availability: kubernetes
>>>>> high-availability.jobmanager.port: 6123
>>>>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>>>>> jobmanager.rpc.port: 6123
>>>>>
>>>>> Looking a bit more into the logs, I can see that the Akka Actor System
>>>>> is started with an external address pointing to the Kubernetes Service
>>>>> defined by jobmanager.rpc.address:
>>>>> Trying to start actor system, external
>

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-30 Thread Anton Ippolitov via user
This is actually what I'm already doing, I'm only setting high-availability:
kubernetes myself. The other values are either defaults or set by the
Operator:
- jobmanager.rpc.port: 6123 is the default value (docs
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>
)
-  high-availability.jobmanager.port: 6123 is set by the Operator here
<https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>

- jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by the
Operator here
<https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
(the
actual code which gets executed is here
<https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
)

 Looking at what the Lyft Operator is doing here
<https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
I thought
this would be a common issue but since you've never seen this error before,
not sure what to do.

On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra  wrote:

> We never encountered this problem before but also we don't configure those
> settings.
> Can you simply try:
>
> high-availability: kubernetes
>
> And remove the other configs? I think that can only cause problems and
> should not achieve anything :)
>
> Gyula
>
> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
> user@flink.apache.org> wrote:
>
>> Hi everyone,
>>
>> I've been experimenting with Kubernetes HA and the Kubernetes Operator
>> and ran into the following issue which is happening regularly on
>> TaskManagers with Flink 1.16:
>>
>> Error while retrieving the leader gateway. Retrying to connect to 
>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>> complete the operation. Number of retries has been exhausted.
>>
>> (The whole stacktrace is quite long, I put it in a Github Gist here
>> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>.
>> Note that I put placeholder values for the Kubernetes Service name and the
>> Namespace name)
>>
>> The job configuration has the following values which should be relevant:
>> high-availability: kubernetes
>> high-availability.jobmanager.port: 6123
>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>> jobmanager.rpc.port: 6123
>>
>> Looking a bit more into the logs, I can see that the Akka Actor System is
>> started with an external address pointing to the Kubernetes Service defined
>> by jobmanager.rpc.address:
>> Trying to start actor system, external
>> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
>> Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
>> :6123
>>
>> (I believe the external address for the Akka Actor System is set to
>> jobmanager.rpc.address from this place
>> <https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>
>> in the code but I might be wrong)
>>
>> I can also see these logs for the Dispatcher RPC endpoint:
>> Starting RPC endpoint for
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
>> akka://flink/user/rpc/dispatcher_1 .
>> Successfully wrote leader information
>> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
>> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
>> for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.
>>
>> I confirmed that the HA ConfigMap contains an address which also uses the
>> Kubernetes Service defined by jobmanager.rpc.address:
>> $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r
>> '.data["org.apache.flink.k8s.leader.dispatcher"]'
>>
>> ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
>> :6123/user/rpc/dispatcher_1
>>
>> When looking at the code of the Operator and Flink i

"Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-27 Thread Anton Ippolitov via user
Hi everyone,

I've been experimenting with Kubernetes HA and the Kubernetes Operator and
ran into the following issue which is happening regularly on TaskManagers
with Flink 1.16:

Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.

(The whole stacktrace is quite long, I put it in a Github Gist here
<https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>. Note
that I put placeholder values for the Kubernetes Service name and the
Namespace name)

The job configuration has the following values which should be relevant:
high-availability: kubernetes
high-availability.jobmanager.port: 6123
jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
jobmanager.rpc.port: 6123

Looking a bit more into the logs, I can see that the Akka Actor System is
started with an external address pointing to the Kubernetes Service defined
by jobmanager.rpc.address:
Trying to start actor system, external
address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
:6123

(I believe the external address for the Akka Actor System is set to
jobmanager.rpc.address from this place
<https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>
in the code but I might be wrong)

I can also see these logs for the Dispatcher RPC endpoint:
Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
Successfully wrote leader information
LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.

I confirmed that the HA ConfigMap contains an address which also uses the
Kubernetes Service defined by jobmanager.rpc.address:
$ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r
'.data["org.apache.flink.k8s.leader.dispatcher"]'
ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
:6123/user/rpc/dispatcher_1

When looking at the code of the Operator and Flink itself, I can see
that jobmanager.rpc.address is set automatically by the
InternalServiceDecorator
<https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
and
it points to the Kubernetes Service.
However, the comment
<https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L34-L38>
above clearly says that "only the non-HA scenario relies on this Service
for internal communication, since in the HA mode, the TaskManager(s)
directly connects to the JobManager via IP address." According to the docs
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#basic-setup>,
jobmanager.rpc.address "is ignored on setups with high-availability where
the leader election mechanism is used to discover this automatically."

This is not what I'm observing as it seems that despite enabling HA, the
TaskManagers don't use IP addresses but still use this Kubernetes Service
for JM communication.

Moreover, I've used the Lyft Kubernetes Operator before and it has these
interesting lines in the code:
https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
It explicitly sets jobmanager.rpc.address to the host IPs.

Am I misconfiguring or misunderstanding something? Is there any way to fix
these errors?

Thanks!
Anton


Getting S3 client metrics from the flink-s3-fs-presto library

2022-12-15 Thread Anton Ippolitov via user
Hi,

We recently switched to the flink-s3-fs-presto library for checkpointing in
Flink 1.16.0 and we would like to get client-side metrics from the Presto
S3 client (request rate, throttling rate, etc).

I can see that the upstream client from Presto 0.272 already comes with a
metric collector (PrestoS3FileSystemMetricCollector.java) that records all
sorts of interesting metrics in a PrestoS3FileSystemStats object.

If I understand correctly, in the Presto project those metrics are being
exposed via JMX in the HiveS3Module class here, but we don't use this class
to instantiate the client in flink-s3-fs-presto.

Has anyone managed to actually expose those metrics from a Flink
application? If not, what would be recommended way of doing it?

Thank you!


RE: Direct buffer memory in job with hbase client

2021-12-17 Thread Anton
Looks like I set the wrong parameter. It should have been
taskmanager.memory.task.off-heap.size.

 

From: Anton [mailto:anton...@yandex.ru] 
Sent: Friday, December 17, 2021 10:12 PM
To: 'Xintong Song' 
Cc: 'user' 
Subject: RE: Direct buffer memory in job with hbase client

 

Hi Xintong,

 

After a recent job failure I set taskmanager.memory.task.heap.size to 128m,
but the cluster was unable to start, with the following output:

 

Starting cluster.

Starting standalonesession daemon on host ***.

Password:

[ERROR] The execution result is empty.

[ERROR] Could not get JVM parameters and dynamic configurations properly.

[ERROR] Raw output from BashJavaUtils:

WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.

INFO  [] - Loading configuration property: jobmanager.rpc.address, ***

INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123

INFO  [] - Loading configuration property: jobmanager.memory.process.size, 
16m

INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
172800m

INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 31

INFO  [] - Loading configuration property: parallelism.default, 1

INFO  [] - Loading configuration property: 
jobmanager.execution.failover-strategy, region

INFO  [] - Loading configuration property: taskmanager.memory.task.heap.size, 
128m

INFO  [] - The derived from fraction jvm overhead memory (16.875gb (18119393550 
bytes)) is greater than its max value 1024.000mb (1073741824 bytes), max value 
will be used instead

Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: TaskManager 
memory configuration failed: If Total Flink, Task Heap and (or) Managed Memory 
sizes are explicitly configured then the Network Memory size is the rest of the 
Total Flink memory after subtracting all other configured types of memory, but 
the derived Network Memory is inconsistent with its configuration.

at 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.java:163)

at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getTmResourceParams(BashJavaUtils.java:85)

at 
org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:67)

at 
org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:56)

Caused by: org.apache.flink.configuration.IllegalConfigurationException: If 
Total Flink, Task Heap and (or) Managed Memory sizes are explicitly configured 
then the Network Memory size is the rest of the Total Flink memory after 
subtracting all other configured types of memory, but the derived Network 
Memory is inconsistent with its configuration.

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(TaskExecutorFlinkMemoryUtils.java:344)

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:147)

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:42)

at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.deriveProcessSpecWithTotalProcessMemory(ProcessMemoryUtils.java:119)

at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:84)

at 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.java:160)

... 3 more

Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
Derived Network Memory size (100.125gb (107508399056 bytes)) is not in 
configured Network Memory range [64.000mb (67108864 bytes), 1024.000mb 
(1073741824 bytes)].

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.sanityCheckNetworkMemory(TaskExecutorFlinkMemoryUtils.java:378)

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(TaskExecutorFlinkMemoryUtils.java:342)

... 8 more

 

How do I choose the memory parameters properly?

 

 

From: Xintong Song [mailto:tonysong...@gmail.com] 
Sent: Wednesday, December 15, 2021 12:17 PM
To: Anton <anton...@yandex.ru>
Cc: user <user@flink.apache.org>
Subject: Re: Direct buffer memory in job with hbase client

 

Hi Anton,

 

You may want to try increasing the task off-heap memory, as your tasks are 
using hbase client which needs off-heap (direct) memory. The default task 
off-heap memory is 0 because most tasks do not use off-heap memory.

 

Unfortunately, I cannot advise on how much task off-heap memory your job needs, 
which pr

RE: Direct buffer memory in job with hbase client

2021-12-17 Thread Anton
Hi Xintong,

 

After a recent job failure I set taskmanager.memory.task.heap.size to 128m,
but the cluster was unable to start, with the following output:

 

Starting cluster.

Starting standalonesession daemon on host ***.

Password:

[ERROR] The execution result is empty.

[ERROR] Could not get JVM parameters and dynamic configurations properly.

[ERROR] Raw output from BashJavaUtils:

WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.

INFO  [] - Loading configuration property: jobmanager.rpc.address, ***

INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123

INFO  [] - Loading configuration property: jobmanager.memory.process.size, 
16m

INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
172800m

INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 31

INFO  [] - Loading configuration property: parallelism.default, 1

INFO  [] - Loading configuration property: 
jobmanager.execution.failover-strategy, region

INFO  [] - Loading configuration property: taskmanager.memory.task.heap.size, 
128m

INFO  [] - The derived from fraction jvm overhead memory (16.875gb (18119393550 
bytes)) is greater than its max value 1024.000mb (1073741824 bytes), max value 
will be used instead

Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: TaskManager 
memory configuration failed: If Total Flink, Task Heap and (or) Managed Memory 
sizes are explicitly configured then the Network Memory size is the rest of the 
Total Flink memory after subtracting all other configured types of memory, but 
the derived Network Memory is inconsistent with its configuration.

at 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.java:163)

at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getTmResourceParams(BashJavaUtils.java:85)

at 
org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:67)

at 
org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:56)

Caused by: org.apache.flink.configuration.IllegalConfigurationException: If 
Total Flink, Task Heap and (or) Managed Memory sizes are explicitly configured 
then the Network Memory size is the rest of the Total Flink memory after 
subtracting all other configured types of memory, but the derived Network 
Memory is inconsistent with its configuration.

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(TaskExecutorFlinkMemoryUtils.java:344)

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:147)

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:42)

at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.deriveProcessSpecWithTotalProcessMemory(ProcessMemoryUtils.java:119)

at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:84)

at 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.java:160)

... 3 more

Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
Derived Network Memory size (100.125gb (107508399056 bytes)) is not in 
configured Network Memory range [64.000mb (67108864 bytes), 1024.000mb 
(1073741824 bytes)].

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.sanityCheckNetworkMemory(TaskExecutorFlinkMemoryUtils.java:378)

at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(TaskExecutorFlinkMemoryUtils.java:342)

... 8 more

 

How do I choose the memory parameters properly?

 

 

From: Xintong Song [mailto:tonysong...@gmail.com] 
Sent: Wednesday, December 15, 2021 12:17 PM
To: Anton 
Cc: user 
Subject: Re: Direct buffer memory in job with hbase client

 

Hi Anton,

 

You may want to try increasing the task off-heap memory, as your tasks are 
using hbase client which needs off-heap (direct) memory. The default task 
off-heap memory is 0 because most tasks do not use off-heap memory.

 

Unfortunately, I cannot advise on how much task off-heap memory your job needs, 
which probably depends on your hbase client configurations.




Thank you~

Xintong Song

 

 

On Wed, Dec 15, 2021 at 1:40 PM Anton <anton...@yandex.ru> wrote:

Hi, from time to time my job stops processing messages with the warning message
listed below. I tried to increase jobmanager.memory.process.size and
taskmanager.memory.process.si

Direct buffer memory in job with hbase client

2021-12-14 Thread Anton
Hi, from time to time my job stops processing messages with the warning
message listed below. I tried to increase jobmanager.memory.process.size and
taskmanager.memory.process.size but it didn't help.

What else can I try? "Framework Off-heap" is 128mb now, as seen in the task
manager dashboard, and Task Off-heap is 0b. The documentation says that "You
should only change this value if you are sure that the Flink framework needs
more memory." And I'm not sure about it.

Flink version is 1.13.2.

 

2021-11-29 14:06:53,659 WARN
org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline [] - An
exceptionCaught() event was fired, and it reached at the tail of the
pipeline. It usually means the last handler in the pipeline did not handle
the exception.

org.apache.hbase.thirdparty.io.netty.channel.ChannelPipelineException:
org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler.handlerAdded
() has thrown an exception; removed.

at
org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.callHand
lerAdded0(DefaultChannelPipeline.java:624)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.addFirst
(DefaultChannelPipeline.java:181)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.addFirst
(DefaultChannelPipeline.java:358)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPipeline.addFirst
(DefaultChannelPipeline.java:339)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hadoop.hbase.ipc.NettyRpcConnection.saslNegotiate(NettyRpcConnect
ion.java:215)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hadoop.hbase.ipc.NettyRpcConnection.access$600(NettyRpcConnection
.java:76)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hadoop.hbase.ipc.NettyRpcConnection$2.operationComplete(NettyRpcC
onnection.java:289)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hadoop.hbase.ipc.NettyRpcConnection$2.operationComplete(NettyRpcC
onnection.java:277)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyLi
stener0(DefaultPromise.java:578)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyLi
steners0(DefaultPromise.java:571)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyLi
stenersNow(DefaultPromise.java:550)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.notifyLi
steners(DefaultPromise.java:491)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.setValue
0(DefaultPromise.java:616)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.setSucce
ss0(DefaultPromise.java:605)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultPromise.trySucce
ss(DefaultPromise.java:104)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.DefaultChannelPromise.trySucces
s(DefaultChannelPromise.java:84)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.nio.AbstractNioChannel$Abstract
NioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:300)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.nio.AbstractNioChannel$Abstract
NioUnsafe.finishConnect(AbstractNioChannel.java:335)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoop.processSelecte
dKey(NioEventLoop.java:707)
[blob_p-6eb282e9e614ab47d8c0b446632a1a9cba8a3955-6e6e09bc9b5fae2679cbbb261ca
a9da2:?]

at
org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoop.processSelecte
dKeysOptimized(NioEventLoop.java:655)

RE: Working with HBase inside RichMapFunction

2021-11-23 Thread Anton
Hi Caizhi,

 

The Flink version is 1.13.2 and yes, I'm putting data into HBase inside the
RichMapFunction. But even if I use a sink (a Table API sink?), wouldn't the
enrichment process in the RichMapFunction still be able to skip messages in
case of problems on the HBase side?

 

From: Caizhi Weng [mailto:tsreape...@gmail.com] 
Sent: Wednesday, November 24, 2021 4:47 AM
To: Anton 
Cc: user 
Subject: Re: Working with HBase inside RichMapFunction

 

Hi!

 

Which Flink version are you using? Are you putting data into HBase inside 
RichMapFunction, instead of using an HBase sink? If yes could you share your 
user code? Actually using an HBase sink is recommended because it will check if 
any error occurs during the operation and will fail the job (and restarts from 
checkpoint) if it spots any errors, so that no data loss will occur.

 

On Wed, Nov 24, 2021 at 4:29 AM, Anton <anton...@yandex.ru> wrote:

Hi, I'm using a RichMapFunction to enrich data from a stream generated from a
Kafka topic and put the enriched data back into HBase. When there is a failure
on the HBase side I see in Flink's log that the HBase client attempts several
times to get the necessary data from HBase - I believe it makes
`hbase.client.retries.number` attempts - and after the retry count is exceeded
the data is simply lost and the Flink job moves on to the next record from the
stream. So the question is how to avoid this data loss? I guess simply making
`hbase.client.retries.number` bigger is not an ideal solution.



Working with HBase inside RichMapFunction

2021-11-23 Thread Anton
Hi, I'm using a RichMapFunction to enrich data from a stream generated from a
Kafka topic and put the enriched data back into HBase. When there is a failure
on the HBase side I see in Flink's log that the HBase client attempts several
times to get the necessary data from HBase - I believe it makes
`hbase.client.retries.number` attempts - and after the retry count is exceeded
the data is simply lost and the Flink job moves on to the next record from the
stream. So the question is how to avoid this data loss? I guess simply making
`hbase.client.retries.number` bigger is not an ideal solution.



HBase sink connector - HBaseSinkFunction vs Table API

2021-10-18 Thread Anton
Hello. Please suggest the best method to write data to HBase (a stream coming
from Kafka is enriched with HBase data and needs to be written back to HBase).
There is only one connector on flink.apache.org, related to the Table API. At
the same time there is HBaseSinkFunction in the source code, and I believe it
relates to the DataStream API. But there is an issue
https://issues.apache.org/jira/browse/FLINK-22623 which states that
HBaseTableSource/Sink and related classes have been removed. Is
HBaseSinkFunction really deleted/deprecated? What are the pros and cons of the
Table API connector and HBaseSinkFunction, if it's still supported?
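
For illustration, a rough, untested sketch of the Table API route using the SQL HBase
connector. The table name, column family, fields and ZooKeeper quorum are placeholders,
and the connector identifier ('hbase-2.2' here) depends on the HBase version in use:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseTableSinkSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // hypothetical sink table backed by HBase with a single column family "cf"
        tEnv.executeSql(
                "CREATE TABLE enriched_events (" +
                "  rowkey STRING," +
                "  cf ROW<value_a STRING, value_b BIGINT>," +
                "  PRIMARY KEY (rowkey) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'hbase-2.2'," +
                "  'table-name' = 'enriched_events'," +
                "  'zookeeper.quorum' = 'zk-host:2181'" +
                ")");

        // the enrichment result would then be written with something like:
        // tEnv.executeSql("INSERT INTO enriched_events SELECT ... FROM enriched_source");
    }
}

If the rest of the job is DataStream-based, the same table can still be used by bridging
through a StreamTableEnvironment.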



Updating elements of a window in regular intervals

2021-01-15 Thread Anton W. Haubner

Hello!

I hope this is the correct mailing list for newb questions regarding 
flink stream processing.


Essentially, I have a question about how to apply a transformation to 
each individual element of a sliding window in regular intervals.



I think a little background to the problem I'm trying to solve could be 
helpful before asking the concrete question:
I have a service *A* which continuously produces events, and another 
service *B* which accepts collections of processed events.


The collections accepted by the latter service are produced from the 
events received within the last minute. So what I am currently doing is 
using timeWindowAll to buffer a sliding window of 1 minute size and an 
aggregate function which produces arrays of events from the windows. 
These arrays are then sent to the consumer service:



...
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate()
.addSink()

/This works,/ but I need to add another functionality: Before being sent 
off to the consumer service *B*, all events have to be annotated with a 
value which needs to be computed based on the time that passed since the 
event was produced. What I am currently doing is applying a map 
function to the stream of produced arrays. This seems awfully 
inefficient to me, since each call of the map function has to work on 
the whole content of a window (now contained in the array):



...
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate()
.map()
.addSink()

Instead, I wonder whether it is possible to apply a map function to the 
elements of a window. As I understand it, this is not currently possible 
(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filtering-and-mapping-data-after-window-opertator-td21449.html#a21458).


Another idea would be to add keyed windows before the timeWindowAll, 
using the event timestamp value modulo the parallelism as keys, and perform 
the event transformation while aggregating each window into an array. 
Then the computation could be performed in parallel on these smaller 
windows and afterwards I would join the produced arrays:


...
.keyBy(event -> event.timestamp % parallelism)
.timeWindow(Time.seconds(60), Time.seconds(1))
.reduce()
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate()
.addSink()
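
A rough, untested Java sketch of that per-key pre-aggregation, using an AggregateFunction
in place of the reduce() above; the events stream, the parallelism variable, and an Event
class with a public timestamp field and an annotate(long elapsedMillis) helper are all
assumptions made for illustration:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

AggregateFunction<Event, List<Event>, Event[]> annotateAndCollect =
        new AggregateFunction<Event, List<Event>, Event[]>() {

    @Override
    public List<Event> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<Event> add(Event event, List<Event> acc) {
        acc.add(event);
        return acc;
    }

    @Override
    public Event[] getResult(List<Event> acc) {
        // annotate each event with the time that has passed since it was
        // produced, at the moment the window fires
        long now = System.currentTimeMillis();
        for (Event e : acc) {
            e.annotate(now - e.timestamp);
        }
        return acc.toArray(new Event[0]);
    }

    @Override
    public List<Event> merge(List<Event> a, List<Event> b) {
        a.addAll(b);
        return a;
    }
};

DataStream<Event[]> partials = events
        .keyBy(e -> e.timestamp % parallelism)
        .timeWindow(Time.seconds(60), Time.seconds(1))
        .aggregate(annotateAndCollect);

// the partial arrays are then concatenated in the subsequent timeWindowAll
// stage, as in the snippet above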

What do you think of this idea? Is there a better way to handle this?

Thank you for your help.
Best regards,

Anton





StreamingFileSink rolling callback

2019-09-09 Thread Anton Parkhomenko
Hello,

I'm writing a Flink job that reads heterogeneous data (one row contains several
types that need to be partitioned downstream) from AWS Kinesis and
writes to an S3 directory structure like s3://bucket/year/month/day/hour/type.
This all works great with StreamingFileSink in Flink 1.9, but the problem is
that I need to immediately (or rather "as soon as possible") let
another application know when an "hour" bucket has rolled (i.e. we're 100%
sure it won't write any more data for this hour). Another problem is that
data can be very skewed in types, e.g. one hour can contain 90% of rows
with typeA, 30% of rows with typeB and 1% of rows with typeC.

My current plan is to:

1. Split the stream into windows using TumblingProcessingTimeWindows (I don't
care about event time at all)
2. Assign every row its bucket in a windowing function
3. Write a stateful BucketAssigner that:
3.1. Keeps its last window in a mutable variable
3.2. Once it receives a row with a newer window, sends a message to SQS and
advances the stored window

My biggest concern now is about the 3rd point. To me, BucketAssigner looks like
a pure function of (Row, Time) -> Bucket and I'm not sure that introducing
state and a side effect there would be reasonable. Is there any other way to
do it? I'm also thinking about how I should couple this with the checkpointing
mechanism, as ideally I'd like to not invoke this callback before the
checkpoint is written.
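
To make the concern concrete, a rough, untested sketch of what such a stateful assigner
could look like against the Flink 1.9 BucketAssigner interface. Row stands for the record
type, and hourBucketFor() and notifySqs() are placeholders; note that lastBucket is per
subtask and not checkpointed, which is exactly the state/side-effect worry above:

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class NotifyingBucketAssigner implements BucketAssigner<Row, String> {

    // last bucket seen by this subtask; not part of any checkpointed state
    private transient String lastBucket;

    @Override
    public String getBucketId(Row element, Context context) {
        String bucket = hourBucketFor(element, context.currentProcessingTime());
        if (lastBucket != null && !lastBucket.equals(bucket)) {
            // side effect: tell the other application that the previous bucket rolled
            notifySqs(lastBucket);
        }
        lastBucket = bucket;
        return bucket;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    private String hourBucketFor(Row row, long processingTime) {
        // placeholder: derive "year/month/day/hour/type" from the row and the time
        return "...";
    }

    private void notifySqs(String closedBucket) {
        // placeholder for the SQS client call
    }
}

Whether notifySqs() should be deferred until a successful checkpoint, as hoped for above,
is not addressed by this sketch.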

StreamingFileSink does not provide many ways to extend it. I tried to
re-implement it for my purposes, but stumbled upon many private methods and
classes, so even though it looks possible, the end result would probably be
too ugly.

To make things a little bit easier, I don't care too much about the delivery
semantics of those final SQS messages - if I get only ~99% of them, that's
fine; if some of them are duplicated, that's also fine.

Regards,
Anton


TaskManager process continue to work after termination

2019-09-02 Thread Ustinov Anton
sk resources for Source: Custom Source -> Filter (7/8) 
(f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
{"time":"2019-09-02 
11:33:14.801","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL
 - exception in resource cleanup of task Source: Custom Source -> Filter (7/8) 
(f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at 
org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 
11:33:14.803","loglevel":"ERROR","class":"org.apache.flink.runtime.taskexecutor.TaskExecutor","message":"FATAL
 - exception in resource cleanup of task Source: Custom Source -> Filter (7/8) 
(f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at 
org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 
11:33:14.803","loglevel":"ERROR","class":"org.apache.flink.runtime.taskexecutor.TaskManagerRunner","message":"Fatal
 error occurred while executing the TaskManager. Shutting it 
down...","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at 
org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 
11:33:14.809","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Shutting
 down remote daemon.","host":"clickstream-flink08"}
{"time":"2019-09-02 
11:33:14.810","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remote
 daemon shut down; proceeding with flushing remote 
transports.","host":"clickstream-flink08"}
{"time":"2019-09-02 
11:33:14.827","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remoting
 shut down.","host":"clickstream-flink08"}
{"time":"2019-09-02 
11:33:14.827","loglevel":"INFO","class":"akka.remote.RemoteActorRefProvider$RemotingTerminator","message":"Remoting
 shut down.","host":"clickstream-flink08"}
{"time":"2019-09-02 
11:33:14.836","loglevel":"INFO","class":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Stopped
 Akka RPC service.","host":"clickstream-flink08”}

But task manager process is still alive:

flink 29078  423  7.0 49191076 27790920 ?   Sl   10:13 828:28 java 
-Djava.net.preferIPv4Stack=true 
-Dlog.file=/opt/flink/log/flink--taskexecutor-0-clickstream-flink08.log 
-Dlog4j.configuration=file:/opt/flink/conf/log4j.properties 
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml -classpath 
/opt/flink/lib/flink-cep_2.12-1.8.0.jar:/opt/flink/lib/flink-queryable-state-runtime_2.12-1.8.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.8.0.jar:/opt/flink/lib/flink-table_2.12-1.8.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.8.0.jar:::
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir 
/opt/flink/conf

Is it acceptable behaviour?

Best regards,
Anton Ustinov



unsubscribe

2019-06-24 Thread Anton Hughes



Idle windows

2019-06-21 Thread Ustinov Anton
I have a simple job that reads JSON messages from a Kafka topic and processes
them like this:

SingleOutputStreamOperator result = ds
.filter(ev -> ev.has(cookieFieldName))
.map(ev -> ev.get(cookieFieldName).asText())
.keyBy(new CookieKeySelector(env.getParallelism()))
.timeWindow(Time.seconds(period))
.aggregate(new CookieAggregate())
.timeWindowAll(Time.seconds(period))
.reduce((v1, v2) -> v1 + v2);

CookieKeySelector computes an MD5 hash from the cookie value and takes the
remainder of dividing it by the job parallelism. CookieAggregate counts unique
cookie values in the window. I see in the Flink Dashboard that only half of the
windows are getting messages to process. The number of working windows depends
on the job parallelism. Why do only some of the windows compute useful
aggregates? I've tried using random numbers as a key and still get the same
result.
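
For reference, a sketch of a key selector along the lines described above (reconstructed
from the description, not the actual code):

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

import org.apache.flink.api.java.functions.KeySelector;

public class CookieKeySelector implements KeySelector<String, Integer> {

    private final int parallelism;

    public CookieKeySelector(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public Integer getKey(String cookie) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(cookie.getBytes(StandardCharsets.UTF_8));
        // remainder of the MD5 value divided by the job parallelism
        return new BigInteger(1, digest).mod(BigInteger.valueOf(parallelism)).intValue();
    }
}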

Additional information: Flink 1.8.0, runs on a single node with 56 CPUs, 256G 
RAM, 10GB/s network.


Anton Ustinov
ustinov@gmail.com <mailto:ustinov@gmail.com>



Re: Integrating Flink CEP with a Rules Engine

2017-07-22 Thread Anton
We also have a requirement to use Drools in Flink. Drools brings a very
mature and usable business rules editor, and being able to integrate Drools
into Flink would be very useful.

On 23 June 2017 at 22:09, Suneel Marthi  wrote:

> Sorry I didn't read the whole thread.
>
> We have a similar rqmt wherein the users would like to add/update/delete
> CEP patterns via UX or REST api and we started discussing building a REST
> api for that, glad to see that this is a common ask and if there's already
> a community effort around this - that's great to know.
>
> On Fri, Jun 23, 2017 at 9:54 AM, Sridhar Chellappa 
> wrote:
>
>> Folks,
>>
>> Plenty of very good points but I see this discussion digressing from what
>> I originally asked for. We need a dashboard to let the Business Analysts to
>> define rules and the CEP to run them.
>>
>> My original question was how to solve this with Flink CEP?
>>
>> From what I see, this is not a solved problem. Correct me if I am wrong.
>>
>> On Fri, Jun 23, 2017 at 6:52 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi all,
>>>
>>> Currently there is an ongoing effort to integrate FlinkCEP with Flink's
>>> SQL API.
>>> There is already an open FLIP for this:
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A
>>> +Integration+of+SQL+and+CEP
>>> 
>>>
>>> So, if there was an effort for integration of different
>>> libraries/tools/functionality as well, it
>>> would be nice to go a bit more into details on i) what is already there,
>>> ii) what is planned to be
>>> integrated for the SQL effort, and iii) what else is required, and
>>> consolidate the resources
>>> available.
>>>
>>> This will allow the community to move faster and with a clear roadmap.
>>>
>>> Kostas
>>>
>>> On Jun 23, 2017, at 2:51 PM, Suneel Marthi  wrote:
>>>
>>> FWIW, here's an old Cloudera blog about using Drools with Spark.
>>>
>>> https://blog.cloudera.com/blog/2015/11/how-to-build-a-comple
>>> x-event-processing-app-on-apache-spark-and-drools/
>>>
>>> It should be possible to invoke Drools from Flink in a similar way (I
>>> have not tried it).
>>>
>>> It all depends on what the use case and how much of present Flink CEP
>>> satisfies the use case before considering integration with more complex
>>> rule engines.
>>>
>>>
>>> Disclaimer: I work for Red Hat
>>>
>>> On Fri, Jun 23, 2017 at 8:43 AM, Ismaël Mejía  wrote:
>>>
 Hello,

 It is really interesting to see this discussion because that was one
 of the questions on the presentation on CEP at Berlin Buzzwords, and
 this is one line of work that may eventually make sense to explore.

 Rule engines like drools implement the Rete algorithm that if I
 understood correctly optimizes the analysis of a relatively big set of
 facts (conditions) into a simpler evaluation graph. For more details
 this is a really nice explanation.
 https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/

 On flink's CEP I have the impression that you define this graph by
 hand. Using a rule engine you could infer an optimal graph from the
 set of rules, and then this graph could be translated into CEP
 patterns.

 Of course take all of this with a grain of salt because I am not an
 expert on both CEP or the Rete algorithm, but I start to see the
 connection of both worlds more clearly now. So if anyone else has
 ideas of the feasibility of this or can see some other
 issues/consequences please comment. I also have the impression that
 distribution is less of an issue because the rete network is
 calculated only once and updates are not 'dynamic' (but I might be
 wrong).

 Ismaël

 ps. I add Thomas in copy who was who made the question in the
 conference in case he has some comments/ideas.


 On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
  wrote:
 > Hi Jorn and Sridhar,
 >
 > It would be worth describing a bit more what these tools are and what
 are
 > your needs.
 > In addition, and to see what the CEP library already offers here you
 can
 > find the documentation:
 >
 > https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/libs/cep.html
 >
 >
 > Thanks,
 > Kostas
 >
 > On Jun 23, 2017, at 1:41 PM, Jörn Franke 
 wrote:
 >
 > Hallo,
 >
 > It si possible, but some caveat : flink is a distributed system, but
 in
 > drools the fact are only locally available. This may lead to strange
 effects
 > when rules update the fact base.
 >
 > Best regards
 >
 > On 23. Jun 2017, at 12:49, Sridhar Chellappa 
 wrote:
 >
 > Folks,
 

RE: Flink batch processing fault tolerance

2017-02-16 Thread Anton Solovev
Hi Aljoscha,
Could you share your plans of resolving it?

Best,
Anton


From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Thursday, February 16, 2017 2:48 PM
To: user@flink.apache.org
Subject: Re: Flink batch processing fault tolerance

Hi,
yes, this is indeed true. We had some plans for how to resolve this but they 
never materialised because of the focus on Stream Processing. We might unite 
the two in the future and then you will get fault-tolerant batch/stream 
processing in the same API.

Best,
Aljoscha

On Wed, 15 Feb 2017 at 09:28 Renjie Liu 
<liurenjie2...@gmail.com> wrote:
Hi, all:
I'm reading Flink's docs and am curious about the fault tolerance of batch
processing jobs. It seems that when one task execution fails, the whole job
will be restarted - is that true? If so, isn't it impractical to deploy large
Flink batch jobs?
--
Liu, Renjie
Software Engineer, MVAD


RE: 1.2 release date

2017-02-06 Thread Anton Solovev
Hi,

Could you update the list of contributors after that? ☺

Anton Solovev
Software Engineer

Office: +7 846 200 09 70 x 55621
Email: anton_solo...@epam.com
Samara, Russia (GMT+4)   epam.com

CONFIDENTIALITY CAUTION AND DISCLAIMER
This message is intended only for the use of the individual(s) or entity(ies) 
to which it is addressed and contains information that is legally privileged 
and confidential. If you are not the intended recipient, or the person 
responsible for delivering the message to the intended recipient, you are 
hereby notified that any dissemination, distribution or copying of this 
communication is strictly prohibited. All unintended recipients are obliged to 
delete this message and destroy any printed copies.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, February 6, 2017 12:20 PM
To: user@flink.apache.org
Subject: Re: 1.2 release date

Hi Tarandeep,

afaik, Flink 1.2 will be released today.

Cheers,
Till

On Sun, Feb 5, 2017 at 10:00 PM, Tarandeep Singh 
<tarand...@gmail.com> wrote:
Hi,

Looking forward to the 1.2 version of Flink (lots of exciting features have
been added).
Has the date been finalized yet?

Thanks,
Tarandeep



Adding 3rd party moving average and other 'indicators'

2016-06-24 Thread Anton
Hello

I'm currently trying to learn Flink, and so far I am really impressed by it.

However, I've still got a lot to learn, so apologies if this is a terribly
newbie question.

I have created a streaming example based on
https://flink.apache.org/news/2015/02/09/streaming-example.html.

Next I would like to apply a moving average, using TA-Lib. You can see an
example here -
https://github.com/ishanthilina/TA-Lib-Java-Examples/blob/master/src/main/java/SimpleMovingAverageExample.java

Can someone tell me what would be the best/most logical way to apply this?

I appreciate that a moving average can be done without using TA-Lib, but
there are other functions in this library that I would like to use, plus it
would give me experience with integrating external analysis libraries.

Thanks and regards
Anton
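
A minimal sketch of one way to wire TA-Lib into a Flink map function, assuming the
TA-Lib Java wrapper used in the linked example (com.tictactec.ta.lib.Core) and a
stream of (symbol, price) tuples; the class and field names are placeholders. The
buffer lives in plain operator memory, so it is neither keyed per symbol nor fault
tolerant, but it shows where the library call would sit:

import java.util.ArrayDeque;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import com.tictactec.ta.lib.Core;
import com.tictactec.ta.lib.MInteger;

// Emits (symbol, SMA of the last `period` prices seen by this subtask).
public class MovingAverageMap extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    private final int period;
    private transient ArrayDeque<Double> buffer;
    private transient Core talib;

    public MovingAverageMap(int period) {
        this.period = period;
    }

    @Override
    public void open(Configuration config) {
        buffer = new ArrayDeque<>();
        talib = new Core();
    }

    @Override
    public Tuple2<String, Double> map(Tuple2<String, Double> trade) {
        buffer.addLast(trade.f1);
        if (buffer.size() > period) {
            buffer.removeFirst();
        }
        if (buffer.size() < period) {
            return new Tuple2<>(trade.f0, Double.NaN);  // not enough data yet
        }
        double[] prices = buffer.stream().mapToDouble(Double::doubleValue).toArray();
        double[] out = new double[prices.length];
        MInteger outBegin = new MInteger();
        MInteger outLength = new MInteger();
        talib.sma(0, prices.length - 1, prices, period, outBegin, outLength, out);
        // the last produced value is the SMA over the most recent `period` prices
        return new Tuple2<>(trade.f0, out[outLength.value - 1]);
    }
}

Applied as something like stream.map(new MovingAverageMap(20)), this produces one
moving-average value per incoming price; other TA-Lib indicators can be dropped in
the same way by swapping the sma() call.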


Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
I think I can turn my problem into a simpler one.

Effectively what I need is a way to checkpoint certain events in the input
stream and, once this checkpoint reaches the end of the DAG, take some action. So I
need a signal at the sink which can tell "all events in the source before the
checkpointed event are now processed".

As far as I understand, flagged records don't quite work since the DAG doesn't
propagate source events one-to-one. Some transformations might create 3
child events out of 1 source event. If I want to make sure I have fully processed a
source event, I need to wait till all its children are processed.



On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com>
wrote:

> Hi Fabian
>
> Defining a special flag for record seems like a checkpoint barrier. I
> think I will end up re-implementing checkpointing myself. I found the
> discussion in flink-dev:
> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>  which
> seems to solve my task. Essentially they want to have a mechanism which
> will mark record produced by job as “last” and then wait until it’s fully
> propagated through DAG. Similarly to what I need. Essentially my job which
> produces trades can also thought as being finished once it produced all
> trades, then I just need to wait till latest trade produced by this job is
> processed.
>
> So although windows can probably also be applied, I think propagating
> barrier through DAG and checkpointing at final job is what I need.
>
> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
> triggering a custom checkoint or finishing streaming job)?
>
> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Anton,
>
> If I got your requirements right, you are looking for a solution that
> continuously produces updated partial aggregates in a streaming fashion.
> When a  special event (no more trades) is received, you would like to store
> the last update as a final result. Is that correct?
>
> You can compute continuous updates using a reduce() or fold() function.
> These will produce a new update for each incoming event.
> For example:
>
> val s: DataStream[(Int, Long)] = ...
> s.keyBy(_._1)
>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>
> would continuously compute a sum for every key (_._1) and produce an
> update for each incoming record.
>
> You could add a flag to the record and implement a ReduceFunction that
> marks a record as final when the no-more-trades event is received.
> With a filter and a data sink you could emit such final records to a
> persistent data store.
>
> Btw.: You can also define custom trigger policies for windows. A custom
> trigger is called for each element that is added to a window and when
> certain timers expire. For example with a custom trigger, you can evaluate
> a window for every second element that is added. You can also define
> whether the elements in the window should be retained or removed after the
> evaluation.
>
> Best, Fabian
>
>
>
> 2015-11-24 21:32 GMT+01:00 Anton Polyakov <polyakov.an...@gmail.com>:
>
>> Hi Max
>>
>> thanks for reply. From what I understand window works in a way that it
>> buffers records while window is open, then apply transformation once window
>> close is triggered and pass transformed result.
>> In my case then window will be open for few hours, then the whole amount
>> of trades will be processed once window close is triggered. Actually I want
>> to process events as they are produced without buffering them. It is more
>> like a stream with some special mark versus windowing seems more like a
>> batch (if I understand it correctly).
>>
>> In other words - buffering and waiting for window to close, then
>> processing will be equal to simply doing one-off processing when all events
>> are produced. I am looking for a solution when I am processing events as
>> they are produced and when source signals "done" my processing is also
>> nearly done.
>>
>>
>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Anton,
>>>
>>> You should be able to model your problem using the Flink Streaming
>>> API. The actions you want to perform on the streamed records
>>> correspond to transformations on Windows. You can indeed use
>>> Watermarks to signal the window that a threshold for an action has
>>> been reached. Otherwise an eviction policy should also do it.
>>>
>>> Without more details about what you want to do I can only refer you to
>>> the streaming API documentation.

Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
Hi Stephan

thanks, that looks super. But the source then needs to emit a checkpoint. At the
source, while reading source events, I can find out that this is the
source event I want to take action after. So if at the source I can then emit a
checkpoint and catch it at the end of the DAG, that would solve my problem
(well, I also need to somehow distinguish my checkpoint from Flink's
auto-generated ones).

Sorry for being too chatty, this is a topic where I need expert opinion and
can't find out the answer by just googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
> There you will get a call at every checkpoint (and can look at what records
> are before that checkpoint). You also get a call once the checkpoint is
> complete, which corresponds to the point when everything has flown through
> the DAG.
>
> I think it is nice to implement it like that, because it works
> non-blocking: The stream continues while the the records-you-wait-for flow
> through the DAG, and you get an asynchronous notification once they have
> flown all the way through.
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <polyakov.an...@gmail.com
> > wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need - I need way to checkpoint certain events in
>> input stream and once this checkpoint reaches end of DAG take some action.
>> So I need a signal at the sink which can tell "all events in source before
>> checkpointed event are now processed".
>>
>> As far as I understand flagged record don't quite work since DAG doesn't
>> propagate source events one-to-one. Some transformations might create 3
>> child events out of 1 source. If I want to make sure I fully processed
>> source event, I need to wait till all childs are processed.
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com
>> > wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for record seems like a checkpoint barrier. I
>>> think I will end up re-implementing checkpointing myself. I found the
>>> discussion in flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>>>  which
>>> seems to solve my task. Essentially they want to have a mechanism which
>>> will mark record produced by job as “last” and then wait until it’s fully
>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>> produces trades can also thought as being finished once it produced all
>>> trades, then I just need to wait till latest trade produced by this job is
>>> processed.
>>>
>>> So although windows can probably also be applied, I think propagating
>>> barrier through DAG and checkpointing at final job is what I need.
>>>
>>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
>>> triggering a custom checkoint or finishing streaming job)?
>>>
>>> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Anton,
>>>
>>> If I got your requirements right, you are looking for a solution that
>>> continuously produces updated partial aggregates in a streaming fashion.
>>> When a  special event (no more trades) is received, you would like to store
>>> the last update as a final result. Is that correct?
>>>
>>> You can compute continuous updates using a reduce() or fold() function.
>>> These will produce a new update for each incoming event.
>>> For example:
>>>
>>> val s: DataStream[(Int, Long)] = ...
>>> s.keyBy(_._1)
>>>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>>>
>>> would continuously compute a sum for every key (_._1) and produce an
>>> update for each incoming record.
>>>
>>> You could add a flag to the record and implement a ReduceFunction that
>>> marks a record as final when the no-more-trades event is received.
>>> With a filter and a data sink you could emit such final records to a
>>> persistent data store.
>>>
>>> Btw.: You can also define custom trigger policies for windows. A custom
>>> trigger is called for each element that is added to a window and when
>>> certain timers expire.
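
To make the Checkpointed/notification idea above a bit more concrete, here is a
minimal sketch of a sink that remembers which checkpoint covers a special marker
record and reacts once that checkpoint completes, i.e. once everything emitted
before the marker has flown through the whole DAG. It is written against the
0.10-era interfaces discussed in this thread; the notification interface was called
CheckpointNotifier at the time (later CheckpointListener), so the imports are
approximate, and the "END_OF_DAY" marker test is a placeholder:

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MarkerAwareSink extends RichSinkFunction<String>
        implements Checkpointed<Long>, CheckpointNotifier {

    private boolean markerSeen = false;
    private long markerCheckpointId = -1L;  // -1 while no covering checkpoint yet

    @Override
    public void invoke(String value) {
        if ("END_OF_DAY".equals(value)) {  // placeholder marker test
            markerSeen = true;
        }
        // ... normal sink logic for regular records ...
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        if (markerSeen && markerCheckpointId < 0) {
            // the barrier of this checkpoint arrived after the marker record
            markerCheckpointId = checkpointId;
        }
        return markerCheckpointId;
    }

    @Override
    public void restoreState(Long state) {
        markerCheckpointId = state;
        markerSeen = state >= 0;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (markerCheckpointId >= 0 && checkpointId >= markerCheckpointId) {
            // everything up to and including the marker is now fully processed;
            // e.g. persist the final end-of-day measure here
        }
    }
}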

Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
I think overall it would be a very useful feature to have the ability to track
the processing of source stream events by attaching barriers to them and
reacting on them in processing stages. Working with time windows can't help,
since processing can involve some long-running operations (e.g. DB queries),
and working with markers/event counts can't work either, as during processing
events might spawn child events.

However, without the ability to specify where in the source you put a barrier,
one can't do it.


On Mon, Nov 30, 2015 at 3:35 PM, Stephan Ewen <se...@apache.org> wrote:

> You cannot force a barrier at one point in time. At what time checkpoints
> are triggered is decided by the master node.
>
> I think in your case you can use the checkpoint and notification calls to
> figure out when data has flown through the DAG, but you cannot force a
> barrier at a specific point.
>
> On Mon, Nov 30, 2015 at 3:33 PM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>
>> Hi Stephan
>>
>> sorry for misunderstanding, but how do I make sure barrier is placed at
>> the proper time? How does my source "force" checkpoint to start happening
>> once it finds that all needed elements are now produced?
>>
>> On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> If you implement the "Checkpointed" interface, you get the function
>>> calls to "snapshotState()" at the point when the checkpoint barrier arrives
>>> at an operator. So, the call to "snapshotState()" in the sink is when the
>>> barrier reaches the sink. The call to "checkpointComplete()" in the sources
>>> comes after all barriers have reached all sinks.
>>>
>>> Have a look here for an illustration about barriers flowing with the
>>> stream:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>>>
>>> Stephan
>>>
>>>
>>> On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>>
>>>> Hi Stephan
>>>>
>>>> thanks that looks super. But source needs then to emit checkpoint. At
>>>> the source, while reading source events I can find out that - this is the
>>>> source event I want to take actions after. So if at ssource I can then emit
>>>> checkpoint and catch it at the end of the DAG that would solve my problem
>>>> (well, I also need to somehow distinguish my checkpoint from Flink's
>>>> auto-generated ones).
>>>>
>>>> Sorry for being too chatty, this is the topic where I need expert
>>>> opinion, can't find out the answer by just googling.
>>>>
>>>>
>>>> On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi Anton!
>>>>>
>>>>> That you can do!
>>>>>
>>>>> You can look at the interfaces "Checkpointed" and
>>>>> "checkpointNotifier". There you will get a call at every checkpoint (and
>>>>> can look at what records are before that checkpoint). You also get a call
>>>>> once the checkpoint is complete, which corresponds to the point when
>>>>> everything has flown through the DAG.
>>>>>
>>>>> I think it is nice to implement it like that, because it works
>>>>> non-blocking: The stream continues while the the records-you-wait-for flow
>>>>> through the DAG, and you get an asynchronous notification once they have
>>>>> flown all the way through.
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>>>>
>>>>>> I think I can turn my problem into a simpler one.
>>>>>>
>>>>>> Effectively what I need - I need way to checkpoint certain events in
>>>>>> input stream and once this checkpoint reaches end of DAG take some action.

Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
Hi Stephan

thanks, that looks super. But the source then needs to emit a checkpoint. At the
source, while reading source events, I can find out that this is the
source event I want to take action after. So if at the source I can then emit a
checkpoint and catch it at the end of the DAG, that would solve my problem
(well, I also need to somehow distinguish my checkpoint from Flink's
auto-generated ones).

Sorry for being too chatty, this is a topic where I need expert opinion and
can't find out the answer by just googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
> There you will get a call at every checkpoint (and can look at what records
> are before that checkpoint). You also get a call once the checkpoint is
> complete, which corresponds to the point when everything has flown through
> the DAG.
>
> I think it is nice to implement it like that, because it works
> non-blocking: The stream continues while the the records-you-wait-for flow
> through the DAG, and you get an asynchronous notification once they have
> flown all the way through.
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <polyakov.an...@gmail.com
> > wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need - I need way to checkpoint certain events in
>> input stream and once this checkpoint reaches end of DAG take some action.
>> So I need a signal at the sink which can tell "all events in source before
>> checkpointed event are now processed".
>>
>> As far as I understand flagged record don't quite work since DAG doesn't
>> propagate source events one-to-one. Some transformations might create 3
>> child events out of 1 source. If I want to make sure I fully processed
>> source event, I need to wait till all childs are processed.
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com
>> > wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for record seems like a checkpoint barrier. I
>>> think I will end up re-implementing checkpointing myself. I found the
>>> discussion in flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>>>  which
>>> seems to solve my task. Essentially they want to have a mechanism which
>>> will mark record produced by job as “last” and then wait until it’s fully
>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>> produces trades can also thought as being finished once it produced all
>>> trades, then I just need to wait till latest trade produced by this job is
>>> processed.
>>>
>>> So although windows can probably also be applied, I think propagating
>>> barrier through DAG and checkpointing at final job is what I need.
>>>
>>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
>>> triggering a custom checkoint or finishing streaming job)?
>>>
>>> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Anton,
>>>
>>> If I got your requirements right, you are looking for a solution that
>>> continuously produces updated partial aggregates in a streaming fashion.
>>> When a  special event (no more trades) is received, you would like to store
>>> the last update as a final result. Is that correct?
>>>
>>> You can compute continuous updates using a reduce() or fold() function.
>>> These will produce a new update for each incoming event.
>>> For example:
>>>
>>> val s: DataStream[(Int, Long)] = ...
>>> s.keyBy(_._1)
>>>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>>>
>>> would continuously compute a sum for every key (_._1) and produce an
>>> update for each incoming record.
>>>
>>> You could add a flag to the record and implement a ReduceFunction that
>>> marks a record as final when the no-more-trades event is received.
>>> With a filter and a data sink you could emit such final records to a
>>> persistent data store.
>>>
>>> Btw.: You can also define custom trigger policies for windows. A custom
>>> trigger is called for each element that is added to a window and when
>>> certain timers expire.
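
Fabian's remark about custom triggers is essentially what Flink's built-in
CountTrigger does. As an illustration, here is a minimal sketch of a trigger that
evaluates and purges a window after every N elements, written against the later
Flink 1.x-style Trigger API (the 0.10 trigger interfaces discussed in this thread
differed slightly):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Fires and purges the window once every `maxCount` elements, counting
// per key and window via partitioned trigger state.
public class EveryNElementsTrigger<T, W extends Window> extends Trigger<T, W> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public EveryNElementsTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}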

Re: Working with State example /flink streaming

2015-11-30 Thread Anton Polyakov
Javier

sorry for jumping in, but I think your case is very similar to what I am
trying to achieve in the thread just next to yours (called "Watermarks as
"process completion" flags"). I also need to process a stream which is
produced for some time, but then take an action after a certain event. Also, a
window doesn't work for me because in my case the stream produces data for 4-5
hours and I need to evaluate it continuously but then finalize upon
receiving a certain "last event".

I am thinking that the existing checkpointing would be very helpful, as it
solves exactly this task, but internally. If you were able to emit a "special"
checkpoint in the source and then react on it at the end of the processing chain,
do you think you could solve your task?

On Fri, Nov 27, 2015 at 4:29 PM, Lopez, Javier 
wrote:

> Hi,
>
> Thanks for the example. We have done it with windows before and it works.
> We are using state because the data comes with a gap of several days and we
> can't handle a window size of several days. That's why we decided to use
> the state.
>
> On 27 November 2015 at 11:09, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I’ll try to go into a bit more detail about the windows here. What you
>> can do is this:
>>
>> DataStream> input = … // fields are (id,
>> sum, count), where count is initialized to 1, similar to word count
>>
>> DataStream> counts = input
>>   .keyBy(0)
>>   .timeWindow(Time.minutes(10))
>>   .reduce(new MyCountingReducer())
>>
>> DataStream> result = counts.map( > that divides sum by count> )
>>
>> Does this help? Here, you don’t even have to deal with state, the
>> windowing system will keep the state (i.e. the reduced) value in internal
>> state in a fault tolerant fashion.
>>
>> Cheers,
>> Aljoscha
>> > On 26 Nov 2015, at 17:14, Stephan Ewen  wrote:
>> >
>> > Hi!
>> >
>> > In streaming, there is no "end" of the stream when you would emit the
>> final sum. That's why there are windows.
>> >
>> > If you do not want the partial sums, but only the final sum, you need
>> to define what window in which the sum is computed. At the end of that
>> window, that value is emitted. The window can be based on time, counts, or
>> other measures.
>> >
>> > Greetings,
>> > Stephan
>> >
>> >
>> > On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier 
>> wrote:
>> > Hi, thanks for the answer. It worked but not in the way we expected. We
>> expect to have only one sum per ID and we are getting all the consecutive
>> sums, for example:
>> >
>> > We expect this: (11,6) but we get this (11,1) (11,3) (11,6) (the
>> initial values are ID -> 11, values -> 1,2,3). Here is the code we are
>> using for our test:
>> >
>> > DataStream<Tuple2<Long, Double>> stream = ...;
>> >
>> > DataStream<Tuple4<Long, Double, Long, Double>> result =
>> > stream.keyBy(0).map(new RollingSum());
>> >
>> > public static class RollingSum extends RichMapFunction<Tuple2<Long, Double>, Tuple4<Long, Double, Long, Double>> {
>> >
>> >   // persistent counter
>> >   private OperatorState<Double> sum;
>> >   private OperatorState<Long> count;
>> >
>> >   @Override
>> >   public Tuple4<Long, Double, Long, Double> map(Tuple2<Long, Double> value1) {
>> >     try {
>> >       Double newSum = sum.value() + value1.f1;
>> >       sum.update(newSum);
>> >       count.update(count.value() + 1);
>> >       return new Tuple4<Long, Double, Long, Double>(value1.f0, sum.value(), count.value(), sum.value() / count.value());
>> >     } catch (IOException e) {
>> >       // TODO Auto-generated catch block
>> >       e.printStackTrace();
>> >     }
>> >     return null;
>> >   }
>> >
>> >   @Override
>> >   public void open(Configuration config) {
>> >     sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
>> >     count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>> >   }
>> > }
>> >
>> >
>> > We are using a Tuple4 because we want to calculate the sum and the
>> average (So our Tuple is ID, SUM, Count, AVG). Do we need to add another
>> step to get a single value out of it? or is this the expected behavior.
>> >
>> > Thanks again for your help.
>> >
>> > On 25 November 2015 at 17:19, Stephan Ewen  wrote:
>> > Hi Javier!
>> >
>> > You can solve this both using windows, or using manual state.
>> >
>> > What is better depends a bit on when you want to have the result (the
>> sum). Do you want a result emitted after each update (or do some other
>> operation with that value) or do you want only the final sum after a
>> certain time?
>> >
>> > For the 
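
For completeness, a minimal sketch of what the reducer and the final map in the
window-based suggestion above could look like, assuming the (id, sum, count)
layout is Tuple3<Long, Double, Long>; MyCountingReducer is only named in the
thread, so this implementation is an assumption:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Adds up the per-record sums and counts inside each window.
public class MyCountingReducer implements ReduceFunction<Tuple3<Long, Double, Long>> {
    @Override
    public Tuple3<Long, Double, Long> reduce(Tuple3<Long, Double, Long> a,
                                             Tuple3<Long, Double, Long> b) {
        return new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
    }
}

// Turns the aggregated (id, sum, count) into (id, average).
class ToAverage implements MapFunction<Tuple3<Long, Double, Long>, Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> map(Tuple3<Long, Double, Long> value) {
        return new Tuple2<>(value.f0, value.f1 / value.f2);
    }
}

With this, counts.map(new ToAverage()) yields exactly one (id, average) result per
key and window, instead of one update per incoming record.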

Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
Hi Stephan

sorry for the misunderstanding, but how do I make sure the barrier is placed at the
proper time? How does my source "force" a checkpoint to start happening once
it finds that all needed elements are now produced?

On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> If you implement the "Checkpointed" interface, you get the function calls
> to "snapshotState()" at the point when the checkpoint barrier arrives at an
> operator. So, the call to "snapshotState()" in the sink is when the barrier
> reaches the sink. The call to "checkpointComplete()" in the sources comes
> after all barriers have reached all sinks.
>
> Have a look here for an illustration about barriers flowing with the
> stream:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov <polyakov.an...@gmail.com
> > wrote:
>
>> Hi Stephan
>>
>> thanks that looks super. But source needs then to emit checkpoint. At the
>> source, while reading source events I can find out that - this is the
>> source event I want to take actions after. So if at ssource I can then emit
>> checkpoint and catch it at the end of the DAG that would solve my problem
>> (well, I also need to somehow distinguish my checkpoint from Flink's
>> auto-generated ones).
>>
>> Sorry for being too chatty, this is the topic where I need expert
>> opinion, can't find out the answer by just googling.
>>
>>
>> On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Anton!
>>>
>>> That you can do!
>>>
>>> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
>>> There you will get a call at every checkpoint (and can look at what records
>>> are before that checkpoint). You also get a call once the checkpoint is
>>> complete, which corresponds to the point when everything has flown through
>>> the DAG.
>>>
>>> I think it is nice to implement it like that, because it works
>>> non-blocking: The stream continues while the the records-you-wait-for flow
>>> through the DAG, and you get an asynchronous notification once they have
>>> flown all the way through.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <
>>> polyakov.an...@gmail.com> wrote:
>>>
>>>> I think I can turn my problem into a simpler one.
>>>>
>>>> Effectively what I need - I need way to checkpoint certain events in
>>>> input stream and once this checkpoint reaches end of DAG take some action.
>>>> So I need a signal at the sink which can tell "all events in source before
>>>> checkpointed event are now processed".
>>>>
>>>> As far as I understand flagged record don't quite work since DAG
>>>> doesn't propagate source events one-to-one. Some transformations might
>>>> create 3 child events out of 1 source. If I want to make sure I fully
>>>> processed source event, I need to wait till all childs are processed.
>>>>
>>>>
>>>>
>>>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <
>>>> polyakov.an...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian
>>>>>
>>>>> Defining a special flag for record seems like a checkpoint barrier. I
>>>>> think I will end up re-implementing checkpointing myself. I found the
>>>>> discussion in flink-dev:
>>>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>>>> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>>>>>  which
>>>>> seems to solve my task. Essentially they want to have a mechanism which
>>>>> will mark record produced by job as “last” and then wait until it’s fully
>>>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>>>> produces trades can also thought as being finished once it produced all
>>>>> trades, then I just need to wait till latest trade produced by this job is
>>>>> processed.
>>>>>
>>>>> So although windows can probably also be applied, I think propagating
>>>>> barrier through DAG and checkpointing at final job is what I need.

Re: Watermarks as "process completion" flags

2015-11-29 Thread Anton Polyakov
Hi Fabian

Defining a special flag for a record seems like a checkpoint barrier. I think I
will end up re-implementing checkpointing myself. I found the discussion in
flink-dev: mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
<http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
 which seems to solve my task. Essentially they want to have a mechanism which
will mark a record produced by a job as "last" and then wait until it's fully
propagated through the DAG, similarly to what I need. Essentially my job which
produces trades can also be thought of as being finished once it has produced all trades;
then I just need to wait till the latest trade produced by this job is processed.

So although windows can probably also be applied, I think propagating a barrier
through the DAG and checkpointing at the final job is what I need.

Can I possibly utilize Flink's internal checkpoint barriers (i.e. something like
triggering a custom checkpoint or finishing a streaming job)?

> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Anton,
> 
> If I got your requirements right, you are looking for a solution that 
> continuously produces updated partial aggregates in a streaming fashion. When 
> a  special event (no more trades) is received, you would like to store the 
> last update as a final result. Is that correct?
> 
> You can compute continuous updates using a reduce() or fold() function. These 
> will produce a new update for each incoming event.
> For example:
> 
> val s: DataStream[(Int, Long)] = ...
> s.keyBy(_._1)
>   .reduce( (x,y) => (x._1, y._2 + y._2) )
> 
> would continuously compute a sum for every key (_._1) and produce an update 
> for each incoming record.
> 
> You could add a flag to the record and implement a ReduceFunction that marks 
> a record as final when the no-more-trades event is received.
> With a filter and a data sink you could emit such final records to a 
> persistent data store.
> 
> Btw.: You can also define custom trigger policies for windows. A custom 
> trigger is called for each element that is added to a window and when certain 
> timers expire. For example with a custom trigger, you can evaluate a window 
> for every second element that is added. You can also define whether the 
> elements in the window should be retained or removed after the evaluation.
> 
> Best, Fabian
> 
> 
> 
> 2015-11-24 21:32 GMT+01:00 Anton Polyakov <polyakov.an...@gmail.com>:
> Hi Max
> 
> thanks for reply. From what I understand window works in a way that it 
> buffers records while window is open, then apply transformation once window 
> close is triggered and pass transformed result. 
> In my case then window will be open for few hours, then the whole amount of 
> trades will be processed once window close is triggered. Actually I want to 
> process events as they are produced without buffering them. It is more like a 
> stream with some special mark versus windowing seems more like a batch (if I 
> understand it correctly).
> 
> In other words - buffering and waiting for window to close, then processing 
> will be equal to simply doing one-off processing when all events are 
> produced. I am looking for a solution when I am processing events as they are 
> produced and when source signals "done" my processing is also nearly done.
> 
> 
> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org> wrote:
> Hi Anton,
> 
> You should be able to model your problem using the Flink Streaming
> API. The actions you want to perform on the streamed records
> correspond to transformations on Windows. You can indeed use
> Watermarks to signal the window that a threshold for an action has
> been reached. Otherwise an eviction policy should also do it.
> 
> Without more details about what you want to do I can only refer you to
> the streaming API documentation:
> Please see 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
> 
> Thanks,
> Max
> 
> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
> <polyakov.an...@gmail.com> wrote:
> > Hi
> >
> > I am very new to Flink and in fact never used it. My task (which I 
> > currently solve using home grown Redis-based solution) is quite simple - I 
> > have a system which produces some events (trades, it is a financial system) 
> > and computational chain which computes some measure accumulatively over 
> > these events.
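
A minimal sketch of the flag-based approach Fabian describes above, in Java
rather than Scala, assuming records are (key, value, isFinal) tuples and that the
"no more trades" marker arrives as a record whose flag is already set; the event
layout is a placeholder, not an existing API:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FinalFlagExample {

    public static DataStream<Tuple3<Integer, Long, Boolean>> finalSums(
            DataStream<Tuple3<Integer, Long, Boolean>> events) {

        // continuously updated running sum per key; the marker record flips the flag
        DataStream<Tuple3<Integer, Long, Boolean>> updates = events
            .keyBy(0)
            .reduce(new ReduceFunction<Tuple3<Integer, Long, Boolean>>() {
                @Override
                public Tuple3<Integer, Long, Boolean> reduce(
                        Tuple3<Integer, Long, Boolean> acc,
                        Tuple3<Integer, Long, Boolean> next) {
                    return new Tuple3<>(acc.f0, acc.f1 + next.f1, acc.f2 || next.f2);
                }
            });

        // only records marked as final are passed on to the persistent sink
        return updates.filter(new FilterFunction<Tuple3<Integer, Long, Boolean>>() {
            @Override
            public boolean filter(Tuple3<Integer, Long, Boolean> value) {
                return value.f2;
            }
        });
    }
}

Attaching a sink to the filtered stream then stores one final sum per key, while
the unfiltered updates stream still carries the continuously updated partial
results.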

Re: Watermarks as "process completion" flags

2015-11-24 Thread Anton Polyakov
Hi Max

thanks for the reply. From what I understand, a window works in a way that it
buffers records while the window is open, then applies the transformation once the window
close is triggered and passes on the transformed result.
In my case the window will then be open for a few hours, and the whole set of
trades will be processed once the window close is triggered. Actually I want to
process events as they are produced, without buffering them. It is more like
a stream with some special mark, whereas windowing seems more like a batch
(if I understand it correctly).

In other words, buffering and waiting for the window to close and then processing
would be equal to simply doing one-off processing when all events are
produced. I am looking for a solution where I am processing events as they
are produced, and when the source signals "done" my processing is also nearly
done.


On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Anton,
>
> You should be able to model your problem using the Flink Streaming
> API. The actions you want to perform on the streamed records
> correspond to transformations on Windows. You can indeed use
> Watermarks to signal the window that a threshold for an action has
> been reached. Otherwise an eviction policy should also do it.
>
> Without more details about what you want to do I can only refer you to
> the streaming API documentation:
> Please see
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
>
> Thanks,
> Max
>
> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
> <polyakov.an...@gmail.com> wrote:
> > Hi
> >
> > I am very new to Flink and in fact never used it. My task (which I
> currently solve using home grown Redis-based solution) is quite simple - I
> have a system which produces some events (trades, it is a financial system)
> and computational chain which computes some measure accumulatively over
> these events. Those events form a long but finite stream, they are produced
> as a result of end of day flow. Computational logic forms a processing DAG
> which computes some measure over these events (VaR). Each trade is
> processed through DAG and at different stages might produce different set
> of subsequent events (like return vectors), eventually they all arrive into
> some aggregator which computes accumulated measure (reducer).
> >
> > Ideally I would like to process trades as they appear (i.e. stream them)
> and once producer reaches end of portfolio (there will be no more trades),
> I need to write final resulting measure and mark it as “end of day record”.
> Of course I also could use a classical batch - i.e. wait until all trades
> are produced and then batch process them, but this will be too inefficient.
> >
> > If I use Flink, I will need a sort of watermark saying - “done, no more
> trades” and once this watermark reaches end of DAG, final measure can be
> saved. More generally would be cool to have an indication at the end of DAG
> telling to which input stream position current measure corresponds.
> >
> > I feel my problem is very typical yet I can’t find any solution. All
> examples operate either on infinite streams where nobody cares about
> completion or classical batch examples which rely on fact all input data is
> ready.
> >
> > Can you please hint me.
> >
> > Thank you vm
> > Anton
>


Watermarks as "process completion" flags

2015-11-22 Thread Anton Polyakov
Hi

I am very new to Flink and in fact have never used it. My task (which I currently
solve using a home-grown Redis-based solution) is quite simple: I have a system
which produces some events (trades, it is a financial system) and a computational
chain which computes some measure accumulatively over these events. Those
events form a long but finite stream; they are produced as a result of the end-of-day
flow. The computational logic forms a processing DAG which computes some
measure over these events (VaR). Each trade is processed through the DAG and at
different stages might produce a different set of subsequent events (like return
vectors); eventually they all arrive at some aggregator which computes the
accumulated measure (a reducer).

Ideally I would like to process trades as they appear (i.e. stream them) and,
once the producer reaches the end of the portfolio (there will be no more trades),
write the final resulting measure and mark it as the "end of day record". Of course
I could also use a classical batch, i.e. wait until all trades are produced
and then batch process them, but this would be too inefficient.

If I use Flink, I will need a sort of watermark saying "done, no more trades",
and once this watermark reaches the end of the DAG, the final measure can be saved. More
generally, it would be cool to have an indication at the end of the DAG telling to
which input stream position the current measure corresponds.

I feel my problem is very typical, yet I can't find any solution. All examples
operate either on infinite streams where nobody cares about completion or on
classical batch examples which rely on the fact that all input data is ready.

Can you please give me a hint?

Thank you vm
Anton