NPE when aggregating on a window.

2021-04-12 Thread Si-li Liu
Hi,

I encountered a weird NPE when trying to run an aggregate on a fixed window. If I
set a small parallelism so that the whole job uses only one TaskManager, this
NPE does not happen. But when the job scales to two TaskManagers, the
TaskManager crashes at the Create stage. The Flink version I use is 1.11.1.

The NPE exception stack is:

2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    ... 13 more

My aggregate code is

public class AggregateDataEntry implements
        AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {

    @Override
    public Map<DataKey, DataIndex> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
        accumulator.merge(value.f0, value.f1, DataIndex::add);
        return accumulator;
    }

    @Override
    public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
        return accumulator;
    }

    @Override
    public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
        a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
        return b;
    }
}
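
For reference, here is a minimal sketch of how such a function is typically wired onto the 5-second tumbling processing-time window named in the stack trace. The key selector and the exact DataKey/DataIndex types are assumptions based on the function signature above, not taken from the original job; calling aggregate() without an explicit window function is what produces the PassThroughWindowFunction shown in the operator name.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Map;

public class WindowWiringSketch {

    // Assumption: the stream is keyed by the tuple's DataKey field; the real job may key differently.
    public static DataStream<Map<DataKey, DataIndex>> aggregatePerWindow(
            DataStream<Tuple2<DataKey, DataIndex>> input) {
        return input
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // No explicit window function is passed, which is why the operator name
                // in the log shows Flink's internal PassThroughWindowFunction.
                .aggregate(new AggregateDataEntry());
    }
}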

Does anyone know anything about this NPE? Thanks!
-- 
Best regards

Sili Liu


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-12 Thread Fuyao Li
Hello Yang,

It is very kind of you to give such a detailed explanation! Thanks for the clarification.

For the small document fix I mentioned, what do you think?

Best,
Fuyao

From: Yang Wang 
Date: Monday, April 12, 2021 at 23:03
To: Fuyao Li 
Cc: user , Yan Wang 
Subject: [External] : Re: Conflict in the document - About native Kubernetes 
per job mode
Hi Fuyao,

Currently, Flink only supports per-job mode for YARN. The standalone job cluster
has been replaced with the standalone application mode
after FLIP-85 [1]. Neither standalone Flink on K8s nor the native K8s integration
supports per-job mode.

The video you attached shows a PoC implementation made for the presentation. We
introduced the Kubernetes application mode in release 1.11
and agreed not to add per-job cluster support. The only reason is that it is
not very convenient to ship job graphs, user jars, and artifacts in a
Kubernetes environment.

[1]. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode

Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Tuesday, April 13, 2021 at 8:10 AM:
Hello Community, Yang,

I noticed a conflict in the documentation about per-job mode support for Kubernetes.
The doc here [1] mentions that "in a Flink Job Cluster, the available cluster
manager (like YARN or Kubernetes) is used to spin up a cluster for each submitted
job and this cluster is available to that job only."
This implies per-job mode is supported in Kubernetes.

However, the docs [2] and [3] clearly point out that per-job mode is not
supported in Kubernetes.

These statements conflict, which is kind of misleading. If needed, I can create
an MR to delete the Kubernetes statement in [1]. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing
a command with the -e kubernetes-per-job flag. I tried it and found that such a
command is not supported in the Flink distribution at all. I noticed that the
version you are using during the demo is 1.11-SNAPSHOT. Did you modify the source
code and build an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] 
https://www.youtube.com/watch?v=pdFPr_VOWTU&t=833s

Best,
Fuyao


Re: Conflict in the document - About native Kubernetes per job mode

2021-04-12 Thread Yang Wang
Hi Fuyao,

Currently, Flink only supports per-job mode for YARN. The standalone job
cluster has been replaced with the standalone application mode
after FLIP-85 [1]. Neither standalone Flink on K8s nor the native K8s integration
supports per-job mode.

The video you attached shows a PoC implementation made for the presentation. We
introduced the Kubernetes application mode in release 1.11
and agreed not to add per-job cluster support. The only reason is that it
is not very convenient to ship job graphs, user jars, and artifacts in a
Kubernetes environment.

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode

Best,
Yang

Fuyao Li wrote on Tuesday, April 13, 2021 at 8:10 AM:

> Hello Community, Yang,
>
>
>
> I noticed a conflict in the document for per-job mode support for
> Kubernetes.
>
> In the doc here [1], it mentions
>
> in a Flink Job Cluster, the available cluster manager (like YARN or
> Kubernetes) is used to spin up a cluster for each submitted job and this
> cluster is available to that job only.
>
> It implies per job mode is supported in Kubernetes.
>
>
>
> However, in the docs [2] and [3], it clearly points out per-job mode is
> not supported in Kubernetes.
>
>
>
> *This is a conflict statement and is kind of misleading. If needed, I can
> create an MR to delete the statement in [1] for Kubernetes.. It is a small
> fix. *
>
>
>
> I also noticed another thing in the video [4] at 25:08. Yang, you are
> executing a command with -e kubernetes-per-job flag. I tried and found such
> command is not supported in Flink distribution at all. I noticed the
> version you are using is 1.11-snapshot during the demo. Are you modifying
> the source code and generated an internal version of Flink….?
>
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
>
> [4] https://www.youtube.com/watch?v=pdFPr_VOWTU&t=833s
>
>
>
> Best,
>
> Fuyao
>


Re: how to convert DataStream to Table

2021-04-12 Thread Svend
Hi,

Here's an example that works for me:


"""
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*$*;

import java.util.List;

public class Stream2Table {

public static void main(String[] args) {

var streamingEnv = 
StreamExecutionEnvironment.*getExecutionEnvironment*();
var tableEnv = StreamTableEnvironment.*create*(streamingEnv);

var userRows = streamingEnv.fromCollection(
List.*of*(
Row.*of*("user1", "al...@mail.org 
", "Alice"),
Row.*of*("user2", "b...@mail.org 
", "Bob")
),
new RowTypeInfo(Types.*STRING*, Types.*STRING*, 
Types.*STRING*));

var table = tableEnv
.fromDataStream(userRows,
*$*("user_id"), *$*("handle"), *$*("name"));

table.execute().print();
}

}
"""

You can also dig in here; you'll probably find better examples:
https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table

Cheers,

Svend


On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote:
> 
> Hi All,

> 

> there is a scenario where I need to process OGG Log data in kafka using Flink 
> Sql. I can convert the OGG Log Stream to DataStream and each event 
> has RowKind, but i have trouble converting DataStream to a Table.

> For test, i tried StreamTableEnvironment#fromDataStream and 
> createTemporaryView API, both TableSchema is 

> ```

> root

>  |-- f0: LEGACY('RAW', 'ANY')

> ```

> 

> i want to get the schema :

> 

> ```

> root 

>  |— column1: Type,

>  |— column2: Type, 

> ….

> ```

> 

> 

> how to convert DataStream with RowKind to Table? 

> 

> 

> Thank you very much for your reply

> 


Conflict in the document - About native Kubernetes per job mode

2021-04-12 Thread Fuyao Li
Hello Community, Yang,

I noticed a conflict in the documentation about per-job mode support for Kubernetes.
The doc here [1] mentions that "in a Flink Job Cluster, the available cluster
manager (like YARN or Kubernetes) is used to spin up a cluster for each submitted
job and this cluster is available to that job only."
This implies per-job mode is supported in Kubernetes.

However, the docs [2] and [3] clearly point out that per-job mode is not
supported in Kubernetes.

These statements conflict, which is kind of misleading. If needed, I can create
an MR to delete the Kubernetes statement in [1]. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing
a command with the -e kubernetes-per-job flag. I tried it and found that such a
command is not supported in the Flink distribution at all. I noticed that the
version you are using during the demo is 1.11-SNAPSHOT. Did you modify the source
code and build an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] https://www.youtube.com/watch?v=pdFPr_VOWTU&t=833s

Best,
Fuyao


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hi Roman, Arvid,

So, to achieve an "at least once" guarantee, does automatic restart of Flink
currently have to be disabled?
Is there any workaround to get "at least once" semantics with Flink's
automatic restarts in this case?

Regards,
Rahul

On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan  wrote:

> Hi,
>
> Thanks for the clarification.
>
> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
>
> That's currently not possible, at least with the default connector.
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>  wrote:
> >
> > Hi Roman,
> >
> > Thanks for the reply.
> > This is what I meant by Internal restarts - Automatic restore of Flink
> Job from a failure. For example, pipeline restarts when Fixed delay or
> Failure Rate restart strategies are configured.
> >
> > Quoting documentation in this link - Configuring Kafka Consumer start
> position configuration
> >
> >> Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure
> >
> >
> >
> > It seems that there will be data loss even when offsets are managed
> externally when there are pipeline restarts due to a failure, say, an
> exception. On the other hand, when the pipeline is stopped and
> resubmitted(say, an upgrade), there won't be any data loss as offsets are
> retrieved from an external store and configured while starting Kafka
> Consumer.
> >
> > We do not want to enable checkpointing as the pipeline is stateless. We
> have Deduplication logic in the pipeline and the processing is idempotent.
> >
> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
> >
> > Thanks,
> > Rahul
> >
> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Could you please explain what you mean by internal restarts?
> >>
> >> If you commit offsets or timestamps from sink after emitting records
> >> to the external system then there should be no data loss.
> >> Otherwise (if you commit offsets earlier), you have to persist
> >> in-flight records to avoid data loss (i.e. enable checkpointing).
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
> >>  wrote:
> >> >
> >> > Hello,
> >> >
> >> > Context:
> >> >
> >> > We have a stateless Flink Pipeline which reads from Kafka topics.
> >> > The pipeline has a Windowing operator(Used only for introducing a
> delay in processing records) and AsyncI/O operators (used for
> Lookup/Enrichment).
> >> >
> >> > "At least Once" Processing semantics is needed for the pipeline to
> avoid data loss.
> >> >
> >> > Checkpointing is disabled and we are dependent on the auto offset
> commit of Kafka consumer for fault tolerance currently.
> >> >
> >> > As auto offset commit indicates that "the record is successfully
> read", instead of "the record is successfully processed", there will be
> data loss if there is a restart when the offset is committed to Kafka but
> not successfully processed by the Flink Pipeline, as the record is NOT
> replayed again when the pipeline is restarted.
> >> >
> >> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >> >
> >> > Question:
> >> >
> >> > We are looking for other ways to guarantee "at least once" processing
> without checkpointing. One such way is to manage Kafka Offsets Externally.
> >> >
> >> > We can maintain offsets of each partition of each topic in
> Cassandra(or maintain timestamp, where all records with timestamps less
> than this timestamp are successfully processed) and configure Kafka
> consumer Start Position - setStartFromTimestamp() or
> setStartFromSpecificOffsets()
> >> >
> >> > This will be helpful if the pipeline is manually restarted (say,
> JobManager pod is restarted). But, how to avoid data loss in case of
> internal restarts?
> >> >
> >> > Has anyone used this approach?
> >> > What are other ways to guarantee "at least once" processing without
> checkpointing for a stateless Flink pipeline?
> >> >
> >> > Thanks,
> >> > Rahul
>


Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-12 Thread Kevin Lam
That's really helpful, thanks Till!

On Thu, Apr 8, 2021 at 6:32 AM Till Rohrmann  wrote:

> Hi Kevin,
>
> when decreasing the TaskManager count I assume that you also decrease the
> parallelism of the Flink job. There are three aspects which can then cause
> a slower recovery.
>
> 1) Each Task gets a larger key range assigned. Therefore, each TaskManager
> has to download more data in order to restart the Task. Moreover, there are
> fewer nodes downloading larger portions of the data (less parallelization).
> 2) If you rescaled the parallelism, then it can happen that a Task gets a
> key range assigned which requires downloading of multiple key range parts
> from the previous run/savepoint. The new key range might not need all the
> data from the savepoint parts and hence you download some data which is not
> really used in the end.
> 3) When rescaling the job, then Flink has to rebuild the RocksDB instance
> which is an expensive and slow operation. What happens is that Flink
> creates for every savepoint part which it needs for its key range a RocksDB
> instance and then extracts the part which is only relevant for its key
> range into a new RocksDB instance. This causes a lot of read and write
> amplification.
>
> Cheers,
> Till
>
> On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> We are trying to benchmark savepoint size vs. restore time.
>>
>> One thing we've observed is that when we reduce the number of task
>> managers, the time to restore from a savepoint increases drastically:
>>
>> 1/ Restoring from 9.7tb savepoint onto 156 task managers takes 28 minutes
>> 2/ Restoring from the same savepoint onto 30 task managers takes over 3
>> hours
>>
>> *Is this expected? How does the restore process work? Is this just a
>> matter of having lower restore parallelism for 30 task managers vs 156 task
>> managers? *
>>
>> Some details
>>
>> - Running on kubernetes
>> - Used Rocksdb with a local ssd for state backend
>> - Savepoint is hosted on GCS
>> - The smaller task manager case is important to us because we expect to
>> deploy our application with a high number of task managers, and downscale
>> once a backfill is completed
>>
>> Differences between 1/ and 2/:
>>
>> 2/ has decreased task manager count 156 -> 30
>> 2/ has decreased operator parallelism by a factor of ~10
>> 2/ uses a striped SSD (3 ssds mounted as a single logical volume) to hold
>> rocksdb files
>>
>> Thanks in advance for your help!
>>
>


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-12 Thread Sonam Mandal
Hi Till,

Got it, that definitely makes sense - I was just looking for a ballpark number
to start with. I appreciate your help!

Thanks,
Sonam

From: Till Rohrmann 
Sent: Monday, April 12, 2021 1:00 AM
To: Sonam Mandal 
Cc: dhanesh arole ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Sonam,

The state size probably depends a bit on your infrastructure. Assuming you have 
1 GBps network connection and local SSDs, then I guess you should see a 
difference if your local state size is  > 1 GB.

Cheers,
Till

On Wed, Apr 7, 2021 at 1:46 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hi Till and Dhanesh,

Thanks for the insights into both on how to check that this kicks in and on the 
expected behavior. My understanding too was that if multiple TMs are used for 
the job, any TMs that don’t go down can take advantage of local recovery.

Do you have any insights on a good minimum state size we should experiment with 
to check recovery time differences between the two modes?

Thanks,
Sonam

From: dhanesh arole <davcdhane...@gmail.com>
Sent: Wednesday, April 7, 2021 3:43:11 AM
To: Till Rohrmann <trohrm...@apache.org>
Cc: Sonam Mandal <soman...@linkedin.com>; Tzu-Li (Gordon) Tai <tzuli...@apache.org>; user@flink.apache.org
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Till,

You are right. To give you more context about our setup, we are running 1 task 
slot per task manager and total number of task manager replicas equal to job 
parallelism. The issue actually exacerbates during rolling deployment of task 
managers as each TM goes offline and comes back online again after some time. 
So during bouncing of every TM pod somehow task allocation changes and finally 
job stabilises once all TMs are restarted.  Maybe a proper blue green setup 
would allow us to make the best use of local recovery during restart of TMs. 
But during intermittent failures of one of the TMs local recovery works as 
expected on the other healthy TM instances ( I.e it does not download from 
remote ).

On Wed, 7 Apr 2021 at 10:35 Till Rohrmann <trohrm...@apache.org> wrote:
Hi Dhanesh,

if some of the previously used TMs are still available, then Flink should try 
to redeploy tasks onto them also in case of a global failover. Only those tasks 
which have been executed on the lost TaskManager will need new slots and have 
to download the state from the remote storage.

Cheers,
Till

On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole <davcdhane...@gmail.com> wrote:
Hi Sonam,

We have a similar setup. What I have observed is, when the task manager pod 
gets killed and restarts again ( i.e. the entire task manager process restarts 
) then local recovery doesn't happen. Task manager restore process actually 
downloads the latest completed checkpoint from the remote state handle even 
when the older localState data is available. This happens because with every 
run allocation-ids for tasks running on task manager change as task manager 
restart causes global job failure and restart.

Local recovery - i.e task restore process using locally stored checkpoint data 
kicks in when the task manager process is alive but due to some other reason ( 
like timeout from sink or external dependency ) one of the tasks fails and the 
flink job gets restarted by the job manager.

Please CMIIW


-
Dhanesh Arole

On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Sonam,

The easiest way to see whether local state has been used for recovery is the 
recovery time. Apart from that you can also look for "Found registered local 
state for checkpoint {} in subtask ({} - {} - {}" in the logs which is logged 
on debug. This indicates that the local state is available. However, it does 
not say whether it is actually used. E.g. when doing a rescaling operation we 
change the assignment of key group ranges which prevents local state from being 
used. However in case of a recovery the above-mentioned log message should 
indicate that we use local state recovery.

Cheers,
Till

On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

We are experimenting with task local recovery and I wanted to know whether 
there is a way to validate that some tasks of the job recovered from the local 
state rather than the remote state.

We've currently set this up to have 2 Task Managers with 2 slots each, and we 
run a job with parallelism 4. To simulate failure, we kill one of the Task 
Manager pods (we run on Kubernetes). I want to see if the local state

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Roman Khachatryan
Hi,

Thanks for the clarification.

> Other than managing offsets externally, Are there any other ways to guarantee 
> "at least once" processing without enabling checkpointing?

That's currently not possible, at least with the default connector.

Regards,
Roman

On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
 wrote:
>
> Hi Roman,
>
> Thanks for the reply.
> This is what I meant by Internal restarts - Automatic restore of Flink Job 
> from a failure. For example, pipeline restarts when Fixed delay or Failure 
> Rate restart strategies are configured.
>
> Quoting documentation in this link - Configuring Kafka Consumer start 
> position configuration
>
>> Note that these start position configuration methods do not affect the start 
>> position when the job is automatically restored from a failure
>
>
>
> It seems that there will be data loss even when offsets are managed 
> externally when there are pipeline restarts due to a failure, say, an 
> exception. On the other hand, when the pipeline is stopped and 
> resubmitted(say, an upgrade), there won't be any data loss as offsets are 
> retrieved from an external store and configured while starting Kafka Consumer.
>
> We do not want to enable checkpointing as the pipeline is stateless. We have 
> Deduplication logic in the pipeline and the processing is idempotent.
>
> Other than managing offsets externally, Are there any other ways to guarantee 
> "at least once" processing without enabling checkpointing?
>
> Thanks,
> Rahul
>
> On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Could you please explain what you mean by internal restarts?
>>
>> If you commit offsets or timestamps from sink after emitting records
>> to the external system then there should be no data loss.
>> Otherwise (if you commit offsets earlier), you have to persist
>> in-flight records to avoid data loss (i.e. enable checkpointing).
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>>  wrote:
>> >
>> > Hello,
>> >
>> > Context:
>> >
>> > We have a stateless Flink Pipeline which reads from Kafka topics.
>> > The pipeline has a Windowing operator(Used only for introducing a delay in 
>> > processing records) and AsyncI/O operators (used for Lookup/Enrichment).
>> >
>> > "At least Once" Processing semantics is needed for the pipeline to avoid 
>> > data loss.
>> >
>> > Checkpointing is disabled and we are dependent on the auto offset commit 
>> > of Kafka consumer for fault tolerance currently.
>> >
>> > As auto offset commit indicates that "the record is successfully read", 
>> > instead of "the record is successfully processed", there will be data loss 
>> > if there is a restart when the offset is committed to Kafka but not 
>> > successfully processed by the Flink Pipeline, as the record is NOT 
>> > replayed again when the pipeline is restarted.
>> >
>> > Checkpointing can solve this problem. But, since the pipeline is 
>> > stateless, we do not want to use checkpointing, which will persist all the 
>> > records in Windowing Operator and in-flight Async I/O calls.
>> >
>> > Question:
>> >
>> > We are looking for other ways to guarantee "at least once" processing 
>> > without checkpointing. One such way is to manage Kafka Offsets Externally.
>> >
>> > We can maintain offsets of each partition of each topic in Cassandra(or 
>> > maintain timestamp, where all records with timestamps less than this 
>> > timestamp are successfully processed) and configure Kafka consumer Start 
>> > Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
>> >
>> > This will be helpful if the pipeline is manually restarted (say, 
>> > JobManager pod is restarted). But, how to avoid data loss in case of 
>> > internal restarts?
>> >
>> > Has anyone used this approach?
>> > What are other ways to guarantee "at least once" processing without 
>> > checkpointing for a stateless Flink pipeline?
>> >
>> > Thanks,
>> > Rahul


Re: Query regarding flink metric types

2021-04-12 Thread Roman Khachatryan
Hi Suchithra,

You are right, those metrics can only grow, at least until failover.

isBackPressured is reported as a boolean on subtask level. These samples
are then aggregated and a ratio of (times-back-pressured /
number-of-samples) is reported to the JobManager.
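
As a small illustration of the distinction being asked about, a hypothetical user function could register both kinds of metrics as in the sketch below (the metric names and use case are made up): a Counter only ever grows because the owning code increments it, whereas a Gauge simply reports whatever its supplier returns when the reporter polls it, which is also how Flink exposes values such as the checkpoint counts it tracks elsewhere.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

// Hypothetical example; not related to the checkpoint metrics themselves.
public class MetricTypesExample extends RichMapFunction<String, String> {

    private transient Counter recordsSeen; // monotonically increasing
    private volatile long lastRecordLength; // may go up and down

    @Override
    public void open(Configuration parameters) {
        // Counter: only ever incremented by the code that owns it.
        recordsSeen = getRuntimeContext().getMetricGroup().counter("recordsSeen");
        // Gauge: reports the current value of the supplier at scrape time,
        // whether that value grows, shrinks, or stays flat.
        getRuntimeContext().getMetricGroup().gauge("lastRecordLength", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return lastRecordLength;
            }
        });
    }

    @Override
    public String map(String value) {
        recordsSeen.inc();
        lastRecordLength = value.length();
        return value;
    }
}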

Regards,
Roman


On Fri, Apr 9, 2021 at 12:44 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hi Community,
>
>
>
> Need some information regarding metrics type mentioned in flink
> documentation.
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
>
>
>
> For the checkpoint metrics, below metrics are defined as of type gauge. As
> per my understanding gauge type is used to represent a value which can
> increase/decrease whereas counter is used to represent a value which will
> keep increasing. Below metrics will be keep increasing during the job run.
> Hence counter can be appropriate metric type for these. Please share your
> input on this.
>
>
>
> numberOfCompletedCheckpoints
>
> The number of successfully completed checkpoints.
>
> Gauge
>
> numberOfFailedCheckpoints
>
> The number of failed checkpoints.
>
> Gauge
>
> totalNumberOfCheckpoints
>
> The number of total checkpoints (in progress, completed, failed).
>
> Gauge
>
>
>
> Also “isBackPressured"  metric by the name it indicates as it returns
> boolean value Yes/No. Flink documentation says backpressure is measured as
> below,
>
>- *OK*: 0 <= Ratio <= 0.10
>- *LOW*: 0.10 < Ratio <= 0.5
>- *HIGH*: 0.5 < Ratio <= 1
>
> What exactly this metric reports ?
>
>
>
> isBackPressured
>
> Whether the task is back-pressured.
>
> Gauge
>
>
>
> Thanks,
>
> Suchithra
>


Python Integration with Ververica Platform

2021-04-12 Thread Robert Cullen
I've been using the Community Edition v2.4. Just wondering if there is a
Python integration coming in future versions.

Thanks

-- 
Robert Cullen
240-475-4490


Re: Flink 1.11.4?

2021-04-12 Thread Roman Khachatryan
Hi Maciek,

There are no specific plans for 1.11.4 yet as far as I know.
The official policy is to support the current and previous minor
release [1]. So 1.12 and 1.13 will be officially supported once 1.13
is released.
However, it's likely that 1.11.4 will still be released.

[1]
https://flink.apache.org/downloads.html#update-policy-for-old-releases

Regards,
Roman


On Mon, Apr 12, 2021 at 10:35 AM Maciek Próchniak  wrote:
>
> Hello,
>
> I'd like to ask if there are any plans to release 1.11.4 - I understand
> it will be last bugfix release for 1.11.x branch, as 1.13.0 is "just
> round the corner"?
>
> There are a few fixes we'd like to use - e.g.
> https://issues.apache.org/jira/browse/FLINK-9844,
> https://issues.apache.org/jira/browse/FLINK-21164
>
>
> thanks,
>
> maciek
>


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hi Roman,

Thanks for the reply.
This is what I meant by internal restarts - automatic restore of the Flink job
from a failure. For example, the pipeline restarts when the Fixed Delay or
Failure Rate restart strategies are configured.

Quoting documentation in this link - Configuring Kafka Consumer start
position configuration:

> Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure

It seems that there will be data loss even when offsets are managed
externally when there are pipeline restarts due to a failure, say, an
exception. On the other hand, when the pipeline is stopped and
resubmitted(say, an upgrade), there won't be any data loss as offsets are
retrieved from an external store and configured while starting Kafka
Consumer.

We do not want to enable checkpointing as the pipeline is stateless. We
have Deduplication logic in the pipeline and the processing is idempotent.

Other than managing offsets externally, Are there any other ways to
guarantee "at least once" processing without enabling checkpointing?

Thanks,
Rahul

On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan  wrote:

> Hi,
>
> Could you please explain what you mean by internal restarts?
>
> If you commit offsets or timestamps from sink after emitting records
> to the external system then there should be no data loss.
> Otherwise (if you commit offsets earlier), you have to persist
> in-flight records to avoid data loss (i.e. enable checkpointing).
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>  wrote:
> >
> > Hello,
> >
> > Context:
> >
> > We have a stateless Flink Pipeline which reads from Kafka topics.
> > The pipeline has a Windowing operator(Used only for introducing a delay
> in processing records) and AsyncI/O operators (used for Lookup/Enrichment).
> >
> > "At least Once" Processing semantics is needed for the pipeline to avoid
> data loss.
> >
> > Checkpointing is disabled and we are dependent on the auto offset commit
> of Kafka consumer for fault tolerance currently.
> >
> > As auto offset commit indicates that "the record is successfully read",
> instead of "the record is successfully processed", there will be data loss
> if there is a restart when the offset is committed to Kafka but not
> successfully processed by the Flink Pipeline, as the record is NOT replayed
> again when the pipeline is restarted.
> >
> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >
> > Question:
> >
> > We are looking for other ways to guarantee "at least once" processing
> without checkpointing. One such way is to manage Kafka Offsets Externally.
> >
> > We can maintain offsets of each partition of each topic in Cassandra(or
> maintain timestamp, where all records with timestamps less than this
> timestamp are successfully processed) and configure Kafka consumer Start
> Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
> >
> > This will be helpful if the pipeline is manually restarted (say,
> JobManager pod is restarted). But, how to avoid data loss in case of
> internal restarts?
> >
> > Has anyone used this approach?
> > What are other ways to guarantee "at least once" processing without
> checkpointing for a stateless Flink pipeline?
> >
> > Thanks,
> > Rahul
>


Re: Flink Metric isBackPressured not available

2021-04-12 Thread Roman Khachatryan
Hi,

The metric is registered upon task deployment and reported periodically.

Which Flink version are you using? The metric was added in 1.10.
Are you checking it in the UI?

Regards,
Roman

On Fri, Apr 9, 2021 at 8:50 PM Claude M  wrote:
>
> Hello,
>
> The documentation here 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html 
> states there is a isBackPressured metric available yet I don't see it.  Any 
> ideas why?
>
>
> Thanks


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Roman Khachatryan
Hi,

Could you please explain what you mean by internal restarts?

If you commit offsets or timestamps from sink after emitting records
to the external system then there should be no data loss.
Otherwise (if you commit offsets earlier), you have to persist
in-flight records to avoid data loss (i.e. enable checkpointing).
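
For illustration only, a sketch of that emit-then-commit ordering is below. The EnrichedRecord, ExternalSystemClient, and OffsetStore types are hypothetical stand-ins (e.g. the offset store could be the Cassandra table discussed in this thread), and the Kafka topic/partition/offset are assumed to have been attached to each record at the source, for example via a custom KafkaDeserializationSchema.

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sketch: emit to the external system first, then record progress,
// so a replay after a failure can only re-process records (at-least-once), never skip them.
public class EmitThenCommitSink extends RichSinkFunction<EmitThenCommitSink.EnrichedRecord> {

    /** Hypothetical record that carries its Kafka coordinates alongside the payload. */
    public static class EnrichedRecord implements java.io.Serializable {
        public String topic;
        public int partition;
        public long offset;
        public String payload;
    }

    /** Hypothetical clients; stand-ins for the real downstream system and offset store. */
    public interface ExternalSystemClient extends java.io.Serializable { void write(String payload); }
    public interface OffsetStore extends java.io.Serializable { void commit(String topic, int partition, long offset); }

    private final ExternalSystemClient externalSystem;
    private final OffsetStore offsetStore;

    public EmitThenCommitSink(ExternalSystemClient externalSystem, OffsetStore offsetStore) {
        this.externalSystem = externalSystem;
        this.offsetStore = offsetStore;
    }

    @Override
    public void invoke(EnrichedRecord record, Context context) {
        // 1. Emit to the external system first ...
        externalSystem.write(record.payload);
        // 2. ... then record progress. A crash between the two steps means the record
        //    is replayed on restart rather than silently skipped.
        offsetStore.commit(record.topic, record.partition, record.offset);
    }
}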

Regards,
Roman

On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
 wrote:
>
> Hello,
>
> Context:
>
> We have a stateless Flink Pipeline which reads from Kafka topics.
> The pipeline has a Windowing operator(Used only for introducing a delay in 
> processing records) and AsyncI/O operators (used for Lookup/Enrichment).
>
> "At least Once" Processing semantics is needed for the pipeline to avoid data 
> loss.
>
> Checkpointing is disabled and we are dependent on the auto offset commit of 
> Kafka consumer for fault tolerance currently.
>
> As auto offset commit indicates that "the record is successfully read", 
> instead of "the record is successfully processed", there will be data loss if 
> there is a restart when the offset is committed to Kafka but not successfully 
> processed by the Flink Pipeline, as the record is NOT replayed again when the 
> pipeline is restarted.
>
> Checkpointing can solve this problem. But, since the pipeline is stateless, 
> we do not want to use checkpointing, which will persist all the records in 
> Windowing Operator and in-flight Async I/O calls.
>
> Question:
>
> We are looking for other ways to guarantee "at least once" processing without 
> checkpointing. One such way is to manage Kafka Offsets Externally.
>
> We can maintain offsets of each partition of each topic in Cassandra(or 
> maintain timestamp, where all records with timestamps less than this 
> timestamp are successfully processed) and configure Kafka consumer Start 
> Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
>
> This will be helpful if the pipeline is manually restarted (say, JobManager 
> pod is restarted). But, how to avoid data loss in case of internal restarts?
>
> Has anyone used this approach?
> What are other ways to guarantee "at least once" processing without 
> checkpointing for a stateless Flink pipeline?
>
> Thanks,
> Rahul


Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hello,

*Context*:

We have a stateless Flink Pipeline which reads from Kafka topics.
The pipeline has a Windowing operator (used only for introducing a delay in
processing records) and Async I/O operators (used for Lookup/Enrichment).

"At least Once" Processing semantics is needed for the pipeline to avoid
data loss.

Checkpointing is disabled and we are dependent on the auto offset commit of
Kafka consumer for fault tolerance currently.

As auto offset commit indicates that "the record is successfully read",
instead of "the record is successfully processed", there will be data loss
if there is a restart when the offset is committed to Kafka but not
successfully processed by the Flink Pipeline, as the record is NOT replayed
again when the pipeline is restarted.

Checkpointing can solve this problem. But, since the pipeline is stateless,
we do not want to use checkpointing, which will persist all the records in
Windowing Operator and in-flight Async I/O calls.

*Question*:

We are looking for other ways to guarantee "at least once" processing
without checkpointing. One such way is to manage Kafka Offsets Externally.

We can maintain offsets of each partition of each topic in Cassandra (or
maintain a timestamp, where all records with timestamps less than this
timestamp are successfully processed) and configure the Kafka consumer start
position - setStartFromTimestamp() or setStartFromSpecificOffsets().
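
A rough sketch of restoring the start position from such an external store is below, using the documented setStartFromSpecificOffsets() API. The topic name, properties, and offset values are placeholders, and note the caveat discussed elsewhere in this thread: these start-position methods do not apply when the job is automatically restored from a failure.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ExternalOffsetStartup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");    // placeholder
        props.setProperty("group.id", "my-stateless-pipeline");  // placeholder

        // Offsets previously written to the external store (e.g. Cassandra);
        // the topic name and offset values here are placeholders.
        Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(new KafkaTopicPartition("events", 0), 23L);
        startOffsets.put(new KafkaTopicPartition("events", 1), 31L);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        consumer.setStartFromSpecificOffsets(startOffsets);
        // Or, if the store tracks a timestamp instead:
        // consumer.setStartFromTimestamp(lastFullyProcessedTimestampMillis);

        env.addSource(consumer).print(); // placeholder for the real pipeline
        env.execute("external-offset-startup-sketch");
    }
}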

This will be helpful if the pipeline is manually restarted (say, JobManager
pod is restarted). *But, how to avoid data loss in case of internal
restarts?*

Has anyone used this approach?
What are other ways to guarantee "at least once" processing without
checkpointing for a stateless Flink pipeline?

Thanks,
Rahul


Re: how to convert DataStream to Table

2021-04-12 Thread Roman Khachatryan
Hi,

I'm pulling in Timo and Jark as they know Table API better.

Regards,
Roman

On Sun, Apr 11, 2021 at 3:36 PM vtygoss  wrote:
>
> Hi All,
>
>
> there is a scenario where I need to process OGG Log data in kafka using Flink 
> Sql. I can convert the OGG Log Stream to DataStream and each event 
> has RowKind, but i have trouble converting DataStream to a Table.
>
> For test, i tried StreamTableEnvironment#fromDataStream and 
> createTemporaryView API, both TableSchema is
>
> ```
>
> root
>
>  |-- f0: LEGACY('RAW', 'ANY')
>
> ```
>
>
> i want to get the schema :
>
>
> ```
>
> root
>
>  |— column1: Type,
>
>  |— column2: Type,
>
> ….
>
> ```
>
>
>
> how to convert DataStream with RowKind to Table?
>
>
>
> Thank you very much for your reply
>
>


Flink 1.11.4?

2021-04-12 Thread Maciek Próchniak

Hello,

I'd like to ask if there are any plans to release 1.11.4 - I understand
it will be the last bugfix release for the 1.11.x branch, as 1.13.0 is "just
around the corner"?


There are a few fixes we'd like to use - e.g. 
https://issues.apache.org/jira/browse/FLINK-9844, 
https://issues.apache.org/jira/browse/FLINK-21164



thanks,

maciek



Re: Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
In Flink, you can only limit memory usage, e.g. via
taskmanager.memory.process.size [1]
(throttling could be implemented using the DataStream API, but you
mentioned you are using SQL).
Quotas on other resources can be set in the underlying resource manager.
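
For jobs written against the DataStream API (not the SQL job in question), a crude throttle can be a blocking rate limiter inside a map function. The sketch below is only illustrative and assumes Guava is available on the classpath.

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch: limits each parallel subtask to a fixed number of records per second,
// so the job-wide rate is roughly recordsPerSecondPerSubtask * parallelism.
public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public ThrottlingMap(double recordsPerSecondPerSubtask) {
        this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available
        return value;
    }
}

Applied right after the source, e.g. source.map(new ThrottlingMap<>(1000)), this would cap source-side throughput, but it does not help a pure SQL pipeline.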

But I'd suggest investigating the failure and understand what's
causing it. Probably, high resource usage is not the root cause.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#memory-configuration

Regards,
Roman

On Mon, Apr 12, 2021 at 10:17 AM 张颖  wrote:
>
> Hi,
> This is not my intention.
> I meant that I run stream jobs and batch jobs in the same cluster, but
> the batch job takes almost all the resources in the cluster (it may drive
> the machine load average to 150, or CPU to 100%, or disk I/O to 100%), which
> leads my stream job into a series of problems (such as TM lost and connection
> timeouts). So I want to limit the speed of processing data in the batch job.
>
>
>
>
>
>
>
> At 2021-04-12 15:49:31, "Roman Khachatryan"  wrote:
> >Hi,
> >
> >I'm not sure that I fully understand your question.
> >Is the intention to prioritize some jobs over the others in the same
> >Flink cluster? Currently, it is not possible (FLIP-156 and further
> >work aim to address this [1]). At the moment, you can either
> >- deploy the jobs in separate clusters (per-job mode [2]) and rely on
> >the underlying resource manager for resource isolation
> >- or allocate less task slots to a lower priority job by configuring:
> >parallelism, operator chaining and slot sharing groups
> >
> >[1] 
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
> >[2] 
> >https://ci.apache.org/projects/flink/flink-docs-stable/deployment/#per-job-mode
> >
> >Regards,
> >Roman
> >
> >
> >
> >On Mon, Apr 12, 2021 at 9:07 AM 张颖  wrote:
> >>
> >> When I run a sql job with blink planner in my cluster,the task is almost 
> >> preemption the whole resources in the cluster,  and this is a bad effect 
> >> to the stream task.As it is not necessary on speed,so is there any way to 
> >> control the rate in my batch task?
> >>
> >>
> >>
> >> this is the machine performance in running some operator:
> >> https://issues.apache.org/jira/browse/FLINK-22204
> >>
> >>
> >>
> >>
>
>
>
>


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-12 Thread Till Rohrmann
Hi Sonam,

The state size probably depends a bit on your infrastructure. Assuming you
have 1 GBps network connection and local SSDs, then I guess you should see
a difference if your local state size is  > 1 GB.

Cheers,
Till

On Wed, Apr 7, 2021 at 1:46 PM Sonam Mandal  wrote:

> Hi Till and Dhanesh,
>
> Thanks for the insights into both on how to check that this kicks in and
> on the expected behavior. My understanding too was that if multiple TMs are
> used for the job, any TMs that don’t go down can take advantage of local
> recovery.
>
> Do you have any insights on a good minimum state size we should experiment
> with to check recovery time differences between the two modes?
>
> Thanks,
> Sonam
> --
> *From:* dhanesh arole 
> *Sent:* Wednesday, April 7, 2021 3:43:11 AM
> *To:* Till Rohrmann 
> *Cc:* Sonam Mandal ; Tzu-Li (Gordon) Tai <
> tzuli...@apache.org>; user@flink.apache.org 
> *Subject:* Re: How to know if task-local recovery kicked in for some
> nodes?
>
> Hi Till,
>
> You are right. To give you more context about our setup, we are running 1
> task slot per task manager and total number of task manager replicas equal
> to job parallelism. The issue actually exacerbates during rolling
> deployment of task managers as each TM goes offline and comes back online
> again after some time. So during bouncing of every TM pod somehow task
> allocation changes and finally job stabilises once all TMs are restarted.
> Maybe a proper blue green setup would allow us to make the best use of
> local recovery during restart of TMs. But during intermittent failures of
> one of the TMs local recovery works as expected on the other healthy TM
> instances ( I.e it does not download from remote ).
>
> On Wed, 7 Apr 2021 at 10:35 Till Rohrmann  wrote:
>
> Hi Dhanesh,
>
> if some of the previously used TMs are still available, then Flink should
> try to redeploy tasks onto them also in case of a global failover. Only
> those tasks which have been executed on the lost TaskManager will need new
> slots and have to download the state from the remote storage.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole 
> wrote:
>
> Hi Sonam,
>
> We have a similar setup. What I have observed is, when the task manager
> pod gets killed and restarts again ( i.e. the entire task manager process
> restarts ) then local recovery doesn't happen. Task manager restore process
> actually downloads the latest completed checkpoint from the remote state
> handle even when the older localState data is available. This happens
> because with every run allocation-ids for tasks running on task manager
> change as task manager restart causes global job failure and restart.
>
> Local recovery - i.e task restore process using locally stored checkpoint
> data kicks in when the task manager process is alive but due to some other
> reason ( like timeout from sink or external dependency ) one of the tasks
> fails and the flink job gets restarted by the job manager.
>
> Please CMIIW
>
>
> -
> Dhanesh Arole
>
> On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann 
> wrote:
>
> Hi Sonam,
>
> The easiest way to see whether local state has been used for recovery is
> the recovery time. Apart from that you can also look for "Found registered
> local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
> is logged on debug. This indicates that the local state is available.
> However, it does not say whether it is actually used. E.g. when doing a
> rescaling operation we change the assignment of key group ranges which
> prevents local state from being used. However in case of a recovery the
> above-mentioned log message should indicate that we use local state
> recovery.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Sonam,
>
> Pulling in Till (cc'ed), I believe he would likely be able to help you
> here.
>
> Cheers,
> Gordon
>
> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:
>
> Hello,
>
> We are experimenting with task local recovery and I wanted to know whether
> there is a way to validate that some tasks of the job recovered from the
> local state rather than the remote state.
>
> We've currently set this up to have 2 Task Managers with 2 slots each, and
> we run a job with parallelism 4. To simulate failure, we kill one of the
> Task Manager pods (we run on Kubernetes). I want to see if the local state
> of the other Task Manager was used or not. I do understand that the state
> for the killed Task Manager will need to be fetched from the checkpoint.
>
> Also, do you have any suggestions on how to test such failure scenarios in
> a better way?
>
> Thanks,
> Sonam
>
> --
> - Dhanesh ( sent from my mobile device. Pardon me for any typos )
>


Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
Hi,

I'm not sure that I fully understand your question.
Is the intention to prioritize some jobs over the others in the same
Flink cluster? Currently, it is not possible (FLIP-156 and further
work aim to address this [1]). At the moment, you can either
- deploy the jobs in separate clusters (per-job mode [2]) and rely on
the underlying resource manager for resource isolation
- or allocate less task slots to a lower priority job by configuring:
parallelism, operator chaining and slot sharing groups

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/#per-job-mode

Regards,
Roman



On Mon, Apr 12, 2021 at 9:07 AM 张颖  wrote:
>
> When I run a sql job with blink planner in my cluster,the task is almost 
> preemption the whole resources in the cluster,  and this is a bad effect to 
> the stream task.As it is not necessary on speed,so is there any way to 
> control the rate in my batch task?
>
>
>
> this is the machine performance in running some operator:
> https://issues.apache.org/jira/browse/FLINK-22204
>
>
>
>


Does it support rate-limiting in flink 1.12?

2021-04-12 Thread 张颖
When I run a SQL job with the Blink planner in my cluster, the task takes almost
all the resources in the cluster, and this has a bad effect on the stream tasks.
As speed is not critical for it, is there any way to control the rate of my
batch task?

 

This is the machine performance while running some of the operators:
https://issues.apache.org/jira/browse/FLINK-22204