Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Kezhu Wang
Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required
to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain
", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%)
| end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
| end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Setting max parallelism via properties

2021-02-28 Thread Padarn Wilson
Hi all,

Sorry for the basic question, but is it possible to set max
parallelism using the flink conf file, rather than explicitly in code:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Need this for a PR I am working on for the flink operator:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
Hi Prasanna,

Thanks for that. It’s what I was doing previously as a workaround; however, I was 
just curious whether there is any Flink-specific functionality to handle this 
before the metrics reach Prometheus.

Additionally from the docs on metrics [0], it seems that there’s a pattern in 
place to use supported third-party metrics such as those from 
CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see a 
similarly named package for Prometheus which may be what I’m looking for as 
it’s similarly named (flink-metrics-prometheus), so I may give that a try.

Thanks,

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

> On Feb 28, 2021, at 12:20 AM, Prasanna kumar  
> wrote:
> 
> 
> Rion,
> 
> Regarding the second question, you can aggregate by using the sum function 
> sum(metric_name{job_name="JOBNAME"}). This works if you are using the 
> metric counter.
> 
> Prasanna.
> 
>> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:
>> Hi folks,
>> 
>> I’ve just recently started working with Flink and I was in the process of 
>> adding some metrics through my existing pipeline with the hopes of building 
>> some Grafana dashboards with them to help with observability.
>> 
>> Initially I looked at the built-in Flink metrics that were available, but I 
>> didn’t see an easy mechanism for setting/using labels with them. 
>> Essentially, I have two properties for my messages coming through the 
>> pipeline that I’d like to be able to keep track of (tenant/source) across 
>> several metrics (e.g. total_messages with tenant / source labels, etc.). I 
>> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a 
>> good pattern for handling these.
>> 
>> I had previously used the Prometheus Client metrics [0] to accomplish this 
>> in the past but I wasn’t entirely sure how it would/could mesh with Flink. 
>> Does anyone have experience in working with these or know if they are 
>> supported?
>> 
>> Secondly, when using the Flink metrics, I noticed I was receiving a separate 
>> metric for each task that was being spun up. Is there an “easy button” to 
>> handle aggregating these to ensure that a single metric (e.g. 
>> total_messages) reflects the total processed across all of the tasks instead 
>> of each individual one?
>> 
>> Any recommendations / resources / advice would be greatly appreciated!
>> 
>> Thanks,
>> 
>> Rion
>> 
>> [0] : https://prometheus.io/docs/instrumenting/clientlibs/


How to emit after a merge?

2021-02-28 Thread Yik San Chan
I define a `Transaction` class:

```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```

The `TransactionSource` simply emits `Transaction` with some time interval.
Now I want to compute the last 2 transaction timestamps of each account id;
see the code below:

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment =
      StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    tableEnv.createTemporaryView("transactions", txnStream)

    tableEnv.executeSql(QUERY).print()
  }
}
```

When I run the program, I get:

```
++--++
| op |accountId |last2_timestamp |
++--++
| +I |1 |  154627200 |
| +I |2 |  154627236 |
| +I |3 |  154627272 |
| +I |4 |  154627308 |
| +I |5 |  154627344 |
| -U |1 |  154627200 |
| +U |1 |154627200,154627380 |
| -U |2 |  154627236 |
| +U |2 |154627236,154627416 |
| -U |3 |  154627272 |
| +U |3 |154627272,154627452 |
| -U |4 |  154627308 |
| +U |4 |154627308,154627488 |
| -U |5 |  154627344 |
| +U |5 |154627344,154627524 |
| -U |1 |154627200,154627380 |
| +U |1 |  154627380 |
| -U |1 |  154627380 |
| +U |1 |154627380,154627560 |
(to continue)
```

Let's focus on the last transaction (from above) of accountId=1. When there
is a new transaction from account 1 that happens at
timestamp=154627560, there are 4 operations in total.

```
++--++
| op |accountId |last2_timestamp |
++--++
| -U |1 |154627200,154627380 |
| +U |1 |  154627380 |
| -U |1 |  154627380 |
| +U |1 |154627380,154627560 |
```

However, I only want to emit the "new status" below to my downstream (let's
say another Kafka topic) via some sort of merging:

```
+--++
|accountId |last2_timestamp |
+--++
|1 |154627380,154627560 |
```

So that my downstream is able to consume literally "the last 2 transaction
timestamps of each account":
```
+--++
|accountId |last2_timestamp |
+--++
|1 |  154627200 |
|1 |154627200,154627380 |
|1 |154627380,154627560 |
(to continue)
```

What is the right way to do this?


Re: Window Process function is not getting trigger

2021-02-28 Thread Kezhu Wang
Hi,

Glad to hear.

Normally, you would not encounter this when there is a lot of data.
`WatermarkStrategy.withIdleness` could be more appropriate in production.
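
A minimal sketch of what that could look like for the `Price` stream from the
earlier snippet (assuming the timestamps are converted to epoch milliseconds;
the 10-second idleness bound is only an example):

```java
import java.time.Duration;
import java.time.ZoneId;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Sketch only: withIdleness lets watermarks advance even when some parallel
// source instances receive no data for a while.
WatermarkStrategy<Price> strategy =
        WatermarkStrategy.<Price>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(10))
                .withTimestampAssigner((price, previousTimestamp) ->
                        price.getAsOfDateTime()
                                .atZone(ZoneId.systemDefault())
                                .toInstant()
                                .toEpochMilli());

// priceStream.assignTimestampsAndWatermarks(strategy) as before.
```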


Best,
Kezhu Wang


On February 24, 2021 at 22:35:11, sagar (sagarban...@gmail.com) wrote:

Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang  wrote:

> Try `env.setParallelism(1)`. Default parallelism for local environment is
> `Runtime.getRuntime.availableProcessors`.
>
> Your test data set is so small that when the records are scattered across
> multiple parallel instances, there will be no data with event time assigned
> to trigger downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is a fairly simple requirement. If I change it to processing time it
> works fine, but it is not working with event time. Help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:
>
>> HI
>>
>> Corrected with below code, but still getting same issue
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamp in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have a simple Flink streaming program where I am using a socket as my
>>> continuous source, with a window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering, and even if I pass
>>> events in any order, Flink is not ignoring them.
>>>
>>> I can see the output only when I kill my socket. Please find the code
>>> snippet below:
>>>
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>> DataStream<Price> price = env.socketTextStream("localhost", 9998)
>>>     .uid("price source")
>>>     .map(new MapFunction<String, Price>() {
>>>         @Override
>>>         public Price map(String s) throws Exception {
>>>             return new Price(s.split(",")[0],
>>>                 LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),
>>>                 new BigDecimal(s.split(",")[3]), s.split(",")[4],
>>>                 new BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]));
>>>         }
>>>     });
>>>
>>> DataStream<Price> priceStream = price
>>>     .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
>>>         .withTimestampAssigner((p, timestamp) -> {
>>>             ZoneId zoneId = ZoneId.systemDefault();
>>>             long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>>             System.out.println(epoch);
>>>             return epoch;
>>>         }))
>>>     .keyBy(new KeySelector<Price, String>() {
>>>         @Override
>>>         public String getKey(Price price) throws Exception {
>>>             return price.getPerformanceId();
>>>         }
>>>     })
>>>     .window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>>     .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
>>>
>>>         @Override
>>>         public void process(String s, Context context,
>>>                 Iterable<Price> iterable, Collector<Price> collector) throws Exception {
>>>             System.out.println(context.window().getStart()
>>>                 + " Current watermark: " + context.window().getEnd());
>>>             Price p1 = null;
>>>             for (Price p : iterable) {
>>>                 System.out.println(p.toString());
>>>                 p1 = p;
>>>             }
>>>             collector.collect(p1);
>>>         }
>>>     });
>>>
>>>
>>> priceStream.writeAsText("c:\\ab.txt");
>>>
>>> also data I am inputting are
>>>
>>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant ple

Re: Setting max parallelism via properties

2021-02-28 Thread Kezhu Wang
Hi Padarn,

There is a configuration option “pipeline.max-parallelism”.

It is not a cluster-wide configuration but a client/job/pipeline-side
configuration, which means it has to be carried from the flink conf file
into the pipeline generation stage.


If I understand correctly, `flink-on-k8s-operator` uses `flink run` (I found
this in `flinkcluster_submit_job_script.go`) to submit the job to the cluster.
This command already covers that bridging work, so I think it should just work
in your case.


pipeline-max-parallelism:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#pipeline-max-parallelism
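
If it helps, a rough sketch of how the same option can also be set
programmatically through a `Configuration` (an illustration only, assuming
Flink 1.12+ where `StreamExecutionEnvironment.getExecutionEnvironment(Configuration)`
is available; with the operator, `flink run` plus the conf file should already
be enough):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: forward "pipeline.max-parallelism" to the environment instead of
// hard-coding env.setMaxParallelism(...) in the job code.
Configuration conf = new Configuration();
conf.set(PipelineOptions.MAX_PARALLELISM, 128);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
```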


Best,
Kezhu Wang

On February 28, 2021 at 16:45:03, Padarn Wilson (pad...@gmail.com) wrote:

Hi all,

Sorry for the basic question, but is it possible to set max
parallelism using the flink conf file, rather than explicitly in code:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Need this for a PR I am working on for the flink operator:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425


Independence of task parallelism

2021-02-28 Thread Jan Nitschke
Hello, 

We are working on a project where we want to gather information about the job 
performance across different task level parallelism settings.
Essentially, we want to see how the throughput of a single task varies across 
different parallelism settings, e.g. for a job of 5 tasks: 1-1-1-1-1 vs. 
1-2-1-1-1 vs. 2-2-2-2-2. 

We are running Flink on Kubernetes with a job of 5 tasks; slot sharing is 
enabled, operator chaining is disabled, and each task manager has one slot.

So, the number of task managers always equals the highest parallelism, and we 
can fit the entire job into one task manager slot. 

We are then running the job against multiple parallelism configs (such as those 
above), collect the relevant metrics and try to get some useful information out 
of them. 

We are now wondering how independent our results are from one another. More 
specifically, if we look at the parallelism of the second task, is its 
performance independent of the parallelism of the other tasks? That is, will 
the second task perform the same in (1-2-1-1-1) as in (2-2-2-2-2)? 

Our take on it is the following: With our setup, (1-2-1-1-1) should result in 
one task manager holding the entire job and a second task manager that only 
runs the second task. (2-2-2-2-2) will run two task managers with the entire 
job. So, theoretically, the second task should have much more resources 
available in the first setup, as it has the entire resources of that task 
manager at its disposal. Does that assumption hold, or will Flink assign a 
certain amount of resources to a task in a task manager no matter how many 
other tasks are running in that same task manager slot? 

We would highly appreciate any help. 

Best, 
Jan

Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Meissner, Dylan
Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around it by taking two 
separate actions when stopping the job: take a savepoint, then cancel.

From: Kezhu Wang 
Sent: Sunday, February 28, 2021 12:31 AM
To: user@flink.apache.org ; Meissner, Dylan 

Subject: Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created FLINK-21522 
for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang


On February 28, 2021 at 00:59:04, Meissner, Dylan 
(dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented as 
this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required to 
use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, 
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain 
", the operation never completes, reporting IN_PROGRESS until I hit the 
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: 
Checkpoint expired before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed their 
work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | 
end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | 
end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Flink Metrics

2021-02-28 Thread Prasanna kumar
Hi flinksters,

Scenario: We have CDC messages from our RDBMS (various tables) flowing to
Kafka. Our Flink job reads the CDC messages and creates events based on
certain rules.

I am using Prometheus and Grafana.

Following are the metrics that I need to calculate:

A) Number of CDC messages per table.
B) Number of events created per event type.
C) Average/P99/P95 latency (event created ts - cdc operation ts)

For A and B, I created counters and am able to see the metrics flowing into
Prometheus. A few questions I have here:

1) How do I create labels for counters in Flink? I did not find any easy
method to do it. Right now I see that I need to create counters for each
type of table and event. I referred to one of the community discussions
[1]. Is there any way apart from this?
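
To make (1) concrete, what I currently do looks roughly like this (a sketch
only; the class name, the parsing helper and the table name are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Sketch only: one counter per table, with the table name baked into the metric name.
public class CdcMessageCounter extends RichMapFunction<String, String> {

    private transient Map<String, Counter> countersPerTable;

    @Override
    public void open(Configuration parameters) {
        countersPerTable = new HashMap<>();
    }

    @Override
    public String map(String cdcMessage) {
        String table = extractTableName(cdcMessage); // placeholder for the real parsing
        countersPerTable
                .computeIfAbsent(table, t ->
                        getRuntimeContext().getMetricGroup().counter("cdc_messages_" + t))
                .inc();
        return cdcMessage;
    }

    private String extractTableName(String cdcMessage) {
        return "orders"; // placeholder
    }
}
```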

2) When the job gets restarted, the counters go back to 0. How can I
prevent that and keep continuity?

For C, I calculated the latency in code for each event and assigned it to a
histogram. A few questions I have here:

3) I read in a few blogs [2] that a histogram is the best way to track
latencies. Is there any better idea?

4) How do I create buckets for various ranges? I also read in a community
email that Flink implements histograms as summaries. I should also be able
to see the latencies over time.
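
For reference, the histogram registration for C looks roughly like this (a
sketch only; here I use the DropWizard wrapper, which assumes the
flink-metrics-dropwizard dependency, and the two timestamp variables are
placeholders):

```java
import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

// Sketch only. Inside open() of a rich function:
Histogram latencyHistogram = getRuntimeContext()
        .getMetricGroup()
        .histogram("event_latency_ms",
                new DropwizardHistogramWrapper(
                        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));

// Inside the processing method, with both timestamps in milliseconds (placeholders):
latencyHistogram.update(eventCreatedTs - cdcOperationTs);
```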

[1]
https://stackoverflow.com/questions/58456830/how-to-use-multiple-counters-in-flink
[2] https://povilasv.me/prometheus-tracking-request-duration/

Thanks,
Prasanna.


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Meissner, Dylan
Hi Rion,

Regarding the question about adding Prometheus labels out of the box: this is a 
common ask of all exporters, but the Prometheus philosophy sees it as an 
"anti-pattern", as the metrics source can often be ambivalent about context. See 
[0] for an example of such a discussion.

Instead, we can establish context during service discovery. If, for example, we 
run clusters for tenants on Kubernetes, then within the kubernetes_sd_config 
[1] labelling rules we can instruct Prometheus to add the Kubernetes labels 
from the pods, such as "tenant-id: foo" and "environment: staging" to each 
incoming metric it processes.

This isn't limited to Kubernetes; each of the service discovery configs is 
designed to accommodate translating metadata from context into metric labels.

If this doesn't work for you, then consider encoding the tenant identifier into 
job names, and extract this identifier in a metric_relabel_config [2].

[0]: https://github.com/prometheus/node_exporter/issues/319
[1]: 
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
[2]: 
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs



From: Rion Williams 
Sent: Sunday, February 28, 2021 12:46 AM
To: Prasanna kumar 
Cc: user 
Subject: Re: Using Prometheus Client Metrics in Flink

Hi Prasanna,

Thanks for that. It’s what I was doing previously as a workaround; however, I was 
just curious whether there is any Flink-specific functionality to handle this 
before the metrics reach Prometheus.

Additionally from the docs on metrics [0], it seems that there’s a pattern in 
place to use supported third-party metrics such as those from 
CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see a 
similarly named package for Prometheus which may be what I’m looking for as 
it’s similarly named (flink-metrics-prometheus), so I may give that a try.

Thanks,

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

On Feb 28, 2021, at 12:20 AM, Prasanna kumar  
wrote:


Rion,

Regarding the second question, you can aggregate by using the sum function
sum(metric_name{job_name="JOBNAME"}). This works if you are using the metric
counter.

Prasanna.

On Sat, Feb 27, 2021 at 9:01 PM Rion Williams (rionmons...@gmail.com) wrote:
Hi folks,

I’ve just recently started working with Flink and I was in the process of 
adding some metrics through my existing pipeline with the hopes of building 
some Grafana dashboards with them to help with observability.

Initially I looked at the built-in Flink metrics that were available, but I 
didn’t see an easy mechanism for setting/using labels with them. Essentially, I 
have two properties for my messages coming through the pipeline that I’d like 
to be able to keep track of (tenant/source) across several metrics (e.g. 
total_messages with tenant / source labels, etc.). I didn’t see an easy way to 
adjust this out of the box, or wasn’t aware of a good pattern for handling 
these.

I had previously used the Prometheus Client metrics [0] to accomplish this in 
the past but I wasn’t entirely sure how it would/could mesh with Flink. Does 
anyone have experience in working with these or know if they are supported?

Secondly, when using the Flink metrics, I noticed I was receiving a separate 
metric for each task that was being spun up. Is there an “easy button” to 
handle aggregating these to ensure that a single metric (e.g. total_messages) 
reflects the total processed across all of the tasks instead of each individual 
one?

Any recommendations / resources / advice would be greatly appreciated!

Thanks,

Rion

[0] : https://prometheus.io/docs/instrumenting/clientlibs/


Re: Setting max parallelism via properties

2021-02-28 Thread Padarn Wilson
Thanks a lot Kezhu, this fits the bill perfectly.

Thanks,
Padarn



On Sun, Feb 28, 2021 at 7:00 PM Kezhu Wang  wrote:

> Hi Padarn,
>
> There is a configuration option “pipeline.max-parallelism”.
>
> It is not a cluster-wide configuration but a client/job/pipeline-side
> configuration, which means it has to be carried from the flink conf file
> into the pipeline generation stage.
>
>
> If I understand correctly, `flink-on-k8s-operator` uses `flink run` (I
> found this in `flinkcluster_submit_job_script.go`) to submit the job to
> the cluster. This command already covers that bridging work, so I think it
> should just work in your case.
>
>
> pipeline-max-parallelism:
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#pipeline-max-parallelism
>
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 16:45:03, Padarn Wilson (pad...@gmail.com) wrote:
>
> Hi all,
>
> Sorry for the basic question, but is it possible to set max
> parallelism using the flink conf file, rather than explicitly in code:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>
> Need this for a PR I am working on for the flink operator:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425
>
>


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
Thanks Dylan,

Totally understandable. I already have the appropriate exporters / monitors in 
place for scraping metrics from Flink, including custom ones, into Prometheus. 
The labeling challenge is really the big one as while I see lots of labels for 
the metrics being exported (e.g. job id, worker, etc.) I didn’t see a mechanism 
to inject my own into those coming from Flink.

Additionally, in my specific use case I’m dealing with a multi-tenant pipeline 
(I.e. reading messages from a single multi-tenant Kafka topic), which is where 
the labeling comes in. I’d love to be able to have a counter (among other types 
of metrics) with their appropriate labels for each tenant.

I suppose I could implement a custom counter or series of counters (one for 
each tenant) that would each be responsible for keeping track of their own 
respective tenant values. In my case I’m dealing with a KeyedProcessFunction, 
so I only have access to the key (tenant) within the processElement function as 
opposed to when the function is initially opened, where I understand you would 
typically register a metric.

Sorry for the somewhat convoluted response, I’m still getting accustomed to 
some of the Flink APIs, specifically around metrics.

Thanks,

Rion

> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan  
> wrote:
> 
> 
> Hi Rion,
> 
> Regarding the question about adding Prometheus labels out of the box: this is a 
> common ask of all exporters, but the Prometheus philosophy sees it as an 
> "anti-pattern", as the metrics source can often be ambivalent about context. 
> See [0] for an example of such a discussion.
> 
> Instead, we can establish context during service discovery. If, for example, 
> we run clusters for tenants on Kubernetes, then within the 
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add 
> the Kubernetes labels from the pods, such as "tenant-id: foo" and 
> "environment: staging" to each incoming metric it processes.
> 
> This isn't limited to Kubernetes; each of the service discovery configs is 
> designed to accommodate translating metadata from context into metric labels.
> 
> If this doesn't work for you, then consider encoding the tenant identifier into 
> job names, and extract this identifier in a metric_relabel_config [2].
> 
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]: 
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]: 
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
> 
> 
> From: Rion Williams 
> Sent: Sunday, February 28, 2021 12:46 AM
> To: Prasanna kumar 
> Cc: user 
> Subject: Re: Using Prometheus Client Metrics in Flink
>  
> Hi Prasanna,
> 
> Thanks for that. It’s what I was doing previously as a workaround; however, I 
> was just curious whether there is any Flink-specific functionality to handle 
> this before the metrics reach Prometheus.
> 
> Additionally from the docs on metrics [0], it seems that there’s a pattern in 
> place to use supported third-party metrics such as those from 
> CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see 
> a similarly named package for Prometheus which may be what I’m looking for as 
> it’s similarly named (flink-metrics-prometheus), so I may give that a try.
> 
> Thanks,
> 
> Rion
> 
> [0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
> 
>>> On Feb 28, 2021, at 12:20 AM, Prasanna kumar 
>>>  wrote:
>>> 
>> 
>> Rion,
>> 
>> Regarding the second question, you can aggregate by using the sum function
>> sum(metric_name{job_name="JOBNAME"}). This works if you are using the
>> metric counter.
>> 
>> Prasanna.
>> 
>> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:
>> Hi folks,
>> 
>> I’ve just recently started working with Flink and I was in the process of 
>> adding some metrics through my existing pipeline with the hopes of building 
>> some Grafana dashboards with them to help with observability.
>> 
>> Initially I looked at the built-in Flink metrics that were available, but I 
>> didn’t see an easy mechanism for setting/using labels with them. 
>> Essentially, I have two properties for my messages coming through the 
>> pipeline that I’d like to be able to keep track of (tenant/source) across 
>> several metrics (e.g. total_messages with tenant / source labels, etc.). I 
>> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a 
>> good pattern for handling these.
>> 
>> I had previously used the Prometheus Client metrics [0] to accomplish this 
>> in the past but I wasn’t entirely sure how it would/could mesh with Flink. 
>> Does anyone have experience in working with these or know if they are 
>> supported?
>> 
>> Secondly, when using the Flink metrics, I noticed I was receiving a separate 
>> metric for each task that was being spun up. Is there an “easy button” to 
>> handle aggregating these to ensure that a single metric (e.g. 
>> total_messages) reflects t

Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Kezhu Wang
Hi,

You could also try `cancel --withSavepoint [savepointDir]`, even though it is
deprecated. Compared to the take-savepoint-then-cancel approach, there
will be no checkpoints in between. This may be important if there are
two-phase commit operators in your job.


Best,
Kezhu Wang


On February 28, 2021 at 20:50:29, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around it by taking
two separate actions when stopping the job: take a savepoint, then cancel.
--
*From:* Kezhu Wang 
*Sent:* Sunday, February 28, 2021 12:31 AM
*To:* user@flink.apache.org ; Meissner, Dylan <
dylan.t.meiss...@nordstrom.com>
*Subject:* Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required
to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain
", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%)
| end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
| end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Issues running multiple Jobs using the same JAR

2021-02-28 Thread Morgan Geldenhuys

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same 
JAR in the same Flink native cluster (all 1.12.1).


When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at 
least one previous transactional or idempotent request has failed with 
errors.
at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: Producer attempted an operation with an old 
epoch. Either there is a newer producer with the same transactionalId, 
or the producer's transaction has been expired by the broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: 
Producer attempted an operation with an old epoch. Either there is a newer 
producer with the same transactionalId, or the producer's transaction has 
been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count must be 
zero at this point: 1
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.lang.IllegalStateException: Pending record count must be 
zero at this point: 1
at 
org

Re: Issues running multiple Jobs using the same JAR

2021-02-28 Thread Kezhu Wang
Hi Morgan,

You could check FLINK-11654; from its description, I think it is the
problem you encountered.

> We run multiple jobs on a cluster which write a lot to the same Kafka
topic from identically named sinks. When EXACTLY_ONCE semantic is enabled
for the KafkaProducers we run into a lot of ProducerFencedExceptions and
all jobs go into a restart cycle.

FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654
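
If that is indeed the problem, one possible workaround (my assumption, not
something stated in the ticket) is to give each job's Kafka sink a unique
operator name and uid, so that the transactional id prefixes derived from the
sink's task name no longer collide across jobs. A rough sketch, where
`jobInstanceId`, `serializationSchema`, `producerProps` and `stream` are
placeholders:

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Sketch only: a per-job suffix keeps the EXACTLY_ONCE transactional ids apart.
String jobInstanceId = "job-a"; // e.g. passed in through program arguments

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        serializationSchema,                       // KafkaSerializationSchema<String>
        producerProps,                             // Properties with Kafka settings
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

stream.addSink(producer)
      .name("kafka-sink-" + jobInstanceId)
      .uid("kafka-sink-" + jobInstanceId);
```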


Best,
Kezhu Wang


On February 28, 2021 at 22:35:02, Morgan Geldenhuys (
morgan.geldenh...@tu-berlin.de) wrote:

Greetings all,

I am having an issue instantiating multiple flink jobs uisng the same JAR
in the same Flink native cluster (all 1.12.1).

When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with
errors.
at org.apache.kafka.clients.producer.internals.TransactionManager
.failIfNotReadyForSend(TransactionManager.java:356)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer
.java:926)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer
.java:865)
at org.apache.flink.streaming.connectors.kafka.internals.
FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:915)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.
TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(
StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:187)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:395)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:609)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: org.apache.flink.streaming.connectors.kafka.
FlinkKafkaException: Failed to send data to Kafka: Producer attempted an
operation with an old epoch. Either there is a newer producer with the same
transactionalId, or the producer's transaction has been expired by the
broker.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.checkErroneous(FlinkKafkaProducer.java:1392)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.close(FlinkKafkaProducer.java:965)
at org.apache.flink.api.common.functions.util.FunctionUtils
.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.
AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.disposeAllOperators(StreamTask.java:783)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.runAndSuppressThrowable(StreamTask.java:762)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.cleanUpInvoke(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
StreamTask.java:585)
... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count
must be zero at this point: 1
at org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at org.apache.flink.api.common.functions.util.FunctionUtils
.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.
AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.disposeAllOperators(StreamTask.java:783)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.runAndSuppressThrow

Standard method to generate watermark forBoundedOutOfOrderness

2021-02-28 Thread Maminspapin
Hello, everyone.

I'm learning Flink but I am still not sure I understand the watermark
mechanism.

Consider a simple, common pipeline running in event-time mode.

I want to use the strategy
*WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(6))* in my
code. Does it mean that this strategy works by the formula:

*max(event_time) - 6 sec. = new watermark?*

So there are next steps:
0. At first we have w(0) and no input events
1. Get 4.  4 is new max. -> 4-6 < 0 -> still w(0)
2. Get 2.  2 < 4 -> 4 is max -> still w(0)
3. Get 11.11 > 4 -> 11 is new max -> 11-6 = 5 -> new watermark w(5)
4. Get 7.  11 is max. -> still w(5)
5. Get 9.  11 is max. -> still w(5)
6. Get 15  15 > 11 -> 15 is new max -> 15-6 = 9 -> new watermark w(9)

Am I right?

Thanks,
Yuri L.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
It looks like I was finally able to get the expected labeling behavior that
I was looking for by simply storing a reference to the underlying
MetricGroup and then keeping track of any new metrics that I needed to
dynamically create and use downstream:

class MagicMetricRegistry(private val metricGroup: MetricGroup):
Serializable {
// Reference for all of the registered metrics
private val registeredMetrics: HashMap<String, Counter> = hashMapOf()

// Increments a given metric by key
fun inc(metric: String, tenant: String, source: String, amount: Long =
1) {
// Store a key
val key = "$metric-$tenant-$source"
if (!registeredMetrics.containsKey(key)){
registeredMetrics[key] = metricGroup
.addGroup("tenant", tenant)
.addGroup("source", source)
.counter(metric)
}

// Update the metric by a given amount
registeredMetrics[key]!!.inc(amount)
}
}

And then simply within the open function call in my KeyedProcessFunction, I
stored a reference to it and registered any new, in this case tenant/source
combinations, as they came in:

class MagicWindowFunction: KeyedProcessFunction<...>() {
@Transient private lateinit var metrics: MagicMetricRegistry

override fun open(parameters: Configuration) {
metrics = MagicMetricRegistry(runtimeContext.metricGroup)
}

override fun processElement(...) {
// Omitted for brevity

metrics.inc("logs_seen", "my-tenant", "my-source")
}

// Omitted for brevity
}

This appears to be working as expected as far as I can tell at this point.
I can see all of the expected labels appearing within Prometheus and
further downstream in Grafana!

Thanks again,

Rion

On Sun, Feb 28, 2021 at 8:15 AM Rion Williams  wrote:

> Thanks Dylan,
>
> Totally understandable. I already have the appropriate exporters /
> monitors in place for scraping metrics from Flink, including custom ones,
> into Prometheus. The labeling challenge is really the big one as while I
> see lots of labels for the metrics being exported (e.g. job id, worker,
> etc.) I didn’t see a mechanism to inject my own into those coming from
> Flink.
>
> Additionally, in my specific use case I’m dealing with a multi-tenant
> pipeline (I.e. reading messages from a single multi-tenant Kafka topic),
> which is where the labeling comes in. I’d love to be able to have a counter
> (among other types of metrics) with their appropriate labels for each
> tenant.
>
> I suppose I could implement a custom counter or series of counters (one
> for each tenant) that would each be responsible for keeping track of their
> own respective tenant values. In my case I’m dealing with a
> KeyedProcessFunction, so I only have access to the key (tenant) within the
> processElement function as opposed to when the function is initially
> opened, where I understand you would typically register a metric.
>
> Sorry for the somewhat convoluted response, I’m still getting accustomed
> to some of the Flink APIs, specifically around metrics.
>
> Thanks,
>
> Rion
>
> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com> wrote:
>
> 
> Hi Rion,
>
> Regarding the question about adding Prometheus labels out of the box: this
> is a common ask of all exporters, but the Prometheus philosophy sees it as an
> "anti-pattern", as the metrics source can often be ambivalent about context.
> See [0] for an example of such a discussion.
>
> Instead, we can establish context during service discovery. If, for
> example, we run clusters for tenants on Kubernetes, then within the
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add
> the Kubernetes labels from the pods, such as "tenant-id: foo" and
> "environment: staging" to each incoming metric it processes.
>
> This isn't limited to Kubernetes; each of the service discovery configs is
> designed to accommodate translating metadata from context into metric labels.
>
> If this doesn't work for you, then consider encoding the tenant identifier
> into job names, and extract this identifier in a metric_relabel_config [2].
>
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
>
>
> --
> *From:* Rion Williams 
> *Sent:* Sunday, February 28, 2021 12:46 AM
> *To:* Prasanna kumar 
> *Cc:* user 
> *Subject:* Re: Using Prometheus Client Metrics in Flink
>
> Hi Prasanna,
>
> Thanks for that. It’s what I was doing previously as a workaround; however,
> I was just curious whether there is any Flink-specific functionality to
> handle this before the metrics reach Prometheus.
>
> Additionally from the docs on metrics [0], it seems that there’s a pattern
> in place to use supported third-party metrics such as those from
> CodeHale/DropWizard via a Mave

Re: Best way to handle BIGINT to TIMESTAMP conversions

2021-02-28 Thread Jark Wu
Hi Sebastián,

You can use `TO_TIMESTAMP(FROM_UNIXTIME(e))` to get a timestamp value.
The BIGINT should be in seconds. Please note to declare the computed column
in the DDL schema and to declare a watermark strategy on this computed field
to make it a rowtime attribute, because a streaming over window requires
ordering by a time attribute.
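
A minimal sketch of what that could look like for your table (assumptions:
`tableEnv` is a StreamTableEnvironment, `e` is in milliseconds so it is divided
by 1000 first, and the 5-second watermark bound is only an example):

```java
// Sketch only: a computed rowtime column plus a watermark declared in the DDL
// (connector options and the primary key from the original table are omitted).
tableEnv.executeSql(
        "CREATE TABLE sessions (" +
        "  `ats`    STRING," +
        "  `e`      BIGINT," +
        "  `s`      BIGINT," +
        "  `end_ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`e` / 1000))," +
        "  WATERMARK FOR `end_ts` AS `end_ts` - INTERVAL '5' SECOND" +
        ")");
```

The over window can then ORDER BY `end_ts` instead of `proc_time`.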

Best,
Jark

On Sun, 21 Feb 2021 at 07:32, Sebastián Magrí  wrote:

> I have a table with two BIGINT fields for start and end of an event as
> UNIX time in milliseconds. I want to be able to have a resulting column
> with the delta in milliseconds and group by that difference. Also, I want
> to be able to have aggregations with window functions based upon the `end`
> field.
>
> The table definition looks like this:
> |CREATE TABLE sessions (
> |  `ats`   STRING,
> |  `e` BIGINT,
> |  `s` BIGINT,
> |  `proc_time` AS PROCTIME(),
> |  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
> |)
>
> Then I have a few views like this:
>
> CREATE VIEW second_sessions AS
>   SELECT * FROM sessions
>   WHERE `e` - `s` = 1000
>
> And some windows using these views like this:
>
>   WINDOW w3m AS (
> PARTITION BY `t`
> ORDER BY `proc_time`
> RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW
>   )
>
> I'd like to use the `e` field for windowing instead of `proc_time`. But I
> keep running into errors with the `TO_TIMESTAMP(BIGINT)` function now
> missing or with unsupported timestamp arithmetics.
>
> What is the best practice for a case such as this?
>
> Best Regards,
> --
> Sebastián Ramírez Magrí
>


Re: Union fields with time attributes have different types

2021-02-28 Thread Jark Wu
Hi Sebastián,

`endts` in your case is a time attribute, which is slightly different from a
regular TIMESTAMP type.
You can manually `CAST(endts AS TIMESTAMP(3))` to make this query work,
which removes the time attribute metadata.

SELECT `evt`, `value`, `startts`, CAST(endts AS TIMESTAMP(3))
FROM aggs_1m


Best,
Jark

On Mon, 22 Feb 2021 at 05:01, Sebastián Magrí  wrote:

> I'm using a query like this
>
> WITH aggs_1m AS (
>   SELECT
> `evt`,
> `startts`
> `endts`,
> SUM(`value`) AS `value`
>   FROM aggregates_per_minute
> ), aggs_3m AS (
>   SELECT
> `evt`,
> TUMBLE_START(`endts`, INTERVAL '3' MINUTE) AS `startts`,
> TUMBLE_END(`endts`, INTERVAL '3' MINUTE) AS `endts`,
> SUM(`c`) AS `value`
>   FROM aggregates_per_minute
>   GROUP BY t, TUMBLE(`endts`, INTERVAL '3' MINUTE)
> )
> SELECT `evt`, `value`, `startts`, `endts`
> FROM aggs_1m
> UNION
> SELECT `evt`, `value`, `startts`, `endts`
> FROM aggs_3m
>
> But it's throwing this exception
>
> org.apache.flink.table.api.ValidationException: Union fields with time
> attributes have different types.
>
> Doesn't TUMBLE_START(somets, ...) return a TIMESTAMP of the same type?
>
> --
> Sebastián Ramírez Magrí
>


Re: java Flink local test failure (Could not create actor system)

2021-02-28 Thread Smile
Hi Vijay,

Since version 1.7 Flink builds with Scala version 2.11 (default) and 2.12.
Flink has APIs, libraries, and runtime modules written in Scala. Users of
the Scala API and libraries may have to match the Scala version of Flink
with the Scala version of their projects (because Scala is not strictly
backward compatible). See [1] for more information.

If using maven, artifactId of Flink components usually end with scala
version, such as flink-streaming-java_2.11 means it was built against Scala
2.11.

[1].
https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html#scala-versions

Regards,
Smile



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Abhishek Shukla
Hi Matthias,
Thanks for replying,
I checked both of these pages,
And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
are there in property file,

I am able to see the logs of the pipeline once the application is up, but the
logs related to application failure, successful bean creation, or logs at the
time of post construct are not getting printed to the file, which was happening
in Flink 1.9 with the provided log4j-cli.properties file.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-02-28 Thread Yang Wang
Hi Alexey,

It seems that the KubernetesHAService works well since all the checkpoints
have been cleaned up when the job is canceled.
And we could find related logs "Found 0 checkpoints in
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is
recovering from a wrong savepoint path. Could you share the
full JobManager logs? One possible reason I could guess is the application
cluster entrypoint is not creating a new JobGraph from the specified
arguments.


Best,
Yang

Alexey Trenikhun  于2021年2月27日周六 上午1:48写道:

> Hello,
> We have Flink job running in Kubernetes with Kuberenetes HA enabled (JM is
> deployed as Job, single TM as StatefulSet). We taken savepoint with
> cancel=true. Now when we are trying to start job using --fromSavepoint *A*,
> where is *A* path we got from taking savepoint (ClusterEntrypoint reports
> *A* in log), but looks like Job for some reason ignores given *A* and
> actually trying to restore from some path *B* (CheckpointCoordinator logs
> *B* ):
>
> *{"ts":"2021-02-26T17:09:52.500Z","message":" Program
> Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-e8a201008f2c
> ","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.502Z","message":"
>  
> ","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
>  *
> *...*
>
> *{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from
> KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in
> KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are
> already
> downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during
> restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.183Z","message":"Starting job
>  from savepoint
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-fbcd58f66685
> 
> (allowing non restored
> state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n  build
> 2021-02-21T21:13:31-0800\n  tag: 0.0.0.7\n  id:
> 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
> {"ts":"2021-02-26T

Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Matthias Pohl
Hi Abhishek,
have you also tried to apply the instructions listed in [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html#configuring-log4j1

On Mon, Mar 1, 2021 at 4:42 AM Abhishek Shukla 
wrote:

> Hi Matthias,
> Thanks for replying,
> I checked both of these pages,
> And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
> are there in property file,
>
> I am able to see the logs of the pipeline once the application is up, but the
> logs related to application failure, successful bean creation, or logs at the
> time of post construct are not getting printed to the file, which was happening
> in Flink 1.9 with the provided log4j-cli.properties file.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/