The Role of TimerService in ProcessFunction

2021-03-16 Thread Chirag Dewan
Hi,
Currently, both ProcessFunction and KeyedProcessFunction (and their CoProcess 
counterparts) expose the Context and TimerService in the processElement() 
method. However, if we use the TimerService in a non-keyed context, it gives a 
runtime error. 
I am a bit confused about these APIs. Is there any specific reason for exposing 
the TimerService in a non-keyed context, especially if it can't be used without 
a keyed stream?
Any leads are much appreciated.
Thanks,
Chirag
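
For reference, a minimal sketch of the keyed usage the question contrasts with
(the Event type and its getKey() accessor are hypothetical placeholders, and
event-time timestamps are assumed to be assigned): timers can only be
registered on a keyed stream, i.e. from a KeyedProcessFunction after keyBy().

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedTimerExample {
    public static DataStream<Event> withTimers(DataStream<Event> events) {
        return events
            .keyBy(Event::getKey)
            .process(new KeyedProcessFunction<String, Event, Event>() {
                @Override
                public void processElement(Event value, Context ctx, Collector<Event> out) {
                    // Supported here because the stream is keyed; the same call from a
                    // plain (non-keyed) ProcessFunction fails at runtime, as described above.
                    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
                    out.collect(value);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) {
                    // fires once the watermark passes the registered timestamp
                }
            });
    }
}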

Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-16 Thread Yun Tang
Hi Alexey,

Thanks for your reply. Could you also share the logs from a normal restore, as 
I asked in the previous thread, so that I can compare them.

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 13:55
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
I'm attaching shorter version of log, looks like full version didn't come 
through

Thanks,
Alexey

From: Yun Tang 
Sent: Tuesday, March 16, 2021 8:05 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I believe your exception messages were printed by Flink 1.12.2, not 
Flink 1.12.1, judging by the line numbers of the method calls.

Could you share the exception message from Flink 1.12.1 when rescaling? I would 
also appreciate more logs from restoring and rescaling; I want to see the 
details of the key-group handle [1].

[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L153

Best

From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 15:10
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Also, restoring from the same savepoint without a change in parallelism works fine.


From: Alexey Trenikhun 
Sent: Monday, March 15, 2021 9:51 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

No, I believe original exception was from 1.12.1 to 1.12.1

Thanks,
Alexey


From: Yun Tang 
Sent: Monday, March 15, 2021 8:07:07 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Can you rescale the job within the same version, i.e. from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

The savepoint was taken with 1.12.1; I've tried to scale up using the same 
version and with 1.12.2.


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Team,

Could you share your thoughts on this? It would be helpful.

Thanks
Jessy

On Tue, 16 Mar 2021 at 21:29, Jessy Ping  wrote:

> Hi Timo/Team,
> Thanks for the reply.
>
> Just take the example from the following pseudo code.
> Suppose this is the current application logic.
>
> firstInputStream = addSource(...)   // Kafka consumer C1
> secondInputStream = addSource(...)  // Kafka consumer C2
>
> outputStream = firstInputStream.keyBy(a -> a.key)
>     .connect(secondInputStream.keyBy(b -> b.key))
>     .coProcessFunction()
> // logic determines whether a new sink should be added to the
> // application or not. If not: the event is produced to the
> // existing sink(s). If a new sink is required: produce the events to the
> // existing sinks + the new one
> sink1 = addSink(outputStream)  // Kafka producer P1
> .
> .
> .
> sinkN = addSink(outputStream)  // Kafka producer PN
>
> Questions
> --> Can I add a new sink into the execution graph at runtime, for example
> a new Kafka producer, without restarting the current application or
> using option 1?
>
> --> (Option 2) What do you mean by adding a custom sink in the
> CoProcessFunction, and how will it change the execution graph?
>
> Thanks
> Jessy
>
>
>
> On Tue, 16 Mar 2021 at 17:45, Timo Walther  wrote:
>
>> Hi Jessy,
>>
>> to be precise, the JobGraph is not used at runtime. It is translated
>> into an ExecutionGraph.
>>
>> Nevertheless, such patterns are possible, but they require a bit of manual
>> implementation.
>>
>> Option 1) You stop the job with a savepoint and restart the application
>> with slightly different parameters. If the pipeline has not changed
>> much, the old state can be remapped to the slightly modified job graph.
>> This is the easiest solution but with the downside of maybe a couple of
>> seconds downtime.
>>
>> Option 2) You introduce a dedicated control stream (i.e. by using the
>> connect() DataStream API [1]). Either you implement a custom sink in the
>> main stream of the CoProcessFunction. Or you enrich every record in the
>> main stream with sink parameters that are read by your custom sink
>> implementation.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>>
>> On 16.03.21 12:37, Jessy Ping wrote:
>> > Hi Team,
>> > Is it possible to edit the job graph at runtime ? . Suppose, I want to
>> > add a new sink to the flink application at runtime that depends upon
>> > the  specific parameters in the incoming events.Can i edit the jobgraph
>> > of a running flink application ?
>> >
>> > Thanks
>> > Jessy
>>
>>
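
As an illustration of the second half of option 2 (enriching records with sink
parameters that a single custom sink reads), here is a minimal sketch. It
assumes Flink 1.12's FlinkKafkaProducer with a KafkaSerializationSchema, and a
hypothetical EnrichedEvent type whose targetTopic and payload fields are set by
the CoProcessFunction; "adding a new sink" then becomes choosing a new topic
per record rather than changing the execution graph.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DynamicTopicSink {
    public static FlinkKafkaProducer<EnrichedEvent> create(Properties kafkaProperties) {
        return new FlinkKafkaProducer<>(
            "default-topic",  // fallback/default topic
            new KafkaSerializationSchema<EnrichedEvent>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(EnrichedEvent element, @Nullable Long timestamp) {
                    // The topic is chosen per record, so a "new sink" is just a new
                    // topic name carried in the record (EnrichedEvent is hypothetical).
                    return new ProducerRecord<>(
                        element.targetTopic,
                        element.payload.getBytes(StandardCharsets.UTF_8));
                }
            },
            kafkaProperties,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }
}

// Usage: outputStream.addSink(DynamicTopicSink.create(kafkaProperties));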


Production Readiness of File Source

2021-03-16 Thread Chirag Dewan
Hi,
I am intending to use the File source for a production use case. I have a few 
requirements that are currently not supported, such as deleting a file once 
it's processed. 
So I was wondering whether we can use this in production, or whether I should 
write my own implementation? Are there any recommendations around this?
Thanks,
Chirag

Re: Checkpoint fail due to timeout

2021-03-16 Thread Alexey Trenikhun
In my opinion it looks similar. Were you able to tune Flink to make it work? 
I'm stuck with it: I wanted to scale up hoping to reduce backpressure, but to 
rescale I need to take a savepoint, which never completes (or at least takes 
longer than 3 hours).



From: ChangZhuo Chen (陳昌倬)
Sent: Tuesday, March 16, 2021 6:59 AM
To: Alexey Trenikhun
Cc: ro...@apache.org; Flink User Mail List
Subject: Re: Checkpoint fail due to timeout

On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> Hi Roman,
> I took thread dump:
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -  blocked on java.lang.Object@5366a0e2
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 
> WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>
> Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple 
> threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on 
> different Objects.

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


ClassCastException after upgrading Flink application to 1.11.2

2021-03-16 Thread soumoks
Hi,

We have upgraded an application originally written for Flink 1.9.1 with
Scala 2.11 to Flink 1.11.2 with Scala 2.12.7 and we are seeing the following
error at runtime.


2021-03-16 20:37:08
java.lang.RuntimeException
  at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
  at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
  at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
  at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
  at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
  at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
  at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
  at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.lang.ClassCastException



The class in question was using Scala Long and Scala BigDecimal types; we
changed them to Java Long and Java BigDecimal types to try to resolve this
error, but to no avail.

This application is running on AWS EMR (emr-6.2.0), if that helps.


Thanks,
Sourabh




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-16 Thread Yun Tang
Hi Alexey,

I believe your exception messages were printed by Flink 1.12.2, not 
Flink 1.12.1, judging by the line numbers of the method calls.

Could you share the exception message from Flink 1.12.1 when rescaling? I would 
also appreciate more logs from restoring and rescaling; I want to see the 
details of the key-group handle [1].

[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L153

Best

From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 15:10
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Also, restoring from the same savepoint without a change in parallelism works fine.


From: Alexey Trenikhun 
Sent: Monday, March 15, 2021 9:51 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

No, I believe original exception was from 1.12.1 to 1.12.1

Thanks,
Alexey


From: Yun Tang 
Sent: Monday, March 15, 2021 8:07:07 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Can you rescale the job within the same version, i.e. from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

The savepoint was taken with 1.12.1; I've tried to scale up using the same 
version and with 1.12.2.


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Prefix Seek RocksDB

2021-03-16 Thread Yun Tang
Hi Rex,

Flink has never used a prefix seek iterator when seeking. I would suggest first 
reading more details about this in the RocksDB wiki, as the prefix extractor 
can impact performance.

Best
Yun Tang

From: Rex Fenley 
Sent: Wednesday, March 17, 2021 2:02
To: Yun Tang 
Cc: user ; Brad Davis 
Subject: Re: Prefix Seek RocksDB

Thanks for the input, I'll look more into that.

Does your answer then imply that Joins and Aggs do not inherently always use 
prefix seeks? I'd imagine that the join key on join and groupby key on aggs 
would always be used as prefix keys. Is this not the case?

Also, is there good information on what the correct prefix extractor is for 
Flink? This feature is something I only just discovered so I was hoping to gain 
clarity.

Thanks

On Mon, Mar 15, 2021 at 8:33 PM Yun Tang <myas...@live.com> wrote:
Hi Rex,

You could configure prefix seek via RocksDB's column family options [1]. Be 
careful to use correct prefix extractor.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#passing-options-factory-to-rocksdb


Best



From: Rex Fenley <r...@remind101.com>
Sent: Tuesday, March 16, 2021 8:29
To: user <user@flink.apache.org>
Cc: Brad Davis <brad.da...@remind101.com>
Subject: Prefix Seek RocksDB

Hello!

I'm wondering if Flink RocksDB state backend is pre-configured to have Prefix 
Seeks enabled, such as for Joins and Aggs on the TableAPI [1]? If not, what's 
the easiest way to configure this? I'd imagine this would be beneficial.

Thanks!

[1] https://github.com/facebook/rocksdb/wiki/Prefix-Seek

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW 
US  |  LIKE US


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW 
US  |  LIKE US
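
To make the pointer above concrete, a minimal sketch of passing a prefix
extractor through an options factory. This assumes Flink 1.12's
RocksDBOptionsFactory and RocksDBStateBackend API, and uses an 8-byte
fixed-length prefix purely as an illustration; the right extractor depends on
your actual key layout, as Yun Tang notes.

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class PrefixSeekOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;  // no DB-level changes in this sketch
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // 8 bytes is a placeholder; the extractor must match the key layout.
        return currentOptions.useFixedLengthPrefixExtractor(8);
    }
}

// Wiring it up (the checkpoint path is a placeholder):
// RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints");
// backend.setRocksDBOptions(new PrefixSeekOptionsFactory());
// env.setStateBackend(backend);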


Re: Question about session_aggregate.merging-window-set.rocksdb_estimate-num-keys

2021-03-16 Thread Vishal Santoshi
Neither; those are metrics on a ValueState, which is updated at least once on
every call to process(). The metric is the number of these ValueStates scoped
to a key (I am using session windows).


On Mon, Mar 15, 2021 at 11:29 PM Yun Tang  wrote:

> Hi,
>
> Could you describe what you observed in details? Which states you compare
> with the session window state "merging-window-set", the "newKeysInState"
> or "existingKeysInState"?
>
> BTW, since we use list state as main state for window operator and we use
> RocksDB's merge operation for window state add operations, this makes the
> estimate of the number of keys inaccurate [1]:
>   // Estimation will be inaccurate when:
>   // (1) there exist merge keys
>   // (2) keys are directly overwritten
>   // (3) deletion on non-existing keys
>   // (4) low number of samples
>
> [1]
> https://github.com/ververica/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/version_set.cc#L919-L924
>
>
>
> Best
> Yun Tang
> --
> *From:* Vishal Santoshi 
> *Sent:* Monday, March 15, 2021 5:48
> *To:* user 
> *Subject:* Re: Question about
> session_aggregate.merging-window-set.rocksdb_estimate-num-keys
>
> All I can think is that any update on a state key, which I do in my
> ProcessFunction, creates an update (essentially an append in RocksDB) that
> renders the previous value for the key a tombstone. That need not be
> reflected in the count atomically (hence the double or triple counts), which
> is why it is called an "estimate"; but I was not anticipating this much
> difference ...
>
> On Sun, Mar 14, 2021 at 5:32 PM Vishal Santoshi 
> wrote:
>
> The reason I ask is that I have a "Process Window Function" on that
> Session  Window  and I keep key scoped Global State.  I maintain a TTL on
> that state ( that is outside the Window state )  that is roughly the
> current WM + lateness.
>
> I would imagine that keys for that custom state are *roughly* equal to
> the number of keys in the "merging-window-set" . It seems twice that number
> but does follow the slope. I am trying to figure out why this deviation.
>
> public void process(KEY key,
>         ProcessWindowFunction<KeyedSessionWithSessionID<KEY, VALUE>,
>                 KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>         Iterable<KeyedSessionWithSessionID<KEY, VALUE>> elements,
>         Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
>         throws Exception {
>     // scoped to the key
>     if (state.value() == null) {
>         this.newKeysInState.inc();
>         state.update(new IntervalList());
>     } else {
>         this.existingKeysInState.inc();
>     }
>
> On Sun, Mar 14, 2021 at 3:32 PM Vishal Santoshi 
> wrote:
>
> Hey folks,
>
>   Was looking at this very specific metric
> "session_aggregate.merging-window-set.rocksdb_estimate-num-keys".  Does
> this metric also represent session windows ( it is a session window ) that
> have lateness on them ? In essence if the session window was closed but has
> a lateness of a few hours would those keys still be counted against this
> metric.
>
> I think they should as it is an estimate keys for the Column Family for
> the operator and if the window has not been GCed then the key for those
> Windows should be in RocksDB but wanted to be sure.
>
> Regards.
>
>
>


How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

2021-03-16 Thread Rex Fenley
Hello,

I'm wondering how, in the event of a poison pill record on Kafka, to
advance a partition's checkpointed offsets by 1 when using the TableAPI/SQL.

It is my understanding that when checkpointing is enabled Flink uses its
own checkpoint committed offsets and not the offsets committed to Kafka
when starting a job from a checkpoint.

In the event that there is a poison pill record in Kafka that is crashing
the Flink job, we may want to simply advance our checkpointed offsets by 1
for the partition, past the poison record, and then continue operation as
normal. We do not want to lose any other state in Flink however.

I'm wondering how to go about this then. It's easy enough to have Kafka
advance its committed offsets. Is there a way to tell Flink to ignore
checkpointed offsets and instead respect the offsets committed to Kafka for
a consumer group when restoring from a checkpoint?
If so we could:
1. Advance Kafka's offsets.
2. Run our job from the checkpoint and have it use Kafka's offsets and then
checkpoint with new Kafka offsets.
3. Stop the job, and rerun it using Flink's committed, now advanced,
offsets.

Is this possible? Are there any better strategies?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US
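
For step 1 of the plan above (advancing Kafka's committed offsets), a minimal
sketch using Kafka's AdminClient; the broker address, topic, partition, group
and offset values are hypothetical placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SkipPoisonRecord {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition poisonedPartition = new TopicPartition("my-topic", 3);
            long poisonOffset = 1234L;  // offset of the poison pill record

            // Commit the offset *after* the poison record for the consumer group,
            // effectively skipping it on the Kafka side. This only succeeds while
            // the group has no active members, i.e. while the Flink job is stopped.
            admin.alterConsumerGroupOffsets(
                    "my-consumer-group",
                    Collections.singletonMap(poisonedPartition, new OffsetAndMetadata(poisonOffset + 1)))
                .all()
                .get();
        }
    }
}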



Re: Prefix Seek RocksDB

2021-03-16 Thread Rex Fenley
Thanks for the input, I'll look more into that.

Does your answer then imply that Joins and Aggs do not inherently always
use prefix seeks? I'd imagine that the join key on join and groupby key on
aggs would always be used as prefix keys. Is this not the case?

Also, is there good information on what the correct prefix extractor is for
Flink? This feature is something I only just discovered so I was hoping to
gain clarity.

Thanks

On Mon, Mar 15, 2021 at 8:33 PM Yun Tang  wrote:

> Hi Rex,
>
> You could configure prefix seek via RocksDB's column family options [1].
> Be careful to use correct prefix extractor.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#passing-options-factory-to-rocksdb
>
>
> Best
>
>
> --
> *From:* Rex Fenley 
> *Sent:* Tuesday, March 16, 2021 8:29
> *To:* user 
> *Cc:* Brad Davis 
> *Subject:* Prefix Seek RocksDB
>
> Hello!
>
> I'm wondering if Flink RocksDB state backend is pre-configured to have
> Prefix Seeks enabled, such as for Joins and Aggs on the TableAPI [1]? If
> not, what's the easiest way to configure this? I'd imagine this would be
> beneficial.
>
> Thanks!
>
> [1] https://github.com/facebook/rocksdb/wiki/Prefix-Seek
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Rion Williams
I've made a handful of tweaks to it to try and get them to pick up as
expected (i.e. adding logging to every available overload for the
interceptors, etc) using something similar to the following:

fun create(): InterceptingTaskMetricGroup {
val operatorGroup = object: InterceptingOperatorMetricGroup() {
override fun addGroup(name: Int): MetricGroup {
// Include logging here...
}

override fun addGroup(name: String?): MetricGroup {
// Include logging here...
}

// Repeat ad nauseum
}

return object: InterceptingTaskMetricGroup() {
override fun getOrAddOperator(id: OperatorID, name: String):
OperatorMetricGroup {
return operatorGroup
}
}
}

It still looks like it's only ever registering the built-in metrics and not
hitting any of those for the TestHarness execution. I've even included a
simple test metric for the function during the open() call to ensure that
it wasn't some other unrelated issue for something happening in the
processFunction() calls / dynamic metrics.

Said differently - I can see the logs being hit in the
InterceptingOperatorMetricGroup.addGroup()
calls, but only for the internal metrics from the Task/JobManagers
respectively, nothing custom.

Rion


On Tue, Mar 16, 2021 at 11:00 AM Chesnay Schepler 
wrote:

> Actually you'd have to further subclass the operatorMetricGroup such that
> addGroup works as expected.
> This is admittedly a bit of a drag :/
>
> On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
>
> The test harness is fully independent of the MiniClusterResource; it isn't
> actually running a job. That's why your metrics never arrive at the
> reporter.
>
> You can either:
> a) use the test harness with a custom MetricGroup implementation that
> intercepts registered metrics, set in the MockEnvironment
> b) use the function as part of a job with the custom reporter approach.
> (essentially, fromElements -> function -> discarding sink)
>
> The following would work for a), but it must be noted that this relies on
> quite a few things that are internal to Flink:
>
> ...
>
> InterceptingOperatorMetricGroup operatorMetricGroup =
>     new InterceptingOperatorMetricGroup();
> InterceptingTaskMetricGroup taskMetricGroup =
>     new InterceptingTaskMetricGroup() {
>         @Override
>         public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
>             return operatorMetricGroup;
>         }
>     };
> new MockEnvironmentBuilder()
>     .setMetricGroup(taskMetricGroup)
>
> ...
>
>
> On 3/16/2021 3:42 PM, Rion Williams wrote:
>
> In this case, I was using a harness to test the function. Although, I
> could honestly care less about the unit-test surrounding metrics, I'm much
> more concerned with having something that will actually run and work as
> intended within a job. The only real concern I have or problem that I want
> to solve is building metrics that may vary based on the data coming in from
> a "label" perspective (e.g. keeping track of the events I've seen for a
> given tenant, or some other properties).
>
> Something like:
>
> _events_seen { tenant = "tenant-1" } 1.0
> _events_seen { tenant = "tenant-2" } 200.0
>
> If that makes sense. I've used the Prometheus client previously to
> accomplish these types of metrics, but since I'm fairly new to the Flink
> world, I was trying to use the built-in constructs available (thus the
> dynamic groups / metrics being added).
>
> On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler 
> wrote:
>
>> Are you actually running a job, or are you using a harness for testing
>> your function?
>>
>> On 3/16/2021 3:24 PM, Rion Williams wrote:
>>
>> Hi Chesnay,
>>
>> Thanks for the prompt response and feedback, it's very much appreciated.
>> Please see the inline responses below to your questions:
>>
>> *Was there anything in the logs (ideally on debug)?*
>>
>>
>> I didn't see anything within the logs that seemed to indicate anything
>> out of the ordinary. I'm currently using a MiniClusterResources for this
>> and attempted to set the logging levels to pick up everything (i.e. ALL),
>> but if there's a way to expose more, I'm not aware of it.
>>
>> *Have you debugged the execution and followed the counter() calls all the
>>> way to the reporter?*
>>
>>
>> With the debugger, I traced one of the counter initializations and it
>> seems that no reporters were being found within the register call in the
>> MetricsRegistryImpl (i.e. this.reporters has no registered reporters):
>>
>> if (this.reporters != null) {
>> for(int i = 0; i < this.reporters.size(); ++i) {
>> MetricRegistryImpl.ReporterAndSettings reporterAndSettings = 
>> (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>>
>> try {
>> if (reporterAndSettings != null) {
>> FrontMetricGroup front = new 
>> FrontMetricGroup(reporterAndSettings.getSettings(), group);
>> 
>> repo
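
A minimal sketch of option (b) suggested earlier in the thread (fromElements ->
function -> discarding sink on a MiniCluster, with a reporter registered via a
factory). StaticReporterFactory and its static reporter instance are
placeholders for a factory that hands out one shared reporter, and Event /
MyCustomProcessFunction are the types from this thread, so this is an
assumption-laden outline rather than a verified test.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class CustomMetricsJobTest {

    private static Configuration reporterConfig() {
        Configuration config = new Configuration();
        // Reporter factories are configured via metrics.reporter.<name>.factory.class.
        config.setString("metrics.reporter.test.factory.class", StaticReporterFactory.class.getName());
        return config;
    }

    @ClassRule
    public static final MiniClusterWithClientResource FLINK = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(reporterConfig())
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build());

    @Test
    public void customMetricsAreReported() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements(new Event("tenant-1"), new Event("tenant-2"))
            .process(new MyCustomProcessFunction())
            .addSink(new DiscardingSink<>());
        env.execute("custom-metrics-test");

        // Assert against whatever the static reporter instance collected, e.g.
        // StaticReporterFactory.REPORTER (placeholder API).
    }
}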

custom metrics within a Trigger

2021-03-16 Thread Aleksander Sumowski
Hi all,
I'd like to measure how many events arrive within the allowed lateness, grouped
by a particular feature of the event. We assume a particular type of event has
far more late arrivals and would like to verify this. The natural place to make
the measurement would be our custom trigger, within the onElement method, as
this is where we know whether an event is late or not. The issue is that the
only way to obtain a MetricGroup at this point is via Trigger.TriggerContext,
which leads to re-registering the metric and lots of log lines:


`Name collision: Group already contains a Metric with the name XXX. Metric
will not be reported.`

Any hints on how to tackle this?

Thanks,
Aleksander
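
One possible approach (an assumption, not a confirmed fix): register the
counter lazily, once per Trigger instance, so that
TriggerContext.getMetricGroup().counter(...) is not called again for every
element. A minimal sketch follows; the Event type and its hasFeatureX()
accessor are hypothetical, the firing logic simply mirrors the built-in
EventTimeTrigger, and "late" is taken to mean the element timestamp is behind
the current watermark.

import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class LatenessCountingTrigger extends Trigger<Event, TimeWindow> {

    private transient Counter lateEventsForFeatureX;

    @Override
    public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Lazy registration: counter() is called once per trigger instance,
        // not once per element, which avoids the repeated name-collision warnings.
        if (lateEventsForFeatureX == null) {
            lateEventsForFeatureX = ctx.getMetricGroup().counter("lateEventsForFeatureX");
        }
        if (timestamp < ctx.getCurrentWatermark() && element.hasFeatureX()) {
            lateEventsForFeatureX.inc();
        }

        // Firing behavior copied from the built-in EventTimeTrigger.
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}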


Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Chesnay Schepler
Actually you'd have to further subclass the operatorMetricGroup such 
that addGroup works as expected.

This is admittedly a bit of a drag :/

On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
The test harness is fully independent of the MiniClusterResource; it 
isn't actually running a job. That's why your metrics never arrive at 
the reporter.


You can either:
a) use the test harness with a custom MetricGroup implementation that 
intercepts registered metrics, set in the MockEnvironment
b) use the function as part of a job with the custom reporter 
approach. (essentially, fromElements -> function -> discarding sink)


The following would work for a), but it must be noted that this relies 
on quite a few things that are internal to Flink:


...
InterceptingOperatorMetricGroup operatorMetricGroup =
    new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup =
    new InterceptingTaskMetricGroup() {
        @Override
        public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
            return operatorMetricGroup;
        }
    };
new MockEnvironmentBuilder()
    .setMetricGroup(taskMetricGroup)

...

On 3/16/2021 3:42 PM, Rion Williams wrote:
In this case, I was using a harness to test the function. Although, I 
could honestly care less about the unit-test surrounding metrics, I'm 
much more concerned with having something that will actually run and 
work as intended within a job. The only real concern I have or 
problem that I want to solve is building metrics that may vary based 
on the data coming in from a "label" perspective (e.g. keeping track 
of the events I've seen for a given tenant, or some other properties).


Something like:

_events_seen { tenant = "tenant-1" } 1.0
_events_seen { tenant = "tenant-2" } 200.0

If that makes sense. I've used the Prometheus client previously to 
accomplish these types of metrics, but since I'm fairly new to the 
Flink world, I was trying to use the built-in constructs available 
(thus the dynamic groups / metrics being added).


On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ches...@apache.org> wrote:


Are you actually running a job, or are you using a harness for
testing your function?

On 3/16/2021 3:24 PM, Rion Williams wrote:

Hi Chesnay,

Thanks for the prompt response and feedback, it's very much
appreciated. Please see the inline responses below to your
questions:

*Was there anything in the logs (ideally on debug)?*


I didn't see anything within the logs that seemed to indicate
anything out of the ordinary. I'm currently using a
MiniClusterResources for this and attempted to set the logging
levels to pick up everything (i.e. ALL), but if there's a way to
expose more, I'm not aware of it.

*Have you debugged the execution and followed the counter()
calls all the way to the reporter?*


With the debugger, I traced one of the counter initializations
and it seems that no reporters were being found within the
register call in the MetricsRegistryImpl (i.e. this.reporters
has no registered reporters):
if (this.reporters != null) {
    for (int i = 0; i < this.reporters.size(); ++i) { ... }

On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ches...@apache.org> wrote:

Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter()
calls all the way to the reporter?
Do you only see JobManager metrics, or is there somewhere
also something about the TaskManager?

I can see several issues with your code, but none that would
fully explain the issue:

a) your reporter is not thread-safe
b) you only differentiate metrics by name, which will lead
to quite a few collisions.

Be also aware that there will be 2 reporter instances; one
for the JM and one for the TM.
To remedy this, I would recommend creating a factory that
returns a static reporter instance instead; overall this
tends to be cleaner.

Alternatively, when using the testing harnesses IIRC you can
also set set a custom MetricGroup implementation.

On 3/16/2021 4:13 AM, Rion Williams wrote:

Hi all,

Recently, I was working on adding some custom metrics to a
Flink job that required the use of dynamic labels (i.e.
capturing various counters that were "slicable" by things
like tenant / source, etc.).

I ended up handling it in a very naive fashion that would
just keep a dictionary of metrics that had already been
registered and update them accordingly which looked
something like this:
class MyCustomProcessFunction : ProcessFunction() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processE

Re: Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Timo/Team,
Thanks for the reply.

Just take the example from the following pseudo code.
Suppose this is the current application logic.

firstInputStream = addSource(...)   // Kafka consumer C1
secondInputStream = addSource(...)  // Kafka consumer C2

outputStream = firstInputStream.keyBy(a -> a.key)
    .connect(secondInputStream.keyBy(b -> b.key))
    .coProcessFunction()
// logic determines whether a new sink should be added to the
// application or not. If not: the event is produced to the
// existing sink(s). If a new sink is required: produce the events to the
// existing sinks + the new one
sink1 = addSink(outputStream)  // Kafka producer P1
.
.
.
sinkN = addSink(outputStream)  // Kafka producer PN

Questions
--> Can I add a new sink into the execution graph at runtime, for example
a new Kafka producer, without restarting the current application or using
option 1?

--> (Option 2) What do you mean by adding a custom sink in the
CoProcessFunction, and how will it change the execution graph?

Thanks
Jessy



On Tue, 16 Mar 2021 at 17:45, Timo Walther  wrote:

> Hi Jessy,
>
> to be precise, the JobGraph is not used at runtime. It is translated
> into an ExecutionGraph.
>
> Nevertheless, such patterns are possible, but they require a bit of manual
> implementation.
>
> Option 1) You stop the job with a savepoint and restart the application
> with slightly different parameters. If the pipeline has not changed
> much, the old state can be remapped to the slightly modified job graph.
> This is the easiest solution but with the downside of maybe a couple of
> seconds downtime.
>
> Option 2) You introduce a dedicated control stream (i.e. by using the
> connect() DataStream API [1]). Either you implement a custom sink in the
> main stream of the CoProcessFunction. Or you enrich every record in the
> main stream with sink parameters that are read by your custom sink
> implementation.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>
> On 16.03.21 12:37, Jessy Ping wrote:
> > Hi Team,
> > Is it possible to edit the job graph at runtime ? . Suppose, I want to
> > add a new sink to the flink application at runtime that depends upon
> > the  specific parameters in the incoming events.Can i edit the jobgraph
> > of a running flink application ?
> >
> > Thanks
> > Jessy
>
>


Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-16 Thread Robert Cullen
Thanks All,

I've added Python and PyFlink to the TM image, which fixed the problem. Now,
however, submitting a Python script to the cluster only succeeds sporadically;
sometimes it completes, but most of the time it just hangs. I'm not sure what
is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang  wrote:

> Hi,
>
> From the error message, I think the problem is that there is no Python
> interpreter on your TaskManager machine. You need to install a Python 3.5+
> interpreter on the TM machine, and this Python environment needs to have
> pyflink installed (pip install apache-flink). For details, you can refer to
> the documentation [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
>
> Best,
> Xingbo
>
> Robert Cullen  于2021年3月16日周二 上午2:58写道:
>
>> Okay, I added the jars and fixed that exception. However I have a new
>> exception that is harder to decipher:
>>
>> 2021-03-15 14:46:20
>> org.apache.flink.runtime.JobException: Recovery is suppressed by 
>> NoRestartBackoffTimeStrategy
>> at 
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>> at 
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>> at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>> at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>> at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>> at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>> at 
>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>> at 
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>> at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>> at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.io.IOException: Cannot run program "python": error=2, No 
>> such file or directory
>> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>> at 
>> org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>> at 
>> org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>> at 
>> org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>> at 
>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>> at 
>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPyth

Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Chesnay Schepler
The test harness is fully independent of the MiniClusterResource; it 
isn't actually running a job. That's why your metrics never arrive at 
the reporter.


You can either:
a) use the test harness with a custom MetricGroup implementation that 
intercepts registered metrics, set in the MockEnvironment
b) use the function as part of a job with the custom reporter approach. 
(essentially, fromElements -> function -> discarding sink)


The following would work for a), but it must be noted that this relies 
on quite a few things that are internal to Flink:


...

InterceptingOperatorMetricGroup operatorMetricGroup =
    new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup =
    new InterceptingTaskMetricGroup() {
        @Override
        public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
            return operatorMetricGroup;
        }
    };
new MockEnvironmentBuilder()
    .setMetricGroup(taskMetricGroup)

...


On 3/16/2021 3:42 PM, Rion Williams wrote:
In this case, I was using a harness to test the function. Although, I 
could honestly care less about the unit-test surrounding metrics, I'm 
much more concerned with having something that will actually run and 
work as intended within a job. The only real concern I have or problem 
that I want to solve is building metrics that may vary based on the 
data coming in from a "label" perspective (e.g. keeping track of the 
events I've seen for a given tenant, or some other properties).


Something like:

_events_seen { tenant = "tenant-1" } 1.0
_events_seen { tenant = "tenant-2" } 200.0

If that makes sense. I've used the Prometheus client previously to 
accomplish these types of metrics, but since I'm fairly new to the 
Flink world, I was trying to use the built-in constructs available 
(thus the dynamic groups / metrics being added).


On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ches...@apache.org> wrote:


Are you actually running a job, or are you using a harness for
testing your function?

On 3/16/2021 3:24 PM, Rion Williams wrote:

Hi Chesnay,

Thanks for the prompt response and feedback, it's very much
appreciated. Please see the inline responses below to your questions:

*Was there anything in the logs (ideally on debug)?*


I didn't see anything within the logs that seemed to indicate
anything out of the ordinary. I'm currently using a
MiniClusterResources for this and attempted to set the logging
levels to pick up everything (i.e. ALL), but if there's a way to
expose more, I'm not aware of it.

*Have you debugged the execution and followed the counter()
calls all the way to the reporter?*


With the debugger, I traced one of the counter initializations
and it seems that no reporters were being found within the
register call in the MetricsRegistryImpl (i.e. this.reporters has
no registered reporters):
if (this.reporters != null) {
    for (int i = 0; i < this.reporters.size(); ++i) { ... }

On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ches...@apache.org> wrote:

Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter()
calls all the way to the reporter?
Do you only see JobManager metrics, or is there somewhere
also something about the TaskManager?

I can see several issues with your code, but none that would
fully explain the issue:

a) your reporter is not thread-safe
b) you only differentiate metrics by name, which will lead to
quite a few collisions.

Be also aware that there will be 2 reporter instances; one
for the JM and one for the TM.
To remedy this, I would recommend creating a factory that
returns a static reporter instance instead; overall this
tends to be cleaner.

Alternatively, when using the testing harnesses IIRC you can
also set set a custom MetricGroup implementation.

On 3/16/2021 4:13 AM, Rion Williams wrote:

Hi all,

Recently, I was working on adding some custom metrics to a
Flink job that required the use of dynamic labels (i.e.
capturing various counters that were "slicable" by things
like tenant / source, etc.).

I ended up handling it in a very naive fashion that would
just keep a dictionary of metrics that had already been
registered and update them accordingly which looked
something like this:
class MyCustomProcessFunction : ProcessFunction() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(event: Event, context: Context, collector: Collector) {
        // Insert calls like metrics.inc("tenant-name", 4) here
    }
}

class CustomMetricsRegistry(private val metr

Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Rion Williams
In this case, I was using a harness to test the function. Although, I could
honestly care less about the unit-test surrounding metrics, I'm much more
concerned with having something that will actually run and work as intended
within a job. The only real concern I have or problem that I want to solve
is building metrics that may vary based on the data coming in from a
"label" perspective (e.g. keeping track of the events I've seen for a given
tenant, or some other properties).

Something like:

_events_seen { tenant = "tenant-1" } 1.0
_events_seen { tenant = "tenant-2" } 200.0

If that makes sense. I've used the Prometheus client previously to
accomplish these types of metrics, but since I'm fairly new to the Flink
world, I was trying to use the built-in constructs available (thus the
dynamic groups / metrics being added).

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler  wrote:

> Are you actually running a job, or are you using a harness for testing
> your function?
>
> On 3/16/2021 3:24 PM, Rion Williams wrote:
>
> Hi Chesnay,
>
> Thanks for the prompt response and feedback, it's very much appreciated.
> Please see the inline responses below to your questions:
>
> *Was there anything in the logs (ideally on debug)?*
>
>
> I didn't see anything within the logs that seemed to indicate anything out
> of the ordinary. I'm currently using a MiniClusterResources for this and
> attempted to set the logging levels to pick up everything (i.e. ALL), but
> if there's a way to expose more, I'm not aware of it.
>
> *Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?*
>
>
> With the debugger, I traced one of the counter initializations and it
> seems that no reporters were being found within the register call in the
> MetricsRegistryImpl (i.e. this.reporters has no registered reporters):
>
> if (this.reporters != null) {
> for(int i = 0; i < this.reporters.size(); ++i) {
> MetricRegistryImpl.ReporterAndSettings reporterAndSettings = 
> (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>
> try {
> if (reporterAndSettings != null) {
> FrontMetricGroup front = new 
> FrontMetricGroup(reporterAndSettings.getSettings(), group);
> reporterAndSettings.getReporter().notifyOfAddedMetric(metric, 
> metricName, front);
> }
> } catch (Exception var11) {
> LOG.warn("Error while registering metric: {}.", metricName, 
> var11);
> }
> }
> }
>
>  Perhaps this is an error on my part as I had assumed the following would
> be sufficient to register my reporter (within a local / minicluster
> environment):
>
> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
> ConfigConstants.METRICS_REPORTER_PREFIX +
> "MockCustomMetricsReporter." +
> ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to 
> MockCustomMetricsReporter::class.java.name))
> @ClassRule@JvmFieldval flink = MiniClusterResource(
> MiniClusterResourceConfiguration.Builder()
> .setConfiguration(metricsConfiguration)
> .setNumberTaskManagers(1)
> .setNumberSlotsPerTaskManager(1)
> .build()
> )
>
> However, it's clearly being recognized for the built-in metrics, just not
> these custom ones that are being registered as they are triggering the
> notifyOfAddedMetric() function within the reporter itself.
>
> *Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?*
>
>
> It looks like there are metrics coming from both the JobManager and
> TaskManagers from the following examples that were coming out:
>
> localhost.jobmanager.numRegisteredTaskManagers
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> localhost.jobmanager.Status.JVM.Memory.Direct.Count
>
> I do agree that a factory implementation with a static reporter would
> likely be a better approach, so I may explore that a bit more. As well as
> adding some changes to the existing, albeit ghetto, implementation for
> handling the dynamic metrics. I did see several references to a
> MetricRegistry class, however I wasn't sure if that was the most
> appropriate place to add this type of functionality or if it was needed at
> all.
>
> Thanks much,
>
> Rion
>
>
>
> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler 
> wrote:
>
>> Was there anything in the logs (ideally on debug)?
>> Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?
>> Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?
>>
>> I can see several issues with your code, but none that would fully
>> explain the issue:
>>
>> a) your reporter is not thread-safe
>> b) you only differentiate metrics by name, which will lead to quite a few
>> collisions.
>>
>> Be also aware that there 

Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Chesnay Schepler
Are you actually running a job, or are you using a harness for testing 
your function?


On 3/16/2021 3:24 PM, Rion Williams wrote:

Hi Chesnay,

Thanks for the prompt response and feedback, it's very much 
appreciated. Please see the inline responses below to your questions:


*Was there anything in the logs (ideally on debug)?*


I didn't see anything within the logs that seemed to indicate anything 
out of the ordinary. I'm currently using a MiniClusterResources for 
this and attempted to set the logging levels to pick up everything 
(i.e. ALL), but if there's a way to expose more, I'm not aware of it.


*Have you debugged the execution and followed the counter() calls
all the way to the reporter?*


With the debugger, I traced one of the counter initializations and it 
seems that no reporters were being found within the register call in 
the MetricsRegistryImpl (i.e. this.reporters has no registered reporters):

if (this.reporters != null) {
    for (int i = 0; i < this.reporters.size(); ++i) { ... }

Perhaps this is an error on my part as I had assumed the following 
would be sufficient to register my reporter (within a local / 
minicluster environment):

private val metricsConfiguration =Configuration.fromMap(mutableMapOf(
 ConfigConstants.METRICS_REPORTER_PREFIX +
 "MockCustomMetricsReporter." +
 ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to 
MockCustomMetricsReporter::class.java.name ))

@ClassRule @JvmField val flink =MiniClusterResource(
 MiniClusterResourceConfiguration.Builder()
 .setConfiguration(metricsConfiguration)
 .setNumberTaskManagers(1)
 .setNumberSlotsPerTaskManager(1)
 .build()
)
However, it's clearly being recognized for the built-in metrics, just 
not these custom ones that are being registered as they are triggering 
the notifyOfAddedMetric() function within the reporter itself.


*Do you only see JobManager metrics, or is there somewhere also
something about the TaskManager?*


It looks like there are metrics coming from both the JobManager and 
TaskManagers from the following examples that were coming out:

localhost.jobmanager.numRegisteredTaskManagers
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
localhost.jobmanager.Status.JVM.Memory.Direct.Count
I do agree that a factory implementation with a static reporter would 
likely be a better approach, so I may explore that a bit more. As well 
as adding some changes to the existing, albeit ghetto, implementation 
for handling the dynamic metrics. I did see several references to a 
MetricRegistry class, however I wasn't sure if that was the most 
appropriate place to add this type of functionality or if it was 
needed at all.


Thanks much,

Rion



On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ches...@apache.org> wrote:


Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter() calls
all the way to the reporter?
Do you only see JobManager metrics, or is there somewhere also
something about the TaskManager?

I can see several issues with your code, but none that would fully
explain the issue:

a) your reporter is not thread-safe
b) you only differentiate metrics by name, which will lead to
quite a few collisions.

Be also aware that there will be 2 reporter instances; one for the
JM and one for the TM.
To remedy this, I would recommend creating a factory that returns
a static reporter instance instead; overall this tends to be cleaner.

Alternatively, when using the testing harnesses IIRC you can also
set set a custom MetricGroup implementation.

On 3/16/2021 4:13 AM, Rion Williams wrote:

Hi all,

Recently, I was working on adding some custom metrics to a Flink
job that required the use of dynamic labels (i.e. capturing
various counters that were "slicable" by things like tenant /
source, etc.).

I ended up handling it in a very naive fashion that would just
keep a dictionary of metrics that had already been registered and
update them accordingly which looked something like this:
class MyCustomProcessFunction : ProcessFunction() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(event: Event, context: Context, collector: Collector) {
        // Insert calls like metrics.inc("tenant-name", 4) here
    }
}

class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Increments a given metric
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        // Store a key for the metric
        val key = "$metric-$tenant"
        // Store/register the metric
        if (!registeredMetrics.containsKey(key)) {

Re: [Flink SQL] Leniency of JSON parsing

2021-03-16 Thread Timo Walther

Hi Sebastian,

you can check out the logic yourself by looking at

https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java

and

https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java

So actually your use case should work. Could you help investigate what is 
going wrong? In any case, we should open an issue for it. It seems to 
be a bug.


Regards,
Timo

On 12.03.21 21:10, Magri, Sebastian wrote:
I validated it's still accepted by the connector but it's not in the 
documentation anymore.


It doesn't seem to help in my case.

Thanks,
Sebastian

*From:* Magri, Sebastian 
*Sent:* Friday, March 12, 2021 18:50
*To:* Timo Walther ; ro...@apache.org 
*Cc:* user 
*Subject:* Re: [Flink SQL] Leniency of JSON parsing
Hi Roman!

Seems like that option is no longer available.

Best Regards,
Sebastian

*From:* Roman Khachatryan 
*Sent:* Friday, March 12, 2021 16:59
*To:* Magri, Sebastian ; Timo Walther 


*Cc:* user 
*Subject:* Re: [Flink SQL] Leniency of JSON parsing
Hi Sebastian,

Did you try setting debezium-json-map-null-key-mode to DROP [1]?

I'm also pulling in Timo who might know better.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode 



Regards,
Roman



On Fri, Mar 12, 2021 at 2:42 PM Magri, Sebastian
 wrote:


I'm trying to extract data from a Debezium CDC source, in which one of the 
backing tables has an open schema nested JSON field like this:


"objectives": {
 "items": [
 {
 "id": 1,
 "label": "test 1"
 "size": 1000.0
 },
 {
 "id": 2,
 "label": "test 2"
 "size": 500.0
 }
 ],
 "threshold": 10.0,
 "threshold_period": "hourly",
 "max_ms": 3.0
}


Any of these fields can be missing at any time, and there can also be 
additional, different fields. It is guaranteed that a field will have the same 
data type for all occurrences.

For now, I really need to get only the `threshold` and `threshold_period` 
fields. For which I'm using a field as the following:


CREATE TABLE probes (
   `objectives` ROW(`threshold` FLOAT, `threshold_period` STRING)
   ...
) WITH (
  ...
   'format' = 'debezium-json',
   'debezium-json.schema-include' = 'true',
   'debezium-json.ignore-parse-errors' = 'true'
)


However I keep getting `NULL` values in my `objectives` column, or corrupt JSON 
message exceptions when I disable the `ignore-parse-errors` option.

Does JSON parsing need to match 100% the schema of the field or is it lenient?

Is there any option or syntactic detail I'm missing?

Best Regards,




Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Rion Williams
Hi Chesnay,

Thanks for the prompt response and feedback, it's very much appreciated.
Please see the inline responses below to your questions:

*Was there anything in the logs (ideally on debug)?*


I didn't see anything within the logs that seemed to indicate anything out
of the ordinary. I'm currently using a MiniClusterResources for this and
attempted to set the logging levels to pick up everything (i.e. ALL), but
if there's a way to expose more, I'm not aware of it.

*Have you debugged the execution and followed the counter() calls all the
> way to the reporter?*


With the debugger, I traced one of the counter initializations and it seems
that no reporters were being found within the register call in the
MetricRegistryImpl (i.e. this.reporters has no registered reporters):

if (this.reporters != null) {
for(int i = 0; i < this.reporters.size(); ++i) {
MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
(MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);

try {
if (reporterAndSettings != null) {
FrontMetricGroup front = new
FrontMetricGroup(reporterAndSettings.getSettings(), group);

reporterAndSettings.getReporter().notifyOfAddedMetric(metric,
metricName, front);
}
} catch (Exception var11) {
LOG.warn("Error while registering metric: {}.", metricName, var11);
}
}
}

 Perhaps this is an error on my part as I had assumed the following would
be sufficient to register my reporter (within a local / minicluster
environment):

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
ConfigConstants.METRICS_REPORTER_PREFIX +
"MockCustomMetricsReporter." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flink = MiniClusterResource(
MiniClusterResourceConfiguration.Builder()
.setConfiguration(metricsConfiguration)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build()
)

However, the reporter is clearly being picked up for the built-in metrics,
which do trigger the notifyOfAddedMetric() function within the reporter
itself; it's just these custom ones that never reach it when they are
registered.

*Do you only see JobManager metrics, or is there somewhere also something
> about the TaskManager?*


It looks like there are metrics coming from both the JobManager and
TaskManagers from the following examples that were coming out:

localhost.jobmanager.numRegisteredTaskManagers
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
localhost.jobmanager.Status.JVM.Memory.Direct.Count

I do agree that a factory implementation with a static reporter would
likely be a better approach, so I may explore that a bit more, as well as
make some changes to the existing, albeit ghetto, implementation for
handling the dynamic metrics. I did see several references to a
MetricRegistry class; however, I wasn't sure whether that was the most
appropriate place to add this type of functionality or if it was needed at
all.

Thanks much,

Rion



On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler  wrote:

> Was there anything in the logs (ideally on debug)?
> Have you debugged the execution and followed the counter() calls all the
> way to the reporter?
> Do you only see JobManager metrics, or is there somewhere also something
> about the TaskManager?
>
> I can see several issues with your code, but none that would fully explain
> the issue:
>
> a) your reporter is not thread-safe
> b) you only differentiate metrics by name, which will lead to quite a few
> collisions.
>
> Be also aware that there will be 2 reporter instances; one for the JM and
> one for the TM.
> To remedy this, I would recommend creating a factory that returns a static
> reporter instance instead; overall this tends to be cleaner.
>
> Alternatively, when using the testing harnesses IIRC you can also set
> a custom MetricGroup implementation.
>
> On 3/16/2021 4:13 AM, Rion Williams wrote:
>
> Hi all,
>
> Recently, I was working on adding some custom metrics to a Flink job that
> required the use of dynamic labels (i.e. capturing various counters that
> were "slicable" by things like tenant / source, etc.).
>
> I ended up handling it in a very naive fashion that would just keep a
> dictionary of metrics that had already been registered and update them
> accordingly which looked something like this:
>
> class MyCustomProcessFunction : ProcessFunction() {
>     private lateinit var metrics: CustomMetricsRegistry
>
>     override fun open(parameters: Configuration) {
>         metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>     }
>
>     override fun processElement(event: Event, context: Context, collector: Collector) {
>         // Insert calls like metrics.inc("tenant-name", 4) here
>     }
> }
> class CustomM

Re: Checkpoint fail due to timeout

2021-03-16 Thread 陳昌倬
On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> Hi Roman,
> I took thread dump:
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -  blocked on java.lang.Object@5366a0e2
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> 
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 
> WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> 
> Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple 
> threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on 
> different Objects.

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re:Re: Find many strange measurements in metrics database of influxdb

2021-03-16 Thread Tim yu
Hi Timo,

A measurement in InfluxDB is like a table in MySQL, so "from table1" is a very 
strange table name.
Maybe this is a bug.

Regards,
Tim


--





- Original Message -
From: "Timo Walther" 
To: user@flink.apache.org
Sent: Tue, 16 Mar 2021 13:30:41 +0100
Subject: Re: Find many strange measurements in metrics database of influxdb

Hi Tim,

"from table1" might be the operator that reads "table1" also known as 
the table scan operator. Could you share more of the metrics and their 
values? Most of them should be explained in

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics

Regards,
Timo


On 16.03.21 06:46, Tim yu wrote:
> Hi all,
> 
> I run many Flink jobs that contain SQL, and they report their metrics to 
> InfluxDB.
> I find many strange measurements in the metrics database of InfluxDB, e.g. 
> "from table1".
> Does SQL produce those measurements? What are the meanings of those 
> measurements?
> 
> --
> 
> tim yu
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 


Re: Time Temporal Join

2021-03-16 Thread Timo Walther

Hi Satyam,

first of all, your initial join query can also work; you just need to 
make sure that no time attribute is in the SELECT clause. As the 
exception indicates, you need to cast all time attributes to TIMESTAMP. 
The reason for this is a major design issue, also explained here, whereby 
a time attribute must not be in the output of a regular join:


https://stackoverflow.com/a/64500296/806430
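For illustration, a rough (untested) sketch of that workaround, based on the 
F and D schemas quoted below, could look like this; the cast happens in a 
subquery so that no time attribute reaches the regular join:

SELECT sub.C0, D.C1, sub.R
FROM (SELECT C0, C1, CAST(R AS TIMESTAMP(3)) AS R FROM F) AS sub
JOIN D ON sub.C1 = D.C1;

Note that after the cast, R is a plain TIMESTAMP and no longer a time 
attribute, so event-time windows such as TUMBLE cannot be applied to it 
anymore.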

However, since you would like to perform the join "time-based", either an 
interval join or a temporal join might solve your use case.


In your case I guess the watermark strategy of D is the problem. Are you 
sure the result is:


> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0

and not:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D watermark=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0

Or maybe the watermark is even dropped. Could you try to use a watermark 
strategy with


`R` - INTERVAL '0.001' SECONDS
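As a sketch (connector options elided, schema otherwise as in your mail 
below), the DDL for D would then roughly be:

CREATE TABLE D (
  C0 BIGINT,
  C1 STRING NOT NULL,
  R TIMESTAMP(3),
  WATERMARK FOR R AS R - INTERVAL '0.001' SECOND,
  PRIMARY KEY (C1) NOT ENFORCED
) WITH (...);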

I hope this helps.

Regards,
Timo



On 16.03.21 04:37, Satyam Shekhar wrote:

Hello folks,

I would love to hear back your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar > wrote:


Hello folks,

I am looking to enrich rows from an unbounded streaming table by
joining it with a bounded static table while preserving rowtime for
the streaming table. For example, let's consider two tables F
and D, where F is unbounded and D is bounded. The schema for the two
tables is the following -

F:
  |-- C0: BIGINT
  |-- C1: STRING
  |-- R: TIMESTAMP(3) **rowtime**
  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
  |-- C0: BIGINT
  |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
     from F join D ON F.C1 = D.C1
     group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join.
As a workaround you can cast the time attributes of input tables to
TIMESTAMP before."

My understanding reading the docs is that Time Temporal Join is
meant to solve this problem. So I model table D as the following -

D:
  |-- C0: BIGINT
  |-- C1: STRING NOT NULL
  |-- R: TIMESTAMP(3)
  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
  |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

With column D.R always set to 0 and modify the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
     from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
     group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the
following data in D initially -
Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -
Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect that two rows in F will join with matching rows (on C1) in
D and produce some output. But I do not see anything in the output.

So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the
result?

Thanks,
Satyam





Re: Find many strange measurements in metrics database of influxdb

2021-03-16 Thread Timo Walther

Hi Tim,

"from table1" might be the operator that reads "table1" also known as 
the table scan operator. Could you share more of the metrics and their 
values? Most of them should be explained in


https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics

Regards,
Timo


On 16.03.21 06:46, Tim yu wrote:

Hi all,

I run many Flink jobs that contain SQL, and they report their metrics to 
InfluxDB.
I find many strange measurements in the metrics database of InfluxDB, e.g. 
"from table1".
Does SQL produce those measurements? What are the meanings of those 
measurements?


--

tim yu














Re: Editing job graph at runtime

2021-03-16 Thread Timo Walther

Hi Jessy,

to be precise, the JobGraph is not used at runtime. It is translated 
into an ExecutionGraph.


Nevertheless, such patterns are possible but require a bit of manual 
implementation.


Option 1) You stop the job with a savepoint and restart the application 
with slightly different parameters. If the pipeline has not changed 
much, the old state can be remapped to the slightly modified job graph. 
This is the easiest solution but with the downside of maybe a couple of 
seconds downtime.


Option 2) You introduce a dedicated control stream (i.e. by using the 
connect() DataStream API [1]). Either you implement a custom sink in the 
main stream of the CoProcessFunction, or you enrich every record in the 
main stream with sink parameters that are read by your custom sink 
implementation.
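
As a rough sketch of the second variant (the type names ControlEvent, Event 
and EnrichedEvent are made up for illustration, and the routing state in this 
sketch is not checkpointed):

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

data class Event(val key: String, val payload: String)
data class ControlEvent(val sinks: Set<String>)
data class EnrichedEvent(val event: Event, val sinks: Set<String>)

class DynamicRoutingFunction : CoProcessFunction<Event, ControlEvent, EnrichedEvent>() {
    // Latest routing parameters received on the control stream
    // (kept in a plain field, i.e. not fault tolerant in this sketch)
    private var activeSinks: Set<String> = emptySet()

    override fun processElement1(event: Event, ctx: Context, out: Collector<EnrichedEvent>) {
        // Enrich every record with the current sink parameters;
        // a custom sink implementation can read them later
        out.collect(EnrichedEvent(event, activeSinks))
    }

    override fun processElement2(control: ControlEvent, ctx: Context, out: Collector<EnrichedEvent>) {
        activeSinks = control.sinks
    }
}

// Wiring: mainStream.connect(controlStream).process(DynamicRoutingFunction())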


I hope this helps.

Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect


On 16.03.21 12:37, Jessy Ping wrote:

Hi Team,
Is it possible to edit the job graph at runtime? Suppose I want to 
add a new sink to the Flink application at runtime that depends upon 
specific parameters in the incoming events. Can I edit the job graph 
of a running Flink application?


Thanks
Jessy




Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Team,
Is it possible to edit the job graph at runtime? Suppose I want to add
a new sink to the Flink application at runtime that depends upon
specific parameters in the incoming events. Can I edit the job graph of a
running Flink application?

Thanks
Jessy


Re: Handle late message with flink SQL

2021-03-16 Thread Timo Walther

Hi,

your explanation makes sense, but I'm wondering what the implementation 
would look like. This would mean bigger changes in a Flink fork, right?


Late data handling in SQL is a frequently asked question. Currently, we 
don't have a good way of supporting it. Usually, we recommend to use 
DataStream API before Table API for branching late events into a 
separate processing pipeline.
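
For reference, a minimal DataStream sketch of that pattern (the Event type 
and its eventTime field are made up for illustration; timestamps and 
watermarks are assumed to be assigned upstream):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag

data class Event(val key: String, val eventTime: Long)

// The anonymous subclass keeps the generic type information at runtime
val lateTag = object : OutputTag<Event>("late-events") {}

class LateEventSplitter : ProcessFunction<Event, Event>() {
    override fun processElement(event: Event, ctx: Context, out: Collector<Event>) {
        if (event.eventTime < ctx.timerService().currentWatermark()) {
            ctx.output(lateTag, event)   // late records branch off here
        } else {
            out.collect(event)           // on-time records continue into the Table API part
        }
    }
}

// Usage:
// val onTime = events.process(LateEventSplitter())
// val late = onTime.getSideOutput(lateTag)
// tableEnv.createTemporaryView("events", onTime)   // SQL continues on the on-time stream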


Another idea (not well thought through) could be a temporal join at a 
later stage with a LookupTableSource that contains the late events to 
perform the "connect"?


Regards,
Timo


On 15.03.21 09:58, Yi Tang wrote:

We can get a stream from a DataStream api by SideOutput. But it's hard to do
the same thing with Flink SQL.

I have an idea about how to get the late records while using Flink SQL.

Assuming we have a source table for the late records, then we can query late
records on it. Obviously, it's not a real dynamic source table, it can be a
virtual source.

After optimizing, we can get a graph with some window aggregate nodes, which
can produce late records, and another graph for handling late records with
a virtual source node.

[scan] --> [group] --> [sink]

[virtual scan] --> [sink]

Then we can just "connect" these window nodes into the virtual source node.

The "connect" can be done by the following:

1. A side output node from each window node;
2. A mapper node may be needed to encode the record from the window node to
match the row type of the virtual source;

[scan] --> [group] --> [sink]
  \
--> [side output] --> [mapper] --> [sink]


Does it make sense? Or is there another way in progress for the similar
purpose?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





[HEADS UP] Flink Community Survey

2021-03-16 Thread Ana Vasiliuk
Hi everyone!

We're constantly working to improve the Flink community experience and need
your help! Please take 2 min to share your thoughts with us via a short
survey [1].

Your feedback will help us understand where we stand on communication
between community members, what activities you prefer or would like to see
more of, and in general, identify opportunities for improvement. The survey
only takes 2 minutes (possibly less) and is anonymous.

Thanks a lot!

Ana

[1] https://form.typeform.com/to/M5JyHILk


Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Chesnay Schepler

Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter() calls all the 
way to the reporter?
Do you only see JobManager metrics, or is there somewhere also something 
about the TaskManager?


I can see several issues with your code, but none that would fully 
explain the issue:


a) your reporter is not thread-safe
b) you only differentiate metrics by name, which will lead to quite a 
few collisions.


Be also aware that there will be 2 reporter instances; one for the JM 
and one for the TM.
To remedy this, I would recommend creating a factory that returns a 
static reporter instance instead; overall this tends to be cleaner.
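
A hedged sketch of what that could look like (class names are made up; the 
factory would be wired in via metrics.reporter.<name>.factory.class):

import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import org.apache.flink.metrics.Metric
import org.apache.flink.metrics.MetricConfig
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.metrics.reporter.MetricReporter
import org.apache.flink.metrics.reporter.MetricReporterFactory

// One reporter instance shared by JM and TM, so tests assert against a single map
class StaticMetricsReporter : MetricReporter {
    override fun open(config: MetricConfig) {}
    override fun close() {}

    override fun notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup) {
        // Use the full identifier instead of just the name to avoid collisions
        registered[group.getMetricIdentifier(name)] = metric
    }

    override fun notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup) {
        registered.remove(group.getMetricIdentifier(name))
    }

    companion object {
        // Thread-safe, since metrics are registered from several threads
        val registered = ConcurrentHashMap<String, Metric>()
    }
}

class StaticMetricsReporterFactory : MetricReporterFactory {
    // Always hand out the same instance
    override fun createMetricReporter(properties: Properties): MetricReporter = reporter

    companion object {
        private val reporter = StaticMetricsReporter()
    }
}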


Alternatively, when using the testing harnesses IIRC you can also set 
a custom MetricGroup implementation.


On 3/16/2021 4:13 AM, Rion Williams wrote:

Hi all,

Recently, I was working on adding some custom metrics to a Flink job 
that required the use of dynamic labels (i.e. capturing various 
counters that were "slicable" by things like tenant / source, etc.).


I ended up handling it in a very naive fashion that would just keep a 
dictionary of metrics that had already been registered and update them 
accordingly which looked something like this:

class MyCustomProcessFunction : ProcessFunction() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(event: Event, context: Context, collector: Collector) {
        // Insert calls like metrics.inc("tenant-name", 4) here
    }
}

class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Increments a given metric by key
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        // Store a key for the metric
        val key = "$metric-$tenant"
        // Store/register the metric
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .counter(metric)
        }

        // Update the metric by a given amount
        registeredMetrics[key]!!.inc(amount)
    }

    companion object {
        private var registeredMetrics: HashMap = hashMapOf()
    }
}
Basically registering and updating new metrics for tenants as they are 
encountered, which I've seen being emitted as expected via hitting the 
appropriately configured metrics endpoint (using a PrometheusReporter).


However, while I was trying to write a few unit tests for this, I 
seemed to encounter an issue. I was following a Stack Overflow post 
that was answered by @Chesnay Schepler  [0] 
that described the use of an in-memory/embedded Flink cluster and a 
custom reporter that would statically expose the underlying metrics.


So I took a shot at implementing something similar as follows:

*Flink Cluster Definition*
private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
        "MockCustomMetricsReporter." +
        ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
        MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flinkCluster = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)
*Custom Reporter*
class MockCustomMetricsReporter : MetricReporter {

    override fun open(metricConfig: MetricConfig) {}
    override fun close() {}

    override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Store the metrics that are being registered as we see them
        if (!registeredCustomMetrics.containsKey(name)) {
            registeredCustomMetrics[name] = metric
        }
    }

    override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Do nothing here
    }

    companion object {
        // Static reference to metrics as they are registered
        var registeredCustomMetrics = HashMap()
    }
}
*Example Test*
@Test
fun `Example Metrics Use Case`() {
    // Arrange
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()
    val events = listOf(
        eventWithUsers("tenant1", "us...@testing.com"),
        eventWithUsers("tenant2", "us...@testing.com"),
    )

    // Act
    stream
        .fromCollection(events)
        .process(MyCustomProcessFunction())

    // Assert
    stream.execute()
    assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
}
While this test will pass, *the problem is that the custom metrics 
defined dynamically (via the CustomMetricsRegistry implementation) do 
not appear within the registeredCustomMetrics collection*. In fact, 
there are 21 metrics that get registered but all of them appear to be 
classic out-of-the-box metrics such as CPU usage, number of t

Re: Is it possible to mount node local disk for task managers in a k8s application cluster?

2021-03-16 Thread Chen-Che Huang
Hi Yang,

Thanks for the reply. Looking forward to 1.13 :)

Best wishes,
Chen-Che

On 2021/03/16 07:41:18, Yang Wang  wrote: 
>  I think the pod template[1] is what you are looking for. It will be
> released in 1.13.
> 
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template
> 
> Best,
> Yang
> 
> Chen-Che Huang  于2021年3月16日周二 下午1:26写道:
> 
> > Hi,
> >
> > We use the per-job deployment mode to deploy our Flink services on
> > Kubernetes. We're considering to move from the per-job mode to the
> > application mode in view of the advantages of the application mode.
> > However, it seems that `bin/flink run-application --target
> > kubernetes-application` does not support to configure the task managers to
> > mount volumes from the node local disk (volumes - hostPath) according to
> > https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#kubernetes.
> > If it is feasible, please let me know which option should be used. If not,
> > could I create an issue for this feature. Thanks.
> >
> > Best regards,
> > Chen-Che Huang
> >
> 


Re: Is it possible to mount node local disk for task managers in a k8s application cluster?

2021-03-16 Thread Yang Wang
 I think the pod template[1] is what you are looking for. It will be
released in 1.13.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template

Best,
Yang

Chen-Che Huang  于2021年3月16日周二 下午1:26写道:

> Hi,
>
> We use the per-job deployment mode to deploy our Flink services on
> Kubernetes. We're considering to move from the per-job mode to the
> application mode in view of the advantages of the application mode.
> However, it seems that `bin/flink run-application --target
> kubernetes-application` does not support to configure the task managers to
> mount volumes from the node local disk (volumes - hostPath) according to
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#kubernetes.
> If it is feasible, please let me know which option should be used. If not,
> could I create an issue for this feature. Thanks.
>
> Best regards,
> Chen-Che Huang
>


Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-16 Thread Alexey Trenikhun
Also restore from same savepoint without change in parallelism works fine.


From: Alexey Trenikhun 
Sent: Monday, March 15, 2021 9:51 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

No, I believe original exception was from 1.12.1 to 1.12.1

Thanks,
Alexey


From: Yun Tang 
Sent: Monday, March 15, 2021 8:07:07 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Can you scale the job at the same version from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Savepoint was taken with 1.12.1, I've tried to scale up using same version and 
1.12.2


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/