Re: Source vs SourceFunction and testing

2022-05-24 Thread Qingsheng Ren
Hi Piotr,

I’d like to share my understanding about this. Source and SourceFunction are 
both interfaces to data sources. SourceFunction was designed and introduced 
earlier and as the project evolved, many shortcomings emerged. Therefore, the 
community re-designed the source interface and introduced the new Source API in 
FLIP-27 [1]. 

Eventually we will deprecate SourceFunction and use Source as the only 
interface for all data sources, but considering the huge cost of migration 
you’ll see SourceFunction and Source co-exist for some time. For example, the 
ParallelTestSource you mentioned is still built on SourceFunction, while 
KafkaSource, as a pioneer, has already migrated to the new Source API.

I think the API exposed to end users didn't change a lot: both 
env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
and you can apply downstream transformations onto it. 
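
For concreteness, here is a minimal sketch (just an illustration, not from the original 
thread: the String element type, the localhost:9092 broker and the topic name are all 
placeholder assumptions) showing that both entry points end up as a DataStream you 
can transform in the same way:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SourceApiComparison {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // New Source API (FLIP-27): a Source is passed to env.fromSource(...)
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")          // placeholder broker
                .setTopics("input-topic")                       // placeholder topic
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStream<String> fromNewApi =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka");

        // Legacy API: a SourceFunction is passed to env.addSource(...)
        DataStream<String> fromLegacyApi = env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect("hello");
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Both are plain DataStreams, so downstream transformations look identical.
        fromNewApi.map(String::toUpperCase).print();
        fromLegacyApi.map(String::toUpperCase).print();

        env.execute("source-api-comparison");
    }
}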

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
 

Cheers,

Qingsheng

> On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what is the recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which makes sense. But then, my job is normally constructed 
>> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
>> 
>> Is there any recommended way of handling these discrepancies, i.e. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski



Re: accuracy validation of streaming pipeline

2022-05-24 Thread Leonard Xu
Hi, vtygoss

> I'm working on migrating from a full-data pipeline (with Spark) to an 
> incremental-data pipeline (with Flink CDC), and I met a problem about accuracy 
> validation between the Flink-based and Spark-based pipelines.

Glad to hear that !



> For bounded data, it's simple to validate whether the two result sets are consistent 
> or not. 
> But for unbounded data and event-driven applications, how do we make sure the 
> data stream produced is correct, especially when there are some retract 
> functions with high impact, e.g. row_number? 
> 
> Is there any document for this problem? Thanks for any suggestions or 
> replies. 

The validation feature belongs to the data quality scope from my understanding; it’s 
usually provided by the platform, e.g. a Data Integration Platform. As the 
underlying pipeline engine/tool, Flink CDC should expose more metrics or data 
quality checking abilities, but we don’t offer them yet, and these 
enhancements are on our roadmap. Currently, you can use the Flink source/sink 
operators’ metrics as a rough validation, and you can also compare the record counts 
in your source database and sink system multiple times for a more accurate 
validation.

Best,
Leonard



Re: accuracy validation of streaming pipeline

2022-05-24 Thread Shengkai Fang
Hi, all.

From my understanding, the accuracy check for the sync pipeline requires
snapshotting the source and sink at some points. It is just like having a
checkpoint that contains all the data at some time for both sink and
source. Then we can compare the content in the checkpoint and find the
difference.

The main problem is how can we snapshot the data in the source/sink or
provide some meaningful metrics to compare at the points.

Best,
Shengkai

Xuyang wrote on Tue, May 24, 2022 at 21:32:

> I think for unbounded data, we can only check the result at one point
> in time, which is the work that Watermark [1] does. What about tagging one
> point in time and validating the data accuracy at that moment?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#watermark
>
> On 2022-05-20 16:02:39, "vtygoss" wrote:
>
> Hi community!
>
>
> I'm working on migrating from a full-data pipeline (with Spark) to an
> incremental-data pipeline (with Flink CDC), and I met a problem about
> accuracy validation between the Flink-based and Spark-based pipelines.
>
>
> For bounded data, it's simple to validate whether the two result sets are
> consistent or not.
>
> But for unbounded data and event-driven applications, how do we make sure the
> data stream produced is correct, especially when there are some retract
> functions with high impact, e.g. row_number?
>
>
> Is there any document for this problem? Thanks for any suggestions
> or replies.
>
>
> Best Regards!
>
>


Re: Application mode deployment through API call

2022-05-24 Thread Shengkai Fang
Hi, Peter.

I am not sure whether this doc is enough or not. The doc [1] lists all the
REST APIs available in the Flink runtime now. You can use the RestClient [2]
to send requests to the JobManager programmatically.

Best,
Shengkai

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
[2]
https://github.com/apache/flink/blob/646ff2d36f40704f5dca017b8fffed78bd51b307/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
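
As a quick illustration (just a sketch, not from the original mail: it uses the plain
JDK 11 HTTP client rather than Flink's RestClient, and assumes the JobManager REST
endpoint is reachable at localhost:8081), the documented /jobs/overview endpoint can
be queried like this:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListFlinkJobs {
    public static void main(String[] args) throws Exception {
        // Assumption: the JobManager REST endpoint runs on localhost:8081.
        String restEndpoint = "http://localhost:8081";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restEndpoint + "/jobs/overview"))
                .GET()
                .build();

        // The response body is a JSON document describing all jobs known to the cluster.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}

The RestClient linked above offers a typed variant of the same endpoints (request and
response classes per endpoint), which is more convenient if you build submission logic
into a Java service.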

Peter Schrott wrote on Tue, May 24, 2022 at 19:52:

> Hi Vikash,
>
> Could you be more precise about the shared libraries? Is there any
> documentation about this?
>
> Thanks, Peter
>
> On Tue, May 24, 2022 at 1:23 PM Vikash Dat  wrote:
>
>> Similar to what Biao said, Application mode is okay if you only have a single
>> app, but when running multiple apps, session mode is better for control. In
>> my experience, the CLIFrontend is not as robust as the REST API, or you
>> will end up having to rebuild a very similar Rest API. For the meta space
>> issue, have you tried adding shared libraries to the flink lib folder?
>>
>> On Mon, May 23, 2022 at 23:31 Shengkai Fang  wrote:
>>
>>> Hi, all.
>>>
>>> > is there any plan in the Flink community to provide an easier way of
>>> deploying Flink with application mode on YARN
>>>
>>> Yes. Jark has already opened a ticket about how to use the SQL client to
>>> submit SQL in application mode [1]. What's more, with FLIP-222 we will be able
>>> to manage jobs in SQL, which will list all submitted jobs and their web
>>> UIs [2].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26541
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>>>
>>>
>>>


Re: Flink Job Manager unable to recognize Task Manager Available slots

2022-05-24 Thread Teoh, Hong
Hi Sunitha,

Without more information about your setup, I would assume you are trying to 
return the JobManager (and HA setup) to a stable state. A couple of questions:

  *   Since your job is cancelled, I would assume that the current job’s HA 
state is not important, so we can delete the checkpoint pointer and data.
  *   Are there other jobs running on the same cluster whose HA state you want 
to salvage?

I can think of the following options:

  1.  If there are no other jobs running on the same cluster, and the HA state 
is not important, the easiest way is to totally replace your ZooKeeper 
instances (this will start the JobManager afresh, but will cause the HA state 
of all other jobs running on the same cluster to be lost).
  2.  Manually clear the Zookeeper HA state for the problematic job. This will 
keep the HA state of other jobs running on the same cluster.

To perform step 2, see below:
ZooKeeper stores “active” jobs in a znode hierarchy as shown below (you can 
think of this as a pseudo file system). I am assuming the job id is the one you 
pasted in the logs.


  *   /flink/default/running_job_registry/3a97d1d50f663027ae81efe0f0aa
This has the status of the job (e.g. RUNNING)

  *   /flink/default/leader/resource_manager_lock
This has the information about which JM holds the ResourceManager (which is the 
component responsible for registering the task slots in the cluster).

There are other znodes as well, which are all interesting (e.g. 
/flink/default/checkpoints, /flink/default/checkpoint-counter), but I’ve 
highlighted the relevant ones.

To clear this, you can simply log onto your ZooKeeper nodes and delete the 
znodes. The JobManager will repopulate them when the job starts up.

  1.  Log onto your ZooKeeper nodes (e.g. exec into your ZooKeeper container)
  2.  Execute the ZooKeeper CLI. This usually comes prepackaged with ZooKeeper, 
and you can simply run the pre-packaged script bin/zkCli.sh.

Explore the pseudo-file system by doing ls or get (e.g. ls /flink/default )

  3.  You can delete the znodes associated with your job

rmr /flink/default/running_job_registry/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/jobgraphs/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/checkpoints/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/checkpoint-counter/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/leaderlatch/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/leader/3a97d1d50f663027ae81efe0f0aa

This should result in your JobManager recovering from the faulty job.

Regards,
Hong






From: "s_penakalap...@yahoo.com" 
Date: Tuesday, 24 May 2022 at 18:40
To: User 
Subject: RE: [EXTERNAL]Flink Job Manager unable to recognize Task Manager 
Available slots



Hi Team,

Any inputs please badly stuck.

Regards,
Sunitha

On Sunday, May 22, 2022, 12:34:22 AM GMT+5:30, s_penakalap...@yahoo.com 
 wrote:


Hi All,

Help please!

We have a standalone Flink service installed on individual VMs, clubbed together to 
form a cluster with HA and checkpointing in place. When cancelling a job, the Flink 
cluster went down and is unable to start up normally, as the JobManager keeps 
going down with the below error:

2022-05-21 14:33:09,314 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 
3a97d1d50f663027ae81efe0f0aa.

Each attempt to restart the cluster failed with the same error, so the whole cluster 
became unrecoverable and is not operating. Please help with the below points:
1> In which Flink/ZooKeeper folder are the job recovery details stored, and how can 
we clear all old job instances so that the Flink cluster will not try to recover them and 
will start fresh, letting us manually resubmit all jobs?

2> Since the cluster is HA, we have 2 JobManagers. Even though one JM goes 
down, Flink starts, but the available slots show up as 0 (TaskManagers 
are up but not displayed in the web UI).

Regards
Sunitha.



Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

Yes, that should work (using DataStream as the common result of both 
source creation options).

— Ken

> On May 24, 2022, at 12:19 PM, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
>  
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what is the recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which makes sense. But then, my job is normally constructed 
>> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
>> 
>> Is there any recommended way of handling these discrepancies, i.e. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>>  
>> 
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi Ken,

Thanks Ken. I guess the problem I had was, as a complete newbie to Flink,
navigating the type system and being still confused about differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, then I can
use:

DataStream source = env.fromSource(getKafkaSource(params),
watermarkStrategy, "Kafka");
when using KafkaSource in the normal setup

and
DataStream s = env.addSource(new ParallelTestSource<>(...));
when using the testing source [1]

Does that sound right?

[1]
https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26

On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
wrote:

> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream.
>
> This way I can separate out the Kafka inputs, and use testing sources that
> give me very precise control over the inputs (e.g. I can hold up on right
> side data to ensure my stateful left join junction is handling deferred
> joins properly). I can also use Kafka unit test support (either kafka-junit
> or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
>
> — Ken
>
> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure the job which one
> would like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
>
> Is there any recommended way of handling these discrepancies, i.e. having
> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Piotr Domagalski


Flink DataStream and remote Stateful Functions interoperability

2022-05-24 Thread Himanshu Sareen
Team,

I'm working on a POC where our existing Stateful Functions (remote) can 
interact with the DataStream API.
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/

I started the Flink cluster - ./bin/start-cluster.sh
Then I submitted the .jar to Flink.

However, on submitting, only the embedded function is called by the DataStream code.

I'm unable to invoke the stateful functions, as module.yaml is not loaded.

Can someone help me understand how we can deploy Stateful Functions code 
(module.yaml) and DataStream API code in parallel on a Flink cluster?


Regards
Himanshu



Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

The way I handle this is via a workflow class that uses a builder approach to 
specifying inputs, outputs, and any other configuration settings.

The inputs are typically DataStream.

This way I can separate out the Kafka inputs, and use testing sources that give 
me very precise control over the inputs (e.g. I can hold up on right side data 
to ensure my stateful left join junction is handling deferred joins properly). 
I can also use Kafka unit test support (either kafka-junit or Spring embedded 
Kafka) if needed.

Then in the actual tool class (with a main method) I’ll wire up the real Kafka 
sources, with whatever logic is required to convert the consumer records to 
what the workflow is expecting.
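
To make the idea concrete, a minimal sketch of such a workflow class (all names here are
hypothetical, not Ken's actual code; it only illustrates the "inputs are DataStreams" shape):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

public class EnrichmentWorkflow {

    private DataStream<String> leftInput;
    private DataStream<String> rightInput;

    public EnrichmentWorkflow setLeftInput(DataStream<String> leftInput) {
        this.leftInput = leftInput;
        return this;
    }

    public EnrichmentWorkflow setRightInput(DataStream<String> rightInput) {
        this.rightInput = rightInput;
        return this;
    }

    /** Wires up the transformations and returns the result stream. */
    public DataStream<String> build() {
        // Placeholder logic; a real workflow would key, join, window, etc.
        return leftInput.union(rightInput)
                .map(value -> "processed: " + value)
                .returns(Types.STRING);
    }
}

In production code you would call setLeftInput(env.fromSource(kafkaSource, ...)), while a
test would call setLeftInput(env.addSource(new ParallelTestSource<>(...))) and attach a
collecting test sink to the stream returned by build().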

— Ken

> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> 
> Hi,
> 
> I'm wondering: what is the recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
> 
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
> 
> Is there any recommended way of handling these discrepancies, i.e. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> 
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>  
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Flink Job Manager unable to recognize Task Manager Available slots

2022-05-24 Thread s_penakalap...@yahoo.com
Hi Team,

Any inputs please, badly stuck.

Regards,
Sunitha

On Sunday, May 22, 2022, 12:34:22 AM GMT+5:30, s_penakalap...@yahoo.com 
 wrote:

Hi All,

Help please!

We have a standalone Flink service installed on individual VMs, clubbed together to 
form a cluster with HA and checkpointing in place. When cancelling a job, the Flink 
cluster went down and is unable to start up normally, as the JobManager keeps 
going down with the below error:

2022-05-21 14:33:09,314 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 
3a97d1d50f663027ae81efe0f0aa.

Each attempt to restart the cluster failed with the same error, so the whole cluster 
became unrecoverable and is not operating. Please help with the below points:
1> In which Flink/ZooKeeper folder are the job recovery details stored, and how can we 
clear all old job instances so that the Flink cluster will not try to recover them and will 
start fresh, letting us manually resubmit all jobs?
2> Since the cluster is HA, we have 2 JobManagers. Even though one JM goes 
down, Flink starts, but the available slots show up as 0 (TaskManagers are up but 
not displayed in the web UI).

Regards,
Sunitha
  

Re: TolerableCheckpointFailureNumber not always applying

2022-05-24 Thread Gaël Renoux
I get the idea, but in our case this was a transient error: it was a
network issue, which was solved later without any change in Flink (see last
line of the stack trace). Errors in the sync phase are not always non-transient
(in our case, they pretty much never are).

To be honest, I have trouble imagining a case in Production where you'd
want the job to fail if a checkpoint fails. In a test environment, sure,
you want to crash as soon as possible if something goes wrong. But in
Production? I'd rather continue working as long as my job can work. Sure,
I'm in trouble if the job crashes and I don't have a recent checkpoint -
but then, crashing exactly when I don't have a checkpoint is the worst
thing that can happen.

So I think the safest solution (when Flink is in doubt about whether it's
transient or not) would be to assume the error is transient and apply the
tolerable failures configuration. In that case, the worst case scenario is
that your job goes on, and you have to verify your checkpoints to see if
everything is alright - which is something you should always do anyway, in
case you have a transient error that's not going away.
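
To be explicit about which setting is being discussed, here is a minimal sketch of how
the tolerance is configured (the 1,000,000 value mirrors the setting mentioned in the
quoted message below; the equivalent config key is
execution.checkpointing.tolerable-failed-checkpoints):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToleranceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        // Tolerate a very large number of checkpoint failures before failing the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1_000_000);

        // ... build the pipeline and call env.execute() here.
    }
}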



On Tue, May 24, 2022 at 6:04 AM Hangxiang Yu  wrote:

> In my opinion, some exceptions in the async phase, like timeouts, may happen
> due to the network or the state size, which can change, so maybe next time
> these failures will not occur. So the config makes sense for these.
> But a failure in the sync phase usually means the program will always
> fail, and since it affects the normal procedure, it has to be stopped.
> If you don't need to recover from the checkpoint, maybe you could disable
> it. But that's not recommended for a streaming job.
>
> Best,
> Hangxiang.
>
> On Tue, May 24, 2022 at 12:51 AM Gaël Renoux 
> wrote:
>
>> Got it, thank you. I misread the documentation and thought the async
>> referred to the task itself, not the process of taking a checkpoint.
>>
>> I guess there is currently no way to make a job never fail on a failed
>> checkpoint?
>>
>> Gaël Renoux - Lead R&D Engineer
>> E - gael.ren...@datadome.co
>> W - www.datadome.co
>>
>>
>>
>> On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu  wrote:
>>
>>> Hi, Gaël Renoux.
>>> As you could see in [1], There are some descriptions about the config:
>>> "This only applies to the following failure reasons: IOException on the
>>> Job Manager, failures in the async phase on the Task Managers and
>>> checkpoint expiration due to a timeout. Failures originating from the sync
>>> phase on the Task Managers are always forcing failover of an affected task.
>>> Other types of checkpoint failures (such as checkpoint being subsumed) are
>>> being ignored."
>>>
>>> From the stack-trace, I see the exception is thrown in the sync phase.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 5:18 PM Gaël Renoux 
>>> wrote:
>>>
 Hello everyone,

 We're having an issue on our Flink job: it restarted because it failed
 a checkpoint, even though it shouldn't have. We've set the
 tolerableCheckpointFailureNumber to 1 million to never have the job restart
 because of this. However, the job did restart following a checkpoint
 failure in a Kafka sink (stack-trace below).

 I'm about to open an issue on Flink's Jira, but I thought I'd check if
 I'm missing something first. Is there a known limitation somewhere? Or
 should the tolerableCheckpointFailureNumber apply in that case?

 The stack-trace:

 Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e)
> switched from RUNNING to FAILED with failure cause: java.io.IOException:
> Could not perform checkpoint 853 for operator Sink: result-to-kafka 
> (1/1)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.stre

Re: Source vs SourceFunction and testing

2022-05-24 Thread Aeden Jameson
Depending on the kind of testing you're hoping to do, you may want to
look into https://github.com/mguenther/kafka-junit. For example, maybe
you're looking for some job-level smoke tests that just answer the
question "Is everything wired up correctly?" Personally, I like how
this approach doesn't require you to open up the design for the sake
of testing.


On Tue, May 24, 2022 at 8:34 AM Piotr Domagalski  wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
>
> Is there any recommended way of handling these discrepancies, i.e. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj


Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi,

I'm wondering: what is the recommended way to structure the job which one
would like to test later on with `MiniCluster`.

I've looked at the flink-training repository examples [1] and they tend to
expose the main job as a class that accepts a `SourceFunction` and a
`SinkFunction`, which makes sense. But then, my job is normally constructed
with `KafkaSource`, which is then passed to `env.fromSource(...)`.

Is there any recommended way of handling these discrepancies, i.e. having to
use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?

[1]
https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61

-- 
Piotr Domagalski


Re:accuracy validation of streaming pipeline

2022-05-24 Thread Xuyang
I think for unbounded data, we can only check the result at one point in 
time, which is the work that Watermark [1] does. What about tagging one point in time 
and validating the data accuracy at that moment?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#watermark



On 2022-05-20 16:02:39, "vtygoss" wrote:

Hi community!




I'm working on migrating from a full-data pipeline (with Spark) to an 
incremental-data pipeline (with Flink CDC), and I met a problem about accuracy 
validation between the Flink-based and Spark-based pipelines.




For bounded data, it's simple to validate whether the two result sets are consistent or 
not. 

But for unbounded data and event-driven applications, how do we make sure the data 
stream produced is correct, especially when there are some retract functions 
with high impact, e.g. row_number? 




Is there any document for this problem? Thanks for any suggestions or 
replies. 




Best Regards! 

Re: Python Job Type Support in Flink Kubernetes Operator

2022-05-24 Thread Gyula Fóra
Hi Jeesmon!

Sorry I completely missed this question earlier :)

There is currently no support for Python jobs, and I don't really have any
experience with Python jobs, so I cannot really comment on how easy it would
be to integrate them.

We and most of the companies currently involved with developing the
operator focus on Flink production jobs built on Java, so this feature is
not on our radar at the moment.
If this is something interesting for you and you would like to investigate
and contribute to it we would be happy to help you along the way.

Cheers,
Gyula

On Tue, May 24, 2022 at 5:24 AM Jeesmon Jacob  wrote:

> Hi Gyula,
>
> Any idea on this? We are exploring current limitations of using the
> operator for Flink deployment and if there is a plan to support Python jobs
> in future will help us.
>
> Thanks,
> Jeesmon
>
> On Fri, May 20, 2022 at 3:46 PM Jeesmon Jacob  wrote:
>
>> Hi there,
>>
>> Is there a plan to support Python Job Type in Flink Kubernetes Operator?
>> If yes, any ETA?
>>
>> According to this previous operator overview, only Java jobs are supported
>> by the operator. This page was recently modified to remove the features table.
>>
>>
>> https://github.com/apache/flink-kubernetes-operator/blob/73369b851f2cd92a6818bb84e21157518d63a48d/docs/content/docs/concepts/overview.md
>>
>> Job Type:
>>   Jar job    - full
>>   SQL Job    - no
>>   Python Job - no
>>
>> Thanks,
>> Jeesmon
>>
>


Re: Python Job Type Support in Flink Kubernetes Operator

2022-05-24 Thread Jeesmon Jacob
Hi Gyula,

Any idea on this? We are exploring current limitations of using the
operator for Flink deployment and if there is a plan to support Python jobs
in future will help us.

Thanks,
Jeesmon

On Fri, May 20, 2022 at 3:46 PM Jeesmon Jacob  wrote:

> Hi there,
>
> Is there a plan to support Python Job Type in Flink Kubernetes Operator?
> If yes, any ETA?
>
> According to this previous operator overview, only Java jobs are supported
> by the operator. This page was recently modified to remove the features table.
>
>
> https://github.com/apache/flink-kubernetes-operator/blob/73369b851f2cd92a6818bb84e21157518d63a48d/docs/content/docs/concepts/overview.md
>
> Job Type:
>   Jar job    - full
>   SQL Job    - no
>   Python Job - no
>
> Thanks,
> Jeesmon
>


Re: Application mode deployment through API call

2022-05-24 Thread Peter Schrott
Hi Vikash,

Could you be more precise about the shared libraries? Is there any
documentation about this?

Thanks, Peter

On Tue, May 24, 2022 at 1:23 PM Vikash Dat  wrote:

> Similar to what Biao said, Application mode is okay if you only have a single
> app, but when running multiple apps, session mode is better for control. In
> my experience, the CLIFrontend is not as robust as the REST API, or you
> will end up having to rebuild a very similar Rest API. For the meta space
> issue, have you tried adding shared libraries to the flink lib folder?
>
> On Mon, May 23, 2022 at 23:31 Shengkai Fang  wrote:
>
>> Hi, all.
>>
>> > is there any plan in the Flink community to provide an easier way of
>> deploying Flink with application mode on YARN
>>
>> Yes. Jark has already opened a ticket about how to use the SQL client to
>> submit SQL in application mode [1]. What's more, with FLIP-222 we will be able
>> to manage jobs in SQL, which will list all submitted jobs and their web
>> UIs [2].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-26541
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>>
>>
>>


Re: Json Deserialize in DataStream API with array length not fixed

2022-05-24 Thread Qingsheng Ren
Hi Zain,

I assume you are using the DataStream API as described in the subject of your 
email, so I think you can define any functions/transformations to parse the 
JSON value, even if the schema is changing. 

It looks like the value of the field “array_coordinates” is an escaped, 
JSON-formatted STRING instead of a JSON object, so I would parse the input 
JSON string first using Jackson (or any JSON parser you like), extract the 
field “array_coordinates” as a string, remove all backslashes to un-escape the 
string, and use Jackson again to parse it. 
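
A minimal Jackson sketch of that two-step parsing (class and method names are just
illustrative; note that JsonNode#asText() already resolves the JSON-level \" escaping,
so the explicit backslash stripping is only needed if the payload contains additional
literal backslashes):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ArrayCoordinatesParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Returns the "array_coordinates" field as a JSON array of arbitrary length. */
    public static JsonNode extractCoordinates(String rawJson) throws Exception {
        JsonNode root = MAPPER.readTree(rawJson);

        // "array_coordinates" arrives as a JSON-encoded string, not a nested object.
        String coordinatesJson = root.path("data").path("array_coordinates").asText();

        // If extra literal backslashes remain, un-escape them before the second parse:
        // coordinatesJson = coordinatesJson.replace("\\", "");

        // Parse the inner string again; the result can be iterated whatever its length.
        return MAPPER.readTree(coordinatesJson);
    }
}

In a DataStream job this would typically be called from a MapFunction or ProcessFunction
applied right after the source.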

If you are using the Table / SQL API, I’m afraid you have to use a UDTF to parse the 
input because the schema varies in the field “array_coordinates”. 

Hope this could be helpful!

Cheers, 

Qingsheng

> On May 21, 2022, at 14:58, Zain Haider Nemati  wrote:
> 
> Hi Folks,
> I have data coming in this format:
> 
> {
> “data”: {
> “oid__id”:  “61de4f26f01131783f162453”,
> “array_coordinates”:“[ { \“speed\” : \“xxx\“, \“accuracy\” : 
> \“xxx\“, \“bearing\” : \“xxx\“, \“altitude\” : \“xxx\“, \“longitude\” : 
> \“xxx\“, \“latitude\” : \“xxx\“, \“dateTimeStamp\” : \“xxx\“, \“_id\” : { 
> \“$oid\” : \“xxx\” } }, { \“speed\” : \“xxx\“, \“isFromMockProvider\” : 
> \“false\“, \“accuracy\” : \“xxx\“, \“bearing\” : \“xxx\“, \“altitude\” : 
> \“xxx\“, \“longitude\” : \“xxx\“, \“latitude\” : \“xxx\“, \“dateTimeStamp\” : 
> \“xxx\“, \“_id\” : { \“$oid\” : \“xxx\” } }]“,
> “batchId”:  “xxx",
> “agentId”:  “xxx",
> “routeKey”: “40042-12-01-2022",
> “__v”:  0
> },
> “metadata”: {
> “timestamp”:“2022-05-02T18:49:52.619827Z”,
> “record-type”:  “data”,
> “operation”:“load”,
> “partition-key-type”:   “primary-key”,
> “schema-name”:  “xxx”,
> “table-name”:   “xxx”
> }
> }
> 
> Where the length of the array_coordinates array varies and is not fixed at the source, is 
> there any way to define a JSON deserializer for this? If so, I would really 
> appreciate some help on this.



Re: Application mode deployment through API call

2022-05-24 Thread Vikash Dat
Similar to what Biao said, Application mode is okay if you only have a single
app, but when running multiple apps, session mode is better for control. In
my experience, the CLIFrontend is not as robust as the REST API, or you
will end up having to rebuild a very similar Rest API. For the meta space
issue, have you tried adding shared libraries to the flink lib folder?

On Mon, May 23, 2022 at 23:31 Shengkai Fang  wrote:

> Hi, all.
>
> > is there any plan in the Flink community to provide an easier way of
> deploying Flink with application mode on YARN
>
> Yes. Jark has already opened a ticket about how to use the SQL client to
> submit SQL in application mode [1]. What's more, with FLIP-222 we will be able
> to manage jobs in SQL, which will list all submitted jobs and their web
> UIs [2].
>
> [1] https://issues.apache.org/jira/browse/FLINK-26541
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>
>
>


Re: How can I set job parameter in flink sql

2022-05-24 Thread Qingsheng Ren
Hi,

You can make use of the configuration “pipeline.global-job-parameters” [1] to 
pass your custom configs all the way into the UDF. For example, you can execute 
this in the SQL client:

SET pipeline.global-job-parameters=black_list_path:/root/list.properties;

Then you can get the value “/root/list.properties” by calling 
context.getJobParameter(“black_list_path”, “your_default_value”) in the open() 
method of your UDF.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-global-job-parameters
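
Putting it together, a minimal sketch of such a UDF (the class name and the eval logic
are purely illustrative):

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class BlackListFunction extends ScalarFunction {

    private String blackListPath;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Reads the value supplied via pipeline.global-job-parameters;
        // the second argument is the fallback when the key is not set.
        blackListPath = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println("black_list_path = " + blackListPath);
    }

    public Boolean eval(String value) {
        // Illustrative only: a real implementation would check `value`
        // against the black list loaded from blackListPath.
        return value != null;
    }
}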

Cheers, 

Qingsheng

> On May 11, 2022, at 14:36, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> I want to override the function open() in my UDF, like:
> 
> 
> 
> In the open() function, I want to fetch the configured value "black_list_path", 
> then simply print that value out. And I configure this value in the ./sql-client.sh 
> console:
> 
> SET black_list_path = /root/list.properties
> 
> Then I run this UDF, but what gets printed is /config/list.properties (this is the 
> default value I set in context.getJobParameter("black_list_path", 
> "/config/list/properties")), not /root/list.properties which I set in the 
> ./sql-client.sh console.
> 
> So could you please show me the correct way to set black_list_path in SQL? 
> Thanks so much!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
>  



GlobalCommitter in Flink's two-phase commit

2022-05-24 Thread di wu
Hello
Regarding the GlobalCommitter in Flink's two-phase commit: 
I see it was introduced in FLIP-143, but it seems to have been removed again in 
FLIP-191 and marked as Deprecated in the source code.
I haven't found any relevant information about the use of GlobalCommitter. 


There are two questions I would like to ask:
1. What are the general usage scenarios of GlobalCommitter?
2. Why should the GlobalCommitter be removed in the new version of the API?


Thanks && Regards,


di.wu