Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-09 Thread Yun Gao
Hi Chirag,

Logically, an Integer key should not have this issue. Sorry, but from the current
description I have not found other issues. Could you also share the code in the
main method that adds the KeyedProcessFunction to the job? Many thanks!

Best,
Yun


--
From:Chirag Dewan 
Send Time:2021 Jun. 9 (Wed.) 15:15
To:User ; Yun Gao 
Subject:Re: Multiple Exceptions during Load Test in State Access APIs with 
RocksDB

Thanks for the reply Yun.

The key is an Integer type. Do you think there can be hash collisions for 
Integers?

It somehow works on a single TM now. No errors for 1m records.
But as soon as we move to 2 TMs, we get all sorts of errors: 'Position Out of
Bound', key not in KeyGroup, etc.

This also causes an NPE in the user-defined code:

if (valueState != null)
    valueState.value(); // the null check on the state handle passed, yet reading the value still threw an NPE

Thanks,
Chirag

 On Tuesday, 8 June, 2021, 08:29:04 pm IST, Yun Gao  
wrote: 


Hi Chirag,

As far as I know, if you are running a single job, having all the pods share the
same state.checkpoints.dir configuration should be fine, and it is not necessary
to configure the RocksDB local dir since Flink will choose a default dir.

Regarding the latest exception, I think you might first check the key type used:
the key type should have a stable hashCode method.
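
To illustrate the point about stable hash codes, here is a minimal sketch (hypothetical, not from this thread) of a composite key type that is safe to use with keyBy, because equals/hashCode are deterministic and derived only from immutable fields:

```java
import java.util.Objects;

// Hypothetical composite key: hashCode/equals depend only on stable, immutable fields,
// so the same logical key always maps to the same key group across TMs and restarts.
public final class PlayerKey {
    private final int playerId;
    private final String region;

    public PlayerKey(int playerId, String region) {
        this.playerId = playerId;
        this.region = region;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlayerKey)) return false;
        PlayerKey that = (PlayerKey) o;
        return playerId == that.playerId && Objects.equals(region, that.region);
    }

    @Override
    public int hashCode() {
        // Deterministic: never derived from Object.hashCode(), arrays, or mutable state.
        return Objects.hash(playerId, region);
    }
}
```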

Best,
Yun




 --Original Mail --
Sender:Chirag Dewan 
Send Date:Tue Jun 8 18:06:07 2021
Recipients:User , Yun Gao 
Subject:Re: Multiple Exceptions during Load Test in State Access APIs with 
RocksDB
Hi,

Although this looks like a problem to me, I still can't conclude it.

I tried reducing my TM replicas from 2 to 1, with 4 slots and 4 cores each. I
was hoping that with a single TM there would be no file write conflicts. But that
doesn't seem to be the case, as I still get:


Caused by: org.apache.flink.util.SerializedThrowable: 
java.lang.IllegalArgumentException: Key group 2 is not in 
KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
I have checked that there's no concurrent access on the ValueState.

Any more leads?

Thanks,
Chirag


 On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan 
 wrote: 


Hi,

I think I got my issue. Would help if someone can confirm it :)

I am using an NFS filesystem for storing my checkpoints, and my Flink cluster is
running on Kubernetes with 2 TMs and 2 JMs.

All my pods share the NFS PVC for state.checkpoints.dir, and we also missed
setting the RocksDB local dir.

Does this lead to state corruption?

Thanks,
Chirag



 On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan 
 wrote: 


Thanks for the reply Yun. I strangely don't see any nulls. In fact this
exception comes on the first few records, and then the job starts processing
normally.

Also, I don't see any reason for concurrent access to the state in my code.
Could more CPU cores than task slots on the Task Manager be the reason for it?

 On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao  
wrote: 


Hi Chirag,

If you are able to reproduce the exception, could you first add some logs to print
the values of valueState, valueState.value(), inEvent and inEvent.getPrizeDelta()?
I think either object being null would cause the NullPointerException here.

For the second exception, I found a similar issue [1], caused by concurrent
access to the value state. Do we have a similar situation here?

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-18587



 --Original Mail --
Sender:Chirag Dewan 
Send Date:Sat Jun 5 20:29:37 2021
Recipients:User 
Subject:Multiple Exceptions during Load Test in State Access APIs with RocksDB
Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend.
I have 2 Task Managers with 2 task slots and 4 cores each.
Below is our setup:
Kafka (topic with 2 partitions) ---> FlinkKafkaConsumer (parallelism 2) --->
KeyedProcessFunction (parallelism 4) ---> FlinkKafkaProducer (parallelism 1)
---> Kafka topic
public class Aggregator_KeyedExpression extends KeyedProcessFunction<Integer, GameZoneInput, GameZoneOutput> {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List<GameZoneOutput> outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }
}
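
For reference, a minimal sketch of the main-method wiring Yun asks for in the follow-up; the class names come from this thread, while the topic names, schemas, and Kafka properties are hypothetical placeholders:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class GameZoneJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        // GameZoneInputSchema / GameZoneOutputSchema are hypothetical (de)serialization schemas.
        DataStream<GameZoneInput> input = env
                .addSource(new FlinkKafkaConsumer<>("gamezone-in", new GameZoneInputSchema(), kafkaProps))
                .setParallelism(2);

        input
                .keyBy(GameZoneInput::getPlayerId)          // Integer key, as discussed in this thread
                .process(new Aggregator_KeyedExpression())  // the KeyedProcessFunction shown above
                .setParallelism(4)
                .addSink(new FlinkKafkaProducer<>("gamezone-out", new GameZoneOutputSchema(), kafkaProps))
                .setParallelism(1);

        env.execute("GameZone aggregation");
    }
}
```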

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-09 Thread Yun Gao
Many thanks Kezhu for the catch; it also looks to me like the same issue as
FLINK-21028.


--
From:Piotr Nowojski 
Send Time:2021 Jun. 9 (Wed.) 22:12
To:Kezhu Wang 
Cc:Thomas Wang ; Yun Gao ; user 

Subject:Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Yes good catch Kezhu, IllegalStateException sounds very much like FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't been 
released yet)?

Piotrek
On Tue, Jun 8, 2021 at 17:18 Kezhu Wang wrote:

Could it be same as FLINK-21028[1] (titled as “Streaming application didn’t 
stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0) ?


[1]: https://issues.apache.org/jira/browse/FLINK-21028


Best,
Kezhu Wang 
On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote: 
Hi Thomas,

I tried but have not reproduced the exception yet. I have filed
an issue for the exception first [1].



[1] https://issues.apache.org/jira/browse/FLINK-22928


 --Original Mail --
Sender:Thomas Wang 
Send Date:Tue Jun 8 07:45:52 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Re: Failed to cancel a job using the STOP rest API
This is actually a very simple job that reads from Kafka and writes to S3 using
the StreamingFileSink w/ Parquet format. I'm using only Flink's APIs and nothing
custom.

Thomas
On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:
Hi Thomas,

Many thanks for reporting the exceptions; it does seem to not work as
expected to me...
Could you also show us the DAG of the job? And do some operators in the
source task
use multiple threads to emit records?

Best,
Yun


 --Original Mail --
Sender:Thomas Wang 
Send Date:Sun Jun 6 04:02:20 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Failed to cancel a job using the STOP rest API
One thing I noticed is that if I set drain = true, the job could be stopped 
correctly. Maybe that's because I'm using a Parquet file sink which is a 
bulk-encoded format and only writes to disk during checkpoints?
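
For reference, a minimal sketch of invoking the STOP endpoint discussed in this thread from plain Java, with drain enabled; the host, port, job id, and savepoint path are placeholders, and the body fields follow the stop-with-savepoint REST request as documented for recent Flink versions:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopJobExample {

    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000"; // placeholder job id
        String body = "{\"targetDirectory\": \"s3://my-bucket/savepoints\", \"drain\": true}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // The response carries a trigger id that can be polled under
        // /jobs/<jobId>/savepoints/<triggerId> to track the stop-with-savepoint operation.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```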

Thomas
On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:
Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not 
quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
at 
org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskAct

FileSource may cause akka.pattern.AskTimeoutException, and akka.ask.timeout not workable

2021-06-09 Thread 陳樺威
Hello all,

Our team encounters *akka.pattern.AskTimeoutException* when starting the
jobmanager. Based on the error message, we tried setting *akka.ask.timeout*
and *web.timeout* to 360s, but neither of them works.

We guess the issue may be caused by *FileSource.forRecordFileFormat*. The
application loads files in batch mode to rebuild our historical data.
The job runs normally on a small batch, but it breaks when run
over lots of files. (around 3 files distributed in 1500 folders)

The Flink application runs on Kubernetes in application mode and the files are
stored in Google Cloud Storage.

Our questions are:
1. How do we increase akka.ask.timeout correctly in our case?
2. Is it caused by FileSource? If yes, could you provide some suggestions to
prevent it?


Following is our settings.
```
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: kubernetes.container.image, */:**.*.**
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: fs.hdfs.hadoopconfig, /opt/flink/conf
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 4
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: kubernetes.rest-service.exposed.type, ClusterIP
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: high-availability.jobmanager.port, 6123
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: akka.ask.timeout, 360s
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.backend.rocksdb.memory.write-buffer-ratio, 0.7
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: metrics.reporter.prom.class,
org.apache.flink.metrics.prometheus.PrometheusReporter
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.storage.fs.memory-threshold, 1048576
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.checkpointing.unaligned, true
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: web.timeout, 100
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.target, kubernetes-application
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: restart-strategy.fixed-delay.attempts, 2147483647
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.memory.process.size, 8g
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.rpc.port, 6122
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: akka.framesize, 104857600b
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: containerized.master.env.HADOOP_CLASSPATH,
/opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.attached, true
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: internal.cluster.execution-mode, NORMAL
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: high-availability,
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
2021-06-10 03:44:14,320 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property:
execution.checkpointing.externalized-checkpoint-retention,
DELETE_ON_CANCELLATION
2021-06-10 03:44:14,320 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property

How to gracefully handle job recovery failures

2021-06-09 Thread Li Peng
Hey folks, we have a cluster with HA mode enabled, and recently, after doing
a zookeeper restart, our Flink cluster (Flink v. 1.11.3, Scala v. 2.12)
crashed and was stuck in a crash loop, with the following error:

2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
id .
at
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover
job with job id .
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
at
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 common frames omitted
Caused by: org.apache.flink.util.FlinkException: Could not retrieve
submitted JobGraph from state handle under
/. This indicates that the retrieved state
handle is broken. Try cleaning the state handle store.
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
... 7 common frames omitted
Caused by: java.io.FileNotFoundException: No such file or directory:
s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
at
org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
at
org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
... 8 common frames omitted

We have an idea of why the file might be gone and are addressing it, but my
question is: *how can I configure this in such a way that a missing job
file doesn't trap the cluster in a forever restart loop?* Is there some
setting to just treat this like a completely fresh deployment if the recovery
file is missing?

Thanks!
Li


confused about `TO_TIMESTAMP` document description

2021-06-09 Thread Tony Wei
Hi Expert,

this document [1] says `TO_TIMESTAMP` will use the session time zone to
convert a date-time string into a timestamp.
If I understand correctly, when I set the session time zone to `Asia/Shanghai`
and query `SELECT TO_TIMESTAMP('1970-01-01 08:00:00');`,
the result should be epoch timestamp `0` (i.e. '1970-01-01 08:00:00 UTC+8').

TO_TIMESTAMP(string1[, string2])

Converts date time string *string1* with format *string2* (by default:
'-MM-dd HH:mm:ss') under the session time zone (specified by
TableConfig) to a timestamp.

Only supported in blink planner.
However, I found the result is not the same as I expected. I tested it by
running the query below under the `Asia/Shanghai` time zone:

SELECT
> CAST(TO_TIMESTAMP(FROM_UNIXTIME(0)) AS BIGINT),

FROM_UNIXTIME(0),

TO_TIMESTAMP(FROM_UNIXTIME(0));


and I got a result like:

>   EXPR$0   EXPR$1                EXPR$2
>   28800    1970-01-01 08:00:00   1970-01-01T08:00


`FROM_UNIXTIME` did convert the epoch timestamp to a string based
on the session time zone, but `TO_TIMESTAMP` didn't.
Therefore, I got `28800` when I cast the timestamp to BIGINT. The result
is actually shifted by 8 hours.
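
For reproduction, here is a minimal sketch of setting the session time zone the documentation refers to via TableConfig in the Java Table API (one possible way of configuring it; the thread does not show how it was actually set):

```java
import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionTimeZoneCheck {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Session time zone (specified by TableConfig), which the docs say TO_TIMESTAMP should honor.
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // The question in this thread is whether TO_TIMESTAMP actually uses this setting.
        tEnv.executeSql(
                "SELECT CAST(TO_TIMESTAMP(FROM_UNIXTIME(0)) AS BIGINT), "
                        + "FROM_UNIXTIME(0), "
                        + "TO_TIMESTAMP(FROM_UNIXTIME(0))")
            .print();
    }
}
```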

I found this code snippet [2], which might be related to the `TO_TIMESTAMP` UDF,
and it seems like it doesn't use any time zone configuration, so maybe the
document is wrong.

Please correct me if I misunderstood something. Thank you.

best regards,

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#temporal-functions
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java#L322:L377


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Joseph Lorenzini




Hi Arvid,
 
I may have figured out the problem. 
 
When using a tumbling window on a keyed stream with event time, does time only move forward when there’s an event with a newer timestamp? Said another way: if I have a 5-second tumbling window, the job would need to consume
 at least two events before a computation would occur: the first event has a timestamp that fits within the 5-second window, and the second event has a timestamp that exceeds the max timestamp of the previous window.

 
Does that sound right?
 
Thanks,
Joe 
 

From: Arvid Heise 
Date: Wednesday, June 9, 2021 at 8:34 AM
To: Joseph Lorenzini 
Cc: "user@flink.apache.org" 
Subject: Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record


 



Hi Joe,


 


could you please check (in web UI) if the watermark is advancing past the join? The window operator would not trigger if it doesn't advance.


On which Flink version are you running?


 


On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini  wrote:




Hi all,
 
I have observed behavior when joining two keyed streams together where events are never emitted.  The source of each stream is a different kafka topic. I am curious to know if this
 is expected and if there’s a way to work around it.
 
I am using a tumbling event window. All records across the two kafka topics occurred within the same 5 second window of time. Each kafka topic has a single partition.  For each
 kafka topic, I configured the flink kafka consumer like so:
 
   consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withIdleness(Duration.ofSeconds(10))
    );
 
The tumbling window has a duration of 60 seconds. Now it happens to be the case that there is only a single event when joining on a key.  If I use a tumbling processing-time window then events
 are emitted as expected. If I actually ensure there are multiple events for a key then the events are also emitted. However, if it’s a single event per key in a tumbling event-time window then no events are emitted.
 
Is this expected, and if so, how do you handle this use case?
 
Thanks,
Joe











Re: DataStream API in Batch Execution mode

2021-06-09 Thread Marco Villalobos
That worked.  Thank you very much.

On Mon, Jun 7, 2021 at 9:23 PM Guowei Ma  wrote:

> Hi, Macro
>
> I think you could try the `FileSource`; you can find an example in
> [1]. The `FileSource` will scan the files under the given
> directory recursively.
> Would you mind opening an issue about the missing documentation?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
> Best,
> Guowei
>
>
> On Tue, Jun 8, 2021 at 5:59 AM Marco Villalobos 
> wrote:
>
>> How do I use a hierarchical directory structure as a file source in S3
>> when using the DataStream API in Batch Execution mode?
>>
>> I have been trying to find out if the API supports that,
>> because currently our data is organized by years, halves, quarters, months,
>> and but before I launch the job, I flatten the file structure just to
>> process the right set of files.
>>
>>
>>
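
For reference, a minimal sketch of the FileSource approach Guowei points to, in batch execution mode; the bucket path is a placeholder and the text-line reader class name varies slightly across Flink versions (TextLineFormat in 1.12/1.13):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3BatchFileJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // FileSource enumerates files under the given directory recursively, so the nested
        // years/halves/quarters/months layout does not need to be flattened beforehand.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("s3://my-bucket/data/"))
                .build();

        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-file-source");

        lines.print();
        env.execute("Batch read from S3 directory tree");
    }
}
```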


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Arvid Heise
Hi Joe,

Yes, that is correct! Only when a new record arrives and we know its
timestamp can we deduce the watermark and advance it. The window operator
would then close the old window and open a new one.

Sorry that I didn't see that immediately. It's sometimes hard to think in
terms of individual records when you are used to thinking in millions.
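
To make the per-record behaviour concrete, a small standalone sketch of what the bounded-out-of-orderness strategy effectively computes (timestamps are illustrative):

```java
// Simplified illustration: the watermark trails the highest seen event timestamp by the
// configured out-of-orderness, so a 60s event-time window [0, 60s) only fires once some
// record with a timestamp of at least 70s has been observed (with a 10s bound).
public class WatermarkIllustration {

    public static void main(String[] args) {
        long outOfOrdernessMs = 10_000;   // forBoundedOutOfOrderness(Duration.ofSeconds(10))
        long maxSeenTimestamp = Long.MIN_VALUE;

        long[] eventTimestamps = {1_000, 1_004, 59_900, 70_000};   // illustrative event times (ms)
        for (long ts : eventTimestamps) {
            maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
            long watermark = maxSeenTimestamp - outOfOrdernessMs - 1;
            System.out.printf("event ts=%d -> watermark=%d%n", ts, watermark);
        }
        // Only the last event raises the watermark to 59_999, the max timestamp of the
        // first 60s window, which is what allows that window to fire.
    }
}
```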

On Wed, Jun 9, 2021 at 8:49 PM Joseph Lorenzini 
wrote:

> Hi Arvid,
>
>
>
> I may have figured out the problem.
>
>
>
> When using a tumbling window on a keyed stream and event time is being
> used, does time only move forward when there’s an event with a newer
> timestamp? Said another way: if I have a 5 second tumbling window, the job
> would need to consume at least two events before a computation would occur:
> the first event has a timestamp that fits within the 5 second window, the
> second event has timestamp that exceeds the max timestamp of the previous
> window.
>
>
>
> Does that sound right?
>
>
>
> Thanks,
>
> Joe
>
>
>
> *From: *Arvid Heise 
> *Date: *Wednesday, June 9, 2021 at 8:34 AM
> *To: *Joseph Lorenzini 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Records Are Never Emitted in a Tumbling Event Window When
> Each Key Only Has One Record
>
>
>
> Hi Joe,
>
>
>
> could you please check (in web UI) if the watermark is advancing past the
> join? The window operator would not trigger if it doesn't advance.
>
> On which Flink version are you running?
>
>
>
> On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini 
> wrote:
>
> Hi all,
>
>
>
> I have observed behavior joining two keyed streams together, where events
> are never emitted.  The source of each stream is a different kafka topic. I
> am curious to know if this expected and if there’s a way to work around it.
>
>
>
> I am using a tumbling event window. All records across the two kafka
> topics occurred within the same 5 second window of time. Each kafka topic
> has a single partition.  For each kafka topic, I configured the flink kafka
> consumer like so:
>
>
>
>consumer.assignTimestampsAndWatermarks(
>
> WatermarkStrategy
>
>
> .forBoundedOutOfOrderness(Duration.ofSeconds(10))
>
> .withIdleness(Duration.ofSeconds(10))
>
> );
>
>
>
> The tumbling window has a duration of 60 seconds. Now it happens to be the
> case that there is only a single event when joining on a key.  If I use
> Tumbling Process window then events are emitted as expected. If I actually
> ensure there are multiple events for a key then the events are also
> emitted. However, if it’s a single event per key in a tumbling event window
> then no events are emitted.
>
>
>
> Is this expected and if it how do you handle this use case?
>
>
>
> Thanks,
>
> Joe
>
>


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Joseph Lorenzini




Hi Arvid,
 
I am on 1.11.2. 
 
The flink job has four operators:
 

Source from kafka topic one: sent 14 records
Source from kafka topic two: sent 6 records
Map: received 15 records / sent 14 records
Map: received 6 records / sent 6 records
Tumbling Window to Filesink: received 20 records / sent 0 records
 
The watermark is the same for the map operators and the tumbling window, which is to say that between the map and tumbling window the watermark did not advance.

 
Any idea why that might be happening? I did notice that the timestamps for all kafka records are within a fraction of a second of one another. For example:

 

2021-06-09T08:57:00.993-05:00
2021-06-09T08:57:00.997-05:00
 
I also noted that some kafka records in topic A have the exact same timestamp as records in topic B.

 
Could timestamps not being far enough apart (e.g. a millisecond or more), or two records from the two sources having the exact same time, cause the watermarks to not advance?
 
 
Joe 
 

From: Arvid Heise 
Date: Wednesday, June 9, 2021 at 8:34 AM
To: Joseph Lorenzini 
Cc: "user@flink.apache.org" 
Subject: Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record


 



Hi Joe,


 


could you please check (in web UI) if the watermark is advancing past the join? The window operator would not trigger if it doesn't advance.


On which Flink version are you running?


 


On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini  wrote:




Hi all,
 
I have observed behavior joining two keyed streams together, where events are never emitted.  The source of each stream is a different kafka topic. I am curious to know if this
 expected and if there’s a way to work around it. 
 
I am using a tumbling event window. All records across the two kafka topics occurred within the same 5 second window of time. Each kafka topic has a single partition.  For each
 kafka topic, I configured the flink kafka consumer like so:
 
   consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withIdleness(Duration.ofSeconds(10))
    );
 
The tumbling window has a duration of 60 seconds. Now it happens to be the case that there is only a single event when joining on a key.  If I use Tumbling Process window then events
 are emitted as expected. If I actually ensure there are multiple events for a key then the events are also emitted. However, if it’s a single event per key in a tumbling event window then no events are emitted.
 
Is this expected and if it how do you handle this use case?
 
Thanks,
Joe











Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-09 Thread Piotr Nowojski
Yes good catch Kezhu, IllegalStateException sounds very much like
FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't
been released yet)?

Piotrek

On Tue, Jun 8, 2021 at 17:18 Kezhu Wang wrote:

> Could it be same as FLINK-21028[1] (titled as “Streaming application
> didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0) ?
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>
>
> Best,
> Kezhu Wang
>
> On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
>
> Hi Thomas,
>
> I tried but have not reproduced the exception yet. I have filed
> an issue for the exception first [1].
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-22928
>
>
> --Original Mail --
> *Sender:*Thomas Wang 
> *Send Date:*Tue Jun 8 07:45:52 2021
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API
>
>> This is actually a very simple job that reads from Kafka and writes to S3
>> using the StreamingFileSink w/ Parquet format. I'm using only Flink's APIs
>> and nothing custom.
>>
>> Thomas
>>
>> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:
>>
>>> Hi Thomas,
>>>
>>> Many thanks for reporting the exceptions; it does seem to not work as
>>> expected to me...
>>> Could you also show us the DAG of the job? And do some operators in
>>> the source task
>>> use multiple threads to emit records?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Thomas Wang 
>>> *Send Date:*Sun Jun 6 04:02:20 2021
>>> *Recipients:*Yun Gao 
>>> *CC:*user 
>>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>>>
 One thing I noticed is that if I set drain = true, the job could be
 stopped correctly. Maybe that's because I'm using a Parquet file sink which
 is a bulk-encoded format and only writes to disk during checkpoints?

 Thomas

 On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:

> Hi Yun,
>
> Thanks for the tips. Yes, I do see some exceptions as copied below.
> I'm not quite sure what they mean though. Any hints?
>
> Thanks.
>
> Thomas
>
> ```
> 2021-06-05 10:02:51
> java.util.concurrent.ExecutionException:
> org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to
> next operator
> at java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture
> .java:1928)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .close(StreamOperatorWrapper.java:130)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .close(StreamOperatorWrapper.java:80)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .closeOperators(OperatorChain.java:302)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .afterInvoke(StreamTask.java:576)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:544)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to
> next operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
> at org.apache.flink.streaming.api.operators.CountingOutput
> .emitWatermark(CountingOutput.java:41)
> at org.apache.flink.streaming.runtime.operators.
> TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
> TimestampsAndWatermarksOperator.java:165)
> at org.apache.flink.api.common.eventtime.
> BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
> BoundedOutOfOrdernessWatermarks.java:69)
> at org.apache.flink.streaming.runtime.operators.
> TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
> .java:125)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .lambda$closeOperator$5(StreamOperatorWrapper.java:205)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
> .runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .closeOperator(StreamOperatorWrapper.java:203)
> at org.apache.flink.

NPE when restoring from savepoint in Flink 1.13.1 application

2021-06-09 Thread 陳昌倬
Hi,

We get a NullPointerException when trying to restore from a savepoint with the
same jar, a different jar, or different parallelism. We have worked around this
issue by changing the UIDs of almost all operators. We want to know if there is
any way to avoid this problem so that it will not cause service maintenance
problems. Thanks.


The following is the redacted stack trace we can provide for now:

2021-06-09 13:08:59,849 WARN  
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job ''.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
 [?:?]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
''.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at  ~[?:?]
at  ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:?]
at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:?]
at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 12 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
at 
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 ~[?:?]
... 1 more
Caused by: org.apache.flink.runtime.client.JobInitializationException: 
Could not start the JobMaster.
at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(Comp

PyFlink: Upload resource files to Flink cluster

2021-06-09 Thread Sumeet Malhotra
Hi,

I'm using UDTFs in PyFlink that depend upon a few resource files (JSON
schema files, actually). The path of these files can be passed into the UDTF,
but essentially this path needs to exist on the Task Manager node where the
task executes. What's the best way to upload these resource files? As of
now, my custom Flink image creates a fixed path with the required resource
files, but I'd like it to be runtime-configurable.

There are 2 APIs available to load files when submitting a PyFlink job...

*stream_execution_environment.add_python_file()* - Recommended for uploading
files (.py etc.) but doesn't let me configure the final path on the target
node. The files are added to PYTHONPATH, but it requires the UDTF function to
look up this file. I'd like to pass the file location into the UDTF
instead.

*stream_execution_environment.add_python_archive()* - Appears to be more
generic, in the sense that it allows a target directory to be specified.
The documentation doesn't say anything about the contents of the archive,
so I'm guessing it could be any type of file. Is this what is needed for my
use case?

Or is there any other recommended way to upload non-Python
dependencies/resources?

Thanks in advance,
Sumeet


ProcessFunctionTestHarnesses for testing Python functions

2021-06-09 Thread Bogdan Sulima
Hi all,

in Java/Scala I was using ProcessFunctionTestHarnesses to test my
ProcessFunctions with timers based on event timestamps.
Now I am switching to Python (my favourite language). Is there a similar
test harness to support testing Python ProcessFunctions?

Thanks for your answers in advance.

Regards
Bogdan.


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Arvid Heise
Hi Joe,

could you please check (in web UI) if the watermark is advancing past the
join? The window operator would not trigger if it doesn't advance.

On which Flink version are you running?

On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini 
wrote:

> Hi all,
>
>
>
> I have observed behavior joining two keyed streams together, where events
> are never emitted.  The source of each stream is a different kafka topic. I
> am curious to know if this expected and if there’s a way to work around it.
>
>
>
> I am using a tumbling event window. All records across the two kafka
> topics occurred within the same 5 second window of time. Each kafka topic
> has a single partition.  For each kafka topic, I configured the flink kafka
> consumer like so:
>
>
>
>consumer.assignTimestampsAndWatermarks(
>
> WatermarkStrategy
>
>
> .forBoundedOutOfOrderness(Duration.ofSeconds(10))
>
> .withIdleness(Duration.ofSeconds(10))
>
> );
>
>
>
> The tumbling window has a duration of 60 seconds. Now it happens to be the
> case that there is only a single event when joining on a key.  If I use
> Tumbling Process window then events are emitted as expected. If I actually
> ensure there are multiple events for a key then the events are also
> emitted. However, if it’s a single event per key in a tumbling event window
> then no events are emitted.
>
>
>
> Is this expected and if it how do you handle this use case?
>
>
>
> Thanks,
>
> Joe
>


Re: [table-walkthrough exception] Unable to create a source for reading table...

2021-06-09 Thread Arvid Heise
Hi Lingfeng,

could you try adding

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

to your pom?

On Wed, Jun 9, 2021 at 5:04 AM Lingfeng Pu  wrote:

> Hi,
>
> I'm following the tutorial to run the "flink-playground/table-walkthrough"
> project on IDEA. However, I got *the exception as follows:*
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.transactions'.
>
> *The key localhost environment info shows below:*
> 1. OS: Fedora 34; 2. Flink version: 1.13.1;
> 3. Java version: 1.8; 4. Maven version: 3.6.3;
> 5. Docker version: 20.10.7 (API version: 1.41).
>
> *The entire error report shows below:*
> /usr/java/jdk1.8.0_291-amd64/bin/java
> -javaagent:/var/lib/snapd/snap/intellij-idea-community/302/lib/idea_rt.jar=46805:/var/lib/snapd/snap/intellij-idea-community/302/bin
> -Dfile.encoding=UTF-8 -classpath
> /usr/java/jdk1.8.0_291-amd64/jre/lib/charsets.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/deploy.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/cldrdata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/dnsns.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jaccess.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jfxrt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/localedata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/nashorn.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunec.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunjce_provider.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunpkcs11.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/zipfs.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/javaws.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jce.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfr.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfxswt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jsse.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/management-agent.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/plugin.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/resources.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/rt.jar:/home/AkatsukiG5/IdeaProjects/flink-playgrounds-master/table-walkthrough/target/classes:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java/1.12.1/flink-table-api-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-common/1.12.1/flink-table-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-files/1.12.1/flink-connector-files-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-base/1.12.1/flink-connector-base-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-asm-7/7.1-12.0/flink-shaded-asm-7-7.1-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/slf4j/slf4j-api/1.7.15/slf4j-api-1.7.15.jar:/home/AkatsukiG5/Documents/mavenRepo/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/force-shading/1.12.1/force-shading-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java-bridge_2.11/1.12.1/flink-table-api-java-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-java/1.12.1/flink-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-java_2.11/1.12.1/flink-streaming-java_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-file-sink-common/1.12.1/flink-file-sink-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-guava/18.0-12.0/flink-shaded-guava-18.0-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-scala_2.11/1.12.1/flink-streaming-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-scala_2.11/1.12.1/flink-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-reflect/2.11.12/scala-reflect-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-compiler/2.11.12/scala-compiler-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-xml_2.11/1.0.5/scala-xml_2.11-1.0.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1
.0.4.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-planner-blink_2.11/1.12.1/flink-table-planner-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala_2.11/1.12.1/flink-table-api-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala-bridge_2.11/1.12.1/flink-table-api-scala-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Document

RE: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Zhang,

Please find the code snippet.

private ReducingState<NatLogData> aggrRecord; // record being aggregated

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<NatLogData> out) throws Exception {
    // FIXME timer is not working? or re-registration not working?
    NatLogData event = aggrRecord.get();   // always returns a null value
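
For context, a minimal sketch of how such a ReducingState and timer are usually wired up; the descriptor name, reduce function, and timer interval here are hypothetical, not Suchithra's actual code:

```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical skeleton; NatLogData and the reduce logic are placeholders.
public class NatAggregator extends KeyedProcessFunction<String, NatLogData, NatLogData> {

    private transient ReducingState<NatLogData> aggrRecord;

    @Override
    public void open(Configuration parameters) {
        ReducingStateDescriptor<NatLogData> descriptor = new ReducingStateDescriptor<>(
                "aggrRecord",
                (a, b) -> b,                 // placeholder reduce function
                NatLogData.class);
        aggrRecord = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void processElement(NatLogData event, Context ctx, Collector<NatLogData> out) throws Exception {
        aggrRecord.add(event);
        // Keyed state and timers are scoped per key: onTimer fires for the key the timer
        // was registered with, and aggrRecord.get() then reads that key's state.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<NatLogData> out) throws Exception {
        NatLogData aggregated = aggrRecord.get();   // null only if nothing was added for this key
        if (aggregated != null) {
            out.collect(aggregated);
            aggrRecord.clear();
        }
    }
}
```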


Thanks,
Suchithra


From: JING ZHANG 
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore) 

Subject: Re: Issue with onTimer method of KeyedProcessFunction

Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com> wrote on Wed, Jun 9, 2021 at 3:42 PM:
Hello,

We are using Apache Flink 1.7 and are now trying to upgrade to Flink 1.12.3.
After upgrading to 1.12.3, the onTimer method of KeyedProcessFunction is not
behaving correctly: the values of ReducingState and ValueState always return
null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: Using s3 bucket for high availability

2021-06-09 Thread Kurtis Walker
Thank you, I figured it out.  My IAM policy was missing some actions.  Seems I 
needed to give it “*” for it to work.

From: Tamir Sagi 
Date: Wednesday, June 9, 2021 at 6:02 AM
To: Yang Wang , Kurtis Walker 

Cc: user 
Subject: Re: Using s3 bucket for high availability
EXTERNAL EMAIL

I'd try several things.

First, try accessing the bucket locally from the CLI:
https://docs.aws.amazon.com/cli/latest/reference/s3/

If it does not work, please check your credentials under ~/.aws/credentials and
~/.aws/config, since the AWS clients read the credentials from these files by
default (unless other credentials are set).

If everything works well:

  1.  Are you running inside EKS? If so, you must attach to the pods a service
account with the corresponding permissions for S3.
  2.  If not, make sure the pods have the AWS credentials (access key,
secret key, region).
Please provide more of a code snippet.

I recently ran a Flink job on an Application cluster in EKS; the job also reads
files from S3 (without HA).

Tamir


From: Yang Wang 
Sent: Wednesday, June 9, 2021 11:29 AM
To: Kurtis Walker 
Cc: user 
Subject: Re: Using s3 bucket for high availability


EXTERNAL EMAIL

It seems to be an S3 issue, though I am not sure it is the root cause. Could you
please share more details of the JobManager log?

Or could you verify that the Flink cluster can access the S3 bucket
successfully (e.g. store checkpoints) when HA is disabled?

Best,
Yang

Kurtis Walker <kurtis.wal...@sugarcrm.com> wrote on Tue, Jun 8, 2021 at 11:00 PM:

Sorry, fat finger send before I finished writing….



Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:



kubernetes.service-account: flink-service-account

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.storageDir: 
s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery



I’m getting an error accessing the bucket.



2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client  
   [] - Bucket region cache doesn't have an entry for 
corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket 
region from Amazon S3.

2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson   
   [] - Failed to parse JSON string.

com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map 
due to end-of-input

at [Source: (String)""; line: 1, column: 0]

at 
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]



Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.





From: Kurtis Walker <kurtis.wal...@sugarcrm.com>
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user <user@flink.apache.org>
Subject: Using s3 bucket for high availability

Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:



Re: Persisting state in RocksDB

2021-06-09 Thread Arvid Heise
Hi Paul,

Welcome to the club!

What's your SinkFunction? Is it custom? If so, you could also implement
CheckpointedFunction to read and write data.
Here you could use OperatorStateStore and with it the BroadcastState.
However, broadcast state is quite heavy (it sends all data to all
instances, so it doesn't scale).

A better way would be to have a keyBy + KeyedProcessFunction before the sink
function. You could keyBy your key and use a normal value state [1] to
store the data point. If you configure your state backend to be RocksDB
[2], then you have everything together.

Note that you could also have it next to the sink function. There is no reason
not to have a dangling operator (no sink).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/
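Roughly, that keyed function could look like the following sketch (ArticleEvent
and its fields are just placeholders, not types from your pipeline):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder for whatever flows between your operators.
class ArticleEvent {
    String newUrl;          // URL on the new platform, used as the key
    String legacyLocation;  // Location header returned by the legacy platform, may be null
}

public class LegacyUrlMappingFunction
        extends KeyedProcessFunction<String, ArticleEvent, ArticleEvent> {

    private transient ValueState<String> legacyUrl;

    @Override
    public void open(Configuration parameters) {
        legacyUrl = getRuntimeContext().getState(
                new ValueStateDescriptor<>("legacyUrl", String.class));
    }

    @Override
    public void processElement(ArticleEvent event, Context ctx, Collector<ArticleEvent> out)
            throws Exception {
        if (event.legacyLocation != null) {
            // remember the new-URL -> legacy-URL mapping for this key
            legacyUrl.update(event.legacyLocation);
        } else {
            // later update/delete events for the same article can look it up again
            event.legacyLocation = legacyUrl.value();
        }
        out.collect(event);
    }
}

// keyed by the new-platform URL, e.g.:
// stream.keyBy(e -> e.newUrl).process(new LegacyUrlMappingFunction()).addSink(legacySink);
```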

On Wed, Jun 9, 2021 at 12:13 AM Paul K Moore  wrote:

> Hi all,
>
> First post here, so please be kind :)
>
> Firstly some context; I have the following high-level job topology:
>
> (1) FlinkPulsarSource -> (2) RichAsyncFunction -> (3) SinkFunction
>
> 1. The FlinkPulsarSource reads event notifications about article updates
> from a Pulsar topic
> 2. The RichAsyncFunction fetches the “full” article from the specified URL
> end-point, and transmutes it into a “legacy” article format
> 3. The SinkFunction writes the “legacy” article to a (legacy) web platform
> i.e. the sink is effectively another web site
>
> I have this all up and running (despite lots of shading fun).
>
> When the SinkFunction creates an article on the legacy platform it returns
> an 'HTTP 201 - Created’ with a Location header suitably populated.
>
> Now, I need to persist that Location header and, more explicitly, need to
> persist a map between the URLs for the new and legacy platforms.  This is
> needed for latter update and delete processing.
>
> The question is how do I store this mapping information?
>
> I’ve spent some time trying to grok state management and the various
> backends, but from what I can see the state management is focused on
> “operator scoped” state.  This seems reasonable given the requirement for
> barriers etc to ensure accurate recovery.
>
> However, I need some persistence between operators (shared state?) and
> with longevity beyond the processing of an operator.
>
> My gut reaction is that I need an external K-V store such as Ignite (or
> similar). Frankly given that Flink ships with embedded RocksDB I was hoping
> to use that, but there seems no obvious way to do this, and lots of advice
> saying don’t :)
>
> Am I missing something obvious here?
>
> Many thanks in advance
>
> Paul
>
>
>


Re: subscribe

2021-06-09 Thread Arvid Heise
To subscribe, please send a mail to user-subscr...@flink.apache.org

On Fri, Jun 4, 2021 at 4:56 AM Boyang Chen 
wrote:

>
>


Re: Flink kafka consumers stopped consuming messages

2021-06-09 Thread Ilya Karpov
Hi Arvid,
thanks for reply,

thread dump + logs research didn't help. We suspected that the problem was in the async
call to a remote key-value storage because we (1) found that the async client timeout
was set to 0 (effectively no timeout, idle indefinitely), (2) the async client
threads were sleeping, (3) the AsyncWaitOperator.Emitter thread was blocked peeking a
new async result while AsyncWaitOperator.processWatermark was blocked trying to put a
new item in a queue. We changed the timeout to a non-zero value and since then (for a
week or so) the job hasn't hung. So, I guess the problem was in the async client
timeout (not in Kafka or Flink).

Hope this helps someone!
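For anyone hitting the same thing, the shape of the fix was roughly the following
(the key-value client here is only a placeholder for whatever async client is
actually used):

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncLookup extends RichAsyncFunction<String, String> {

    // Placeholder for the real async key-value client.
    interface KvClient {
        CompletableFuture<String> get(String key);
    }

    private transient KvClient client;

    @Override
    public void open(Configuration parameters) {
        // The important part: give the client a finite request timeout instead of 0
        // (= wait forever), so a stuck request completes exceptionally.
        client = createClientWithTimeoutMillis(10_000);
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        client.get(key).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    private KvClient createClientWithTimeoutMillis(long timeoutMillis) {
        // placeholder: build the real client here with its own timeout configured
        return key -> CompletableFuture.completedFuture("value-for-" + key);
    }
}

// The Flink-side timeout on the operator acts as a second safety net:
// AsyncDataStream.unorderedWait(dataStream, new AsyncLookup(), 30, TimeUnit.SECONDS, 100);
```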

> On 9 June 2021, at 14:10, Arvid Heise wrote:
> 
> Hi Ilya,
> 
> These messages could pop up when a Kafka broker is down but should eventually 
> disappear. So I'm a bit lost. 
> 
> If there was a bug, it's also most likely fixed in the meantime. So if you 
> want to be on the safe side, I'd try to upgrade to more recent versions 
> (Flink + Kafka consumer).
> 
> Best,
> 
> Arvid
> 
> On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov  > wrote:
> Hi there,
> 
> today I've observed strange behaviour of a flink streaming application (flink 
> 1.6.1, per-job cluster deployment, yarn):
> 3 task managers (2 slots each) are running but only 1 slot is actually 
> consuming messages from kafka (v0.11.0.2), others were idling 
> (currentOutputWatermark was stuck, and 0 numRecordsOutPerSecond metrics). 
> 
> So I started to investigate: 
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for all 
> 6 topic partitions are constantly increasing.
> - `kafka-consumer-groups.sh` listed only single (the 4th) partition. That 
> makes me thinks that by somehow 5 kafka consumers lost connection to brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the 
> checkpoint interval. Skipping commit of previous offsets because newer 
> complete checkpoint offsets are available. This does not compromise Flink's 
> checkpoint integrity." in each task manager instance.
> - 5/6 slot didn’t advanced currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to kafka.
> - some network issues connected with hdfs (Slow waitForAckedSeqno).
> - all kafka networking setting are default (e.g. timeouts).
> 
> After job restart all task managers started to consume messages (6 slots in 
> total, and `kafka-consumer-groups.sh` listed that all 6 partitions are 
> consumed).
> 
> May be someone had already experienced something similar?
> 
> Job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor);
> 
> val terminalStream = AsyncDataStream
> .unorderedWait(dataStream, asyncFun, timout, timeoutUnit)
> .process(sideOutputFun);
> 
> terminalStream
> .keyBy(selector)
> .process(keyProcFun)
> .addSink(kafkaSink_1);
> 
> terminalStream
> .getSideOutput("outputTag")
> .addSink(kafkaSink_2);
> ```



RE: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Zhang,

Please find the code snippet.

private ReducingState<NatLogData> aggrRecord; // record being aggregated

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {
// FIXME timer is not working? or re-registration not working?
NatLogData event = aggrRecord.get();   // Always get null value.


Thanks,
Suchithra

From: JING ZHANG 
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore) 

Subject: Re: Issue with onTimer method of KeyedProcessFunction

Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) <suchithra@nokia.com> wrote on Wed, Jun 9, 2021 at 3:42 PM:
Hello,

We are using Apache Flink 1.7 and are now trying to upgrade to Flink 1.12.3. 
After upgrading to the 1.12.3 version, the onTimer method of 
KeyedProcessFunction is not behaving correctly; the values of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

2021-06-09 Thread Arvid Heise
Hi Jin,

as you have figured out, if something goes wrong with watermarks it's
usually because of the watermark generator (and sorry that you did not receive
any feedback here earlier).

Thank you very much for sharing your solution!
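For reference, a local generator that stays correct when many events share a
timestamp can be as simple as this bounded-out-of-orderness style sketch (the
event type and the bound are placeholders):

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Track only the max timestamp seen and emit it (minus the bound) periodically, so
// bursts of events with identical timestamps never move the watermark backwards.
public class LocalTestWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private final long outOfOrdernessMillis = Duration.ofSeconds(5).toMillis();
    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

// used via:
// WatermarkStrategy.<MyEvent>forGenerator(ctx -> new LocalTestWatermarkGenerator<>())
//         .withTimestampAssigner((event, ts) -> event.getEventTimeMillis());
```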

On Thu, Jun 3, 2021 at 8:51 PM Jin Yi  wrote:

> just to resolve this thread, i figured out the issue.  there's a local
> version of a watermark strategy that we use when running locally for
> development that didn't work correctly when many events have the same
> timestamp, which the fake data generation that happens for local runs has a
> tendency to produce.  fixing the local watermark generator used in the strategy
> to account for this properly fixed all of my issues.
>
> On Fri, May 21, 2021 at 10:09 AM Jin Yi  wrote:
>
>> (sorry that the last sentence fragment made it into my email... it was a
>> draft comment that i forgot to remove.  my thoughts are/were complete in
>> the first message.)
>>
>> i do have follow-up questions/thoughts for this thread though.  given my
>> current setup, it seems it's more expected to have the behavior when i
>> touch the right events given how event based watermarks and kafka connector
>> generated watermarks should work.  a 2 input stream op should fire its
>> timers on the min of the left and right watermark inputs based on what i've
>> read.  so, it seems that my custom keyedcoprocessfunction's onTimer should
>> only fire when a slowest watermark of either input stream reaches the
>> timer's time, correct?  the behavior of things being dropped from the right
>> even stream prematurely (what i originally thought was logically
>> conclusive) based on just the right watermark point would be the incorrect
>> behavior?
>>
>> should i file an issue/bug?
>>
>> On Thu, May 20, 2021 at 3:39 PM Jin Yi  wrote:
>>
>>> hello,
>>>
>>> sorry for a long post, but this is a puzzling problem and i am enough of
>>> a flink non-expert to be unsure what details are important or not.
>>>
>>> background:
>>> i have a flink pipeline that is a series of custom "joins" for the
>>> purposes of user event "flattening" that i wrote a custom
>>> KeyedCoProcessFunction that either joins on a parent id between the two
>>> connected streams using the "left" event's primary key and the foreign key
>>> on the right event OR if the right (child) event doesn't have a foreign
>>> key, tries to infer the join using heuristics to limit the possible parent
>>> events and grabbing the temporally-closest one.  both the inference and
>>> state cleanup for these joins are happening on the onTimer method.
>>>
>>> everything is based on event time, and i'm using kafka connector input
>>> source for the right event inputs to these operators.  here's what the
>>> pipeline looks like, with the joins in question acting like a chain of
>>> joins with the output of the previous join (left input) being joined with a
>>> new raw event source (right input):
>>>
>>> [image: Screen Shot 2021-05-20 at 3.12.22 PM.png]
>>> these join functions have a time window/duration or interval associated
>>> with them to define the duration of join state and inference window.  this
>>> is set per operator to allow for in order and out of order join thresholds
>>> for id based joins, and this window acts as the scope for inference when a
>>> right event that is an inference candidate (missing foreign key id) is
>>> about to be evicted from state.
>>>
>>> problem:
>>>
>>> i have things coded up with side outputs for duplicate, late and dropped
>>> events.  the dropped events case is the one i am focusing on since events
>>> that go unmatched are dropped when they are evicted from state.  only rhs
>>> events are the ones being dropped, with rhs events w/ foreign keys dropped
>>> when they go unmatched (late/no left arrival or no valid inference based
>>> left event).  with a wide enough time duration setting for both in order
>>> and out of order durations, everything gets matched.  however, when testing
>>> things out, i observed (expectedly) that the dropped events increases the
>>> tighter you make the join window based on these durations.  great, this
>>> makes sense.  i wanted to get a better understanding for these durations'
>>> impacts, so i wrote our integration/stress test case to focus on just id
>>> key based joins to start on this.
>>>
>>> further, to help observe the dropping characteristics, i connected the
>>> side outputs to some s3 sinks to store these dropped events.  originally,
>>> these dropped right events were output properly to the s3 output.  for the
>>> integration/stress test setup, they start to appear with durations < 1
>>> minute.
>>>
>>> however, i observed that they didn't include the flink Context.timestamp
>>> encoded in the event structure anywhere (the left events were already
>>> setting the timestamp in the processElement1 method).  i wanted this
>>> information to see how event time processing worked in practice.  so, i
>>> made a similarly simple change to the processElement2 function to

Re: streaming file sink OUT metrics not populating

2021-06-09 Thread Arvid Heise
For reference, the respective FLIP shows the ideas [1]. It's on our agenda
for 1.14.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Thu, Jun 3, 2021 at 6:41 PM Chesnay Schepler  wrote:

> This is a known issue, and cannot be fixed on the user side.
> The underlying problem is that this needs to be implemented separately for
> each source/sink and we haven't gotten around to doing that yet, but some
> progress is being made for 1.14 to that end.
>
> On 6/3/2021 6:06 PM, Vijayendra Yadav wrote:
>
> Hi Team,
>
> I am using streaming file sink and sinking data to s3. Also, using
> Graphite exporter for metrics.
>
> I can see correct counters for Source, Map, Filter functions. But for
> SINK, only* numRecordsIn* is populating, I am looking to get *numRecordsOut
> *counts also, but it's always staying 0 although data is sinking to s3
> correctly.
>
> Any reason for OUT ( *numRecordsOut) *metrics to stay zero all the time?
> Any way to fix it ?
>
> Thanks,
> Vijay
>
>
>


Re: Events triggering JobListener notification

2021-06-09 Thread Arvid Heise
Hi Barak,

I think the answer to your question lies in the javadoc:

/**
 * Callback on job execution finished, successfully or unsuccessfully.
It is only called back
 * when you call {@code execute()} instead of {@code executeAsync()}
methods of execution
 * environments.
 *
 * Exactly one of the passed parameters is null, respectively for
failure or success.
 */
void onJobExecuted(
@Nullable JobExecutionResult jobExecutionResult, @Nullable
Throwable throwable);


So this callback will be invoked even on failure and cancellation.
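For completeness, wiring a listener in looks roughly like this (the println calls
are placeholders for whatever monitoring hook you use):

```
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // called after submission; exactly one of the two arguments is null
                if (throwable != null) {
                    System.err.println("Submission failed: " + throwable);
                } else {
                    System.out.println("Submitted job " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // called when execute() returns, for success as well as failure/cancellation;
                // not called when using executeAsync()
                if (throwable != null) {
                    System.err.println("Job terminated exceptionally: " + throwable);
                } else {
                    System.out.println("Job finished in " + result.getNetRuntime() + " ms");
                }
            }
        });

        env.fromElements(1, 2, 3).print();
        env.execute("listener-example");
    }
}
```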

On Thu, Jun 3, 2021 at 2:38 PM Barak Ben Nathan 
wrote:

> Hi all,
>
>
>
> I am using Flink 1.12.1
>
>
>
> I’m building a system that creates/cancels Flink Jobs and monitors them.
>
>
>
> We thought to use org.apache.flink.core.execution.JobListener as a ‘push’
> mechanism for job-status-change events.
>
>
>
> We based this idea on the documentation that stated that JobListener ‘…is
> notified on specific job status changed’
>
>
>
> However,  from the JobListener’s methods ‘onJobSubmitted’,
> ‘onJobExecuted’, and their documentation, I understand that JobListener is
> not notified on **all** events.
>
> E.g. :
>
> Job failure (after it running for some time) or Job cancellation, will not
> cause JobListener to be notified.
>
>
>
> Am I correct?
>
>
>
> Barak
>
>
>


Re: Flink kafka consumers stopped consuming messages

2021-06-09 Thread Arvid Heise
Hi Ilya,

These messages could pop up when a Kafka broker is down but should
eventually disappear. So I'm a bit lost.

If there was a bug, it's also most likely fixed in the meantime. So if you
want to be on the safe side, I'd try to upgrade to more recent versions
(Flink + Kafka consumer).

Best,

Arvid

On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov  wrote:

> Hi there,
>
> today I've observed strange behaviour of a flink streaming application
> (flink 1.6.1, per-job cluster deployment, yarn):
> 3 task managers (2 slots each) are running but only 1 slot is actually
> consuming messages from kafka (v0.11.0.2), others were idling
> (currentOutputWatermark was stuck, and 0 numRecordsOutPerSecond metrics).
>
> So I started to investigate:
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for
> all 6 topic partitions are constantly increasing.
> - `kafka-consumer-groups.sh` listed only single (the 4th) partition. That
> makes me thinks that by somehow 5 kafka consumers lost connection to
> brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the
> checkpoint interval. Skipping commit of previous offsets because newer
> complete checkpoint offsets are available. This does not compromise Flink's
> checkpoint integrity." in each task manager instance.
> - 5/6 slot didn’t advanced currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to kafka.
> - some network issues connected with hdfs (Slow waitForAckedSeqno).
> - all kafka networking setting are default (e.g. timeouts).
>
> After job restart all task managers started to consume messages (6 slots
> in total, and `kafka-consumer-groups.sh` listed that all 6 partitions are
> consumed).
>
> May be someone had already experienced something similar?
>
> Job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor);
>
> val terminalStream = AsyncDataStream
> .unorderedWait(dataStream, asyncFun, timout, timeoutUnit)
> .process(sideOutputFun);
>
> terminalStream
> .keyBy(selector)
> .process(keyProcFun)
> .addSink(kafkaSink_1);
>
> terminalStream
> .getSideOutput("outputTag")
> .addSink(kafkaSink_2);
> ```


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
BTW, you can also send an email to the Zeppelin user mailing list to join the
Zeppelin Slack channel and discuss more details.
http://zeppelin.apache.org/community.html


Jeff Zhang wrote on Wed, Jun 9, 2021 at 6:34 PM:

> Hi Maciek,
>
> You can try Zeppelin, which supports PyFlink and displays the Flink job URL
> inline.
>
> http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
>
>
> Maciej Bryński wrote on Wed, Jun 9, 2021 at 1:53 PM:
>
>> Nope.
>> I found the following solution.
>>
>> conf = Configuration()
>> env =
>> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
>> env_settings =
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> table_env =
>> StreamTableEnvironment.create(stream_execution_environment=env,
>> environment_settings=env_settings)
>>
>> I also created the bug report
>> https://issues.apache.org/jira/browse/FLINK-22924.
>> I think this API should be exposed in Python.
>>
>> On Wed, 9 Jun 2021 at 04:57, Dian Fu wrote:
>> >
>> > Hi Macike,
>> >
>> > You could try if the following works:
>> >
>> > ```
>> > table_env.get_config().get_configuration().set_string("rest.bind-port",
>> "xxx")
>> > ```
>> >
>> > Regards,
>> > Dian
>> >
>> > > > On Jun 8, 2021, at 8:26 PM, maverick wrote:
>> > >
>> > > Hi,
>> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
>> starting
>> > > TableEnvironment with following code:
>> > >
>> > > env_settings =
>> > >
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> > > table_env = TableEnvironment.create(env_settings)
>> > >
>> > > How can I enable Web UI in this code?
>> > >
>> > > Regards,
>> > > Maciek
>> > >
>> > >
>> > >
>> > > --
>> > > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >
>>
>>
>> --
>> Maciek Bryński
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
Hi Maciek,

You can try Zeppelin, which supports PyFlink and displays the Flink job URL inline.

http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


Maciej Bryński wrote on Wed, Jun 9, 2021 at 1:53 PM:

> Nope.
> I found the following solution.
>
> conf = Configuration()
> env =
> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(stream_execution_environment=env,
> environment_settings=env_settings)
>
> I also created the bug report
> https://issues.apache.org/jira/browse/FLINK-22924.
> I think this API should be exposed in Python.
>
> On Wed, 9 Jun 2021 at 04:57, Dian Fu wrote:
> >
> > Hi Macike,
> >
> > You could try if the following works:
> >
> > ```
> > table_env.get_config().get_configuration().set_string("rest.bind-port",
> "xxx")
> > ```
> >
> > Regards,
> > Dian
> >
> > > On Jun 8, 2021, at 8:26 PM, maverick wrote:
> > >
> > > Hi,
> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
> starting
> > > TableEnvironment with following code:
> > >
> > > env_settings =
> > >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > table_env = TableEnvironment.create(env_settings)
> > >
> > > How can I enable Web UI in this code?
> > >
> > > Regards,
> > > Maciek
> > >
> > >
> > >
> > > --
> > > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>
> --
> Maciek Bryński
>


-- 
Best Regards

Jeff Zhang


Re: Using s3 bucket for high availability

2021-06-09 Thread Tamir Sagi

I'd try several things.

First, try accessing the bucket from the CLI locally:
https://docs.aws.amazon.com/cli/latest/reference/s3/

If it does not work,
please check your credentials under the ~/.aws/credentials file + ~/.aws/config, 
since the AWS clients read the credentials from these files by default (unless 
other credentials are set).

If everything works well:

  1.  Are you running inside EKS? If so, you must attach to the pods a service 
account with the corresponding permissions for S3.
  2.  If not, make sure the pods have the AWS credentials (access key, 
secret key, region).

Please provide a more complete code snippet.

I recently ran a Flink job on an Application cluster in EKS; the job also reads 
files from S3 (without HA).
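
If the region lookup keeps failing, explicitly pointing the S3 filesystem at the 
bucket's regional endpoint in flink-conf.yaml may also be worth a try (the s3.* keys 
are forwarded to the underlying S3 client; the endpoint below is just a placeholder 
for your bucket's region):

s3.endpoint: s3.us-west-2.amazonaws.com
# credentials can also be set explicitly instead of relying on the pod's role:
# s3.access-key: <access key>
# s3.secret-key: <secret key>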

Tamir


From: Yang Wang 
Sent: Wednesday, June 9, 2021 11:29 AM
To: Kurtis Walker 
Cc: user 
Subject: Re: Using s3 bucket for high availability


EXTERNAL EMAIL


It seems to be an S3 issue, and I am not sure it is the root cause. Could you 
please share more details of the JobManager log?

Or could you verify that the Flink cluster could access the S3 bucket 
successfully (e.g. store the checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker <kurtis.wal...@sugarcrm.com> wrote on Tue, Jun 8, 2021 at 11:00 PM:

Sorry, fat finger send before I finished writing….



Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:



kubernetes.service-account: flink-service-account

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.storageDir: 
s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery



I’m getting an error accessing the bucket.



2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client  
   [] - Bucket region cache doesn't have an entry for 
corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket 
region from Amazon S3.

2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson   
   [] - Failed to parse JSON string.

com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map 
due to end-of-input

at [Source: (String)""; line: 1, column: 0]

at 
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]



Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.





From: Kurtis Walker <kurtis.wal...@sugarcrm.com>
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user <user@flink.apache.org>
Subject: Using s3 bucket for high availability

Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:



Re: State migration for sql job

2021-06-09 Thread Yuval Itzchakov
As my company is also a heavy user of Flink SQL, the state migration story
is very important to us.

I as well believe that adding new fields should start to accumulate state
from the point in time of the change forward.

Is anyone actively working on this? Is there any way to get involved?

On Tue, Jun 8, 2021, 17:33 aitozi  wrote:

> Thanks for JING & Kurt's reply. I think we prefer to choose the option (a)
> that will not take  the history data into account.
>
> IMO, if we want to process all the historical data, we have to store the
> original data, which may be a big overhead to backend. But if we just
> aggregate after the new added function, may just need a data format
> transfer. Besides, the business logic we met only need the new aggFunction
> accumulate with new data.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread JING ZHANG
Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) wrote on Wed, Jun 9, 2021 at 3:42 PM:

> Hello,
>
>
>
> We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3
> version. After upgrading to 1.12.3 version,  the onTimer method of
>  KeyedProcessFunction is not behaving correctly, the value of ReducingState
> and ValueState always return null.
>
>
>
> Could you please help in debugging the issue.
>
>
>
> Thanks,
>
> Suchithra
>
>
>


Re: Using s3 bucket for high availability

2021-06-09 Thread Yang Wang
It seems to be an S3 issue, and I am not sure it is the root cause. Could
you please share more details of the JobManager log?

Or could you verify that the Flink cluster could access the S3 bucket
successfully (e.g. store the checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker wrote on Tue, Jun 8, 2021 at 11:00 PM:

> Sorry, fat finger send before I finished writing….
>
>
>
> Hello,
>
>   I’m trying to set up my flink native Kubernetes cluster with High
> availability.  Here’s the relevant config:
>
>
>
> kubernetes.service-account: flink-service-account
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> high-availability.storageDir:
> s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery
>
>
>
> I’m getting an error accessing the bucket.
>
>
>
> 2021-06-08 14:33:42,189 DEBUG
> com.amazonaws.services.s3.AmazonS3Client [] - Bucket
> region cache doesn't have an entry for
> corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket
> region from Amazon S3.
>
> 2021-06-08 14:33:42,193 DEBUG
> com.amazonaws.util.json.Jackson  [] - Failed to
> parse JSON string.
>
> com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to
> map due to end-of-input
>
> at [Source: (String)""; line: 1, column: 0]
>
> at
> com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]
>
>
>
> Is there an additional config I need for specifying the region for the
> bucket?  I’ve been searching the doc and haven’t found anything like that.
>
>
>
>
>
> *From: *Kurtis Walker 
> *Date: *Tuesday, June 8, 2021 at 10:55 AM
> *To: *user 
> *Subject: *Using s3 bucket for high availability
>
> Hello,
>
>   I’m trying to set up my flink native Kubernetes cluster with High
> availability.  Here’s the relevant config:
>


Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

We are using Apache Flink 1.7 and are now trying to upgrade to Flink 1.12.3. 
After upgrading to the 1.12.3 version, the onTimer method of 
KeyedProcessFunction is not behaving correctly; the values of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: Questions about implementing a flink source

2021-06-09 Thread Arvid Heise
Hi Evan,

1. I'd recommend supporting DeserializationSchema in any case similar to
KafkaRecordDeserializationSchema.
First, it aligns with other sources and user expectations.
Second, it's a tad faster and the plan looks easier if you omit a chained
task.
Third, you can avoid quite a bit of boilerplate code on user side by having
adapters such that a user can use any existing Flink DeserializationSchema
to deserialize the payload; so without writing any UDF in 80% of the use
cases, the user gets the value that he wants (see
KafkaValueOnlyDeserializationSchemaWrapper).
Lastly, we also plan to have first class support for invalid record
handling at some point and it might be connected to DeserializationSchema.

2. It's any reassignment while there is still data flowing in the execution
graph. It's always an issue if there are parallel paths from source to
sink: as long as there is an old record on one path, sending new records on
a different path always has the potential of a new record overtaking an old
record.
If you could drain all data (currently not possible) without restarting,
then dynamic reassignment would be safe.

Note that without backpressure, it would certainly be enough to wait a
couple of seconds after unassigning a partition before reassigning it to
avoid any reordering issue. Maybe you could offer a configuration option
and the user has to take some responsibility.

I could also see that we could piggyback on aligned checkpoint barriers to
not emit any data until the checkpoint has been completed and do the
reassignment then. But that's certainly something that the framework should
support and that you don't want to implement on your own.

3. Yes if you throw an IOException (or any other exception), the checkpoint
would not complete and the task gets restarted (could be in an inconsistent
state).
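
For illustration, a minimal committer sketch under that contract (the committable
and publisher types below are placeholders, not your actual classes):

```
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.connector.sink.Committer;

// Minimal placeholders so the sketch is self-contained.
class PublishCommittable {}

interface Publisher extends AutoCloseable {
    void publish(PublishCommittable committable) throws TransientException, PermanentException;

    class TransientException extends Exception {}
    class PermanentException extends Exception {}
}

class PubSubLiteCommitter implements Committer<PublishCommittable> {

    private final Publisher publisher;

    PubSubLiteCommitter(Publisher publisher) {
        this.publisher = publisher;
    }

    @Override
    public List<PublishCommittable> commit(List<PublishCommittable> committables) throws IOException {
        List<PublishCommittable> retryLater = new ArrayList<>();
        for (PublishCommittable committable : committables) {
            try {
                publisher.publish(committable);
            } catch (Publisher.TransientException e) {
                // transient problem: hand the committable back so the framework retries it
                retryLater.add(committable);
            } catch (Publisher.PermanentException e) {
                // permanent failure (topic deleted, missing permissions, ...):
                // throwing fails the checkpoint and triggers failover
                throw new IOException("Permanent publish failure", e);
            }
        }
        return retryLater;
    }

    @Override
    public void close() throws Exception {
        publisher.close();
    }
}
```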

On Tue, Jun 8, 2021 at 10:51 PM Evan Palmer  wrote:

> Hello again,
>
> Thank you for all of your help so far, I have a few more questions if you
> have the time :)
>
> 1. Deserialization Schema
>
> There's been some debate within my team about whether we should offer a
> DeserializationSchema and SerializationSchema in our source and sink.
>
> If we don't include the schemas, our source and sink would
> implement Source<...pubsublite.Message> and Sink<...pubsublite.Message>,
> which is the type our client library currently returns (this type is
> serializable), and users could transform the messages in a map function
> after the source. This would make implementing the source somewhat easier,
> and it doesn't seem like it would be much more difficult for users. On the
> other hand, I looked around and didn't find any flink sources implemented
> without a deserialization/serialization schema, so I'm worried that this
> choice might make our source/sink confusing for users, or that we're
> missing something. What are your thoughts on this?
>
> 2. Order aware rebalancing.
>
> I want to make sure I understand the problem with rebalancing partitions
> to different SourceReaders. Does any reassignment of a pub/sub
> partition between SourceReaders have the potential to cause disorder, or
> can order be guaranteed by some variant of ensuring that the partition is
> assigned to only one source reader at a time?
>
> I read through
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows,
> which made me think that if the user wanted a pipeline like
>
> env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)
>
> Then if two different source tasks had messages from a single pub/sub
> partition, there could be disorder. We're not planning to implement any
> rebalancing of partitions in our source, but I wanted to make sure I can
> document this correctly :)
>
> 3. Reporting permanent failures in the Sink
>
> Is it sufficient to throw an exception from Committer.commit() in the case
> where our sink has permanently failed in some way (e.g. the configured
> topic has been deleted, or the user doesn't have permissions to publish),
> or is there something else we should be doing?
>
> Evan
>
>
> On Mon, May 10, 2021 at 9:57 AM Arvid Heise  wrote:
>
>> Hi Evan,
>>
>> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>>> where this connector should live. I saw that there's already a pubsub
>>> connector in
>>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>>> so if flink is willing to host it, perhaps it could live near there?
>>> Alternatively, it could live alongside our client library in
>>> https://github.com/googleapis/java-pubsublite.
>>>
>>
>> For a long time, the community has been thinking of moving (most)
>> connectors out of the repository. Especially now with the new source/sink
>> interface, the need to decouple Flink release cycle and connector release
>> cycle is bigger t

Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-09 Thread Chirag Dewan
Thanks for the reply Yun.
The key is an Integer type. Do you think there can be hash collisions for 
Integers?
It somehow works on a single TM now. No errors for 1m records. But as soon as we 
move to 2 TMs, we get all sorts of errors - 'Position Out of Bound', key not in 
KeyGroup etc.
This also causes an NPE in the user-defined code -
if (valueState != null)
    valueState.value() -> This causes null, so while the if check passed, it 
caused an NPE while reading the value.

Thanks,
Chirag
On Tuesday, 8 June, 2021, 08:29:04 pm IST, Yun Gao  
wrote:  
 
Hi Chirag,
As far as I know, if you are running a single job, all the pods sharing 
the same state.checkpoints.dir configuration should be as expected, and it is 
not necessary to configure the RocksDB local dir since Flink will choose a 
default dir.
Regarding the latest exception, I think you might first check the key type used; 
the key type should have a stable hashCode method.
Best,
Yun



------------------ Original Mail ------------------
Sender: Chirag Dewan
Send Date: Tue Jun 8 18:06:07 2021
Recipients: User, Yun Gao
Subject: Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB
Hi,
Although this looks like a problem to me, I still can't conclude it.
I tried reducing my TM replicas from 2 to 1 with 4 slots and 4 cores each. I 
was hoping that with a single TM there would be no file write conflicts. But that 
doesn't seem to be the case as I still get the:
Caused by: org.apache.flink.util.SerializedThrowable: 
java.lang.IllegalArgumentException: Key group 2 is not in 
KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
I have checked that there's no concurrent access on the ValueState.
Any more leads?
Thanks,
Chirag

On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan 
 wrote:  
 
Hi,
I think I got my issue. Would help if someone can confirm it :)
I am using an NFS filesystem for storing my checkpoints and my Flink cluster is 
running on K8s with 2 TMs and 2 JMs.
All my pods share the NFS PVC with state.checkpoints.dir and we also missed 
setting the RocksDB local dir.
Does this lead to state corruption?
Thanks,
Chirag


On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan 
 wrote:  
 
Thanks for the reply Yun. I strangely don't see any nulls. And in fact this 
exception comes on the first few records and then the job starts processing 
normally.
Also, I don't see any reason for concurrent access to the state in my code. 
Could more CPU cores than task slots on the Task Manager be the reason for it?
On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao  
wrote:  
 
Hi Chirag,
If you are able to reproduce the exception, could you first add some logs to print the 
value of valueState, valueState.value(), inEvent and inEvent.getPriceDelta()? I 
think either object being null would cause the NullPointerException here.
For the second exception, I found a similar issue [1], caused by concurrent 
access to the value state. Do we have a similar situation here?
Best,
Yun
[1] https://issues.apache.org/jira/browse/FLINK-18587


------------------ Original Mail ------------------
Sender: Chirag Dewan
Send Date: Sat Jun 5 20:29:37 2021
Recipients: User
Subject: Multiple Exceptions during Load Test in State Access APIs with RocksDB
Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend. 

I have 2 Task Managers with 2 task slots and 4 cores each. 

Below is our setup:

 

Kafka (topic with 2 partitions) ---> FlinkKafkaConsumer (2 parallelism) > 
KeyedProcessFunction (4 parallelism) > FlinkKafkaProducer (1 parallelism) 
> Kafka topic

  

public class Aggregator_KeyedExpression extends KeyedProcessFunction {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("totalPrize", Integer.class);

        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List<GameZoneOutput> outEvents)
            throws Exception {

        if (valueState.value() == null) {
            valueState.update(0);
        }

        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line

        int sum = valueState.value();

        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}
While doing a load test, I get a NullPointerException in valueState.value(), 
which seems strange as we would have updated the value state above.
Another strange thing is that this is observed only in load conditions and