Re: Restore from checkpoint

2024-05-20 Thread Jiadong Lu

Hi, Phil

I don't have much expertise in the flink-python module, but the error 
you hit is a familiar one if you have ever written code that handles 
directory paths.


The correct forms of a path/URI are:
1. "/home/foo"
2. "file:///home/foo/boo"
3. "hdfs:///home/foo/boo"
4. or a Win32 directory form
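As a quick illustration, a small self-contained check of those forms (a hypothetical helper written for this email, not a Flink API; the stray leading colon in your `-s` argument is exactly the kind of value it rejects):

```java
public class PathFormCheck {
    // Hypothetical helper: accepts the path/URI forms listed above and
    // rejects a malformed value such as ":/opt/flink/checkpoints/...".
    static boolean looksLikeValidCheckpointPath(String p) {
        if (p.startsWith(":")) {
            return false;                   // stray leading colon
        }
        return p.startsWith("/")            // 1. absolute local path
                || p.startsWith("file://")  // 2. file URI
                || p.startsWith("hdfs://"); // 3. HDFS URI
    }

    public static void main(String[] args) {
        System.out.println(looksLikeValidCheckpointPath("/home/foo"));            // true
        System.out.println(looksLikeValidCheckpointPath("file:///home/foo/boo")); // true
        System.out.println(looksLikeValidCheckpointPath(":/home/foo"));           // false
    }
}
```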

Best regards,
Jiadong Lu

On 2024/5/20 02:28, Phil Stavridis wrote:

Hi Lu,

Thanks for your reply. How should the path be passed to the job that needs 
to use the checkpoint? Is the standard way to pass it with -s, or to pass 
the path into the module as a Python argument?

Kind regards
Phil


On 18 May 2024, at 03:19, jiadong.lu  wrote:

Hi Phil,

AFAIK, the error indicates that your path was incorrect.
You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead; 
note there is no leading colon.

Best.
Jiadong.Lu

On 5/18/24 2:37 AM, Phil Stavridis wrote:

Hi,
I am trying to test how checkpoints work for restoring state, but I am not 
sure how to run a new instance of a Flink job, after I have cancelled it, 
using the checkpoints that I store in the filesystem of the JobManager, 
e.g. /opt/flink/checkpoints.
I have tried passing the checkpoint as an argument to the function and 
using it while setting the checkpoint, but it looks like the way it is 
done is something like below:

docker-compose exec jobmanager flink run -s 
:/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py

But I am getting this error:

Caused by: java.io.IOException: Checkpoint/savepoint path 
':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file URI. 
Either the pointer path is invalid, or the checkpoint was created by a 
different state backend.

What is wrong with the way the job is re-submitted to the cluster?
Kind regards
Phil




Re: JVM Metaspace for Task Managers and Job Managers are not getting released.

2023-05-13 Thread Jiadong lu

Hi, Ajinkya

Maybe some threads in your job were not shut down when the job was closed?
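One common culprit is a thread pool started by user code and never stopped: non-daemon threads that outlive the job keep the user-code classloader reachable, so its metaspace is never freed. A minimal plain-Java sketch of the shutdown pattern (the class and pool here are illustrative; in Flink you would typically start such a pool in RichFunction.open() and stop it in close()):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    // Illustrative pool; in Flink, threads like these pin the user-code
    // classloader (and its metaspace) if they survive the job.
    static final ExecutorService POOL = Executors.newFixedThreadPool(2);

    // In a Flink job this logic belongs in RichFunction.close().
    static void close() {
        POOL.shutdown();
        try {
            if (!POOL.awaitTermination(5, TimeUnit.SECONDS)) {
                POOL.shutdownNow();  // force-stop stragglers
            }
        } catch (InterruptedException e) {
            POOL.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        POOL.submit(() -> { });
        close();
        System.out.println(POOL.isShutdown());  // prints "true"
    }
}
```

Taking a TaskManager thread dump after the job finishes is a quick way to confirm whether such threads are left behind.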

Best,
Jiadong Lu

On 2023/5/13 4:58, Ajinkya Pathrudkar wrote:

Hello,

I am observing that JVM Metaspace memory for the Task Managers and the 
Job Manager is not being released. Any thoughts?




Thanks,
Ajinkya


Re: How to know when a pipeline ends

2023-05-13 Thread Jiadong lu

Hi Luke.

I agree with Shammon's suggestion regarding your query. Additionally, 
here are some hints that might help you further:

1. To create a PackagedProgram, use the PackagedProgram.Builder class.
2. To build a JobGraph, use the PackagedProgramUtils.createJobGraph method.
3. Initialize a RestClusterClient with your Flink cluster configuration 
to interact with the cluster.
4. Submitting the JobGraph gives you a JobID.
5. Finally, you can use the JobID to communicate with your job in the 
Flink cluster.

This is the approach I currently use to manage Flink jobs. Should you 
require any further assistance, please do not hesitate to reach out.
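For reference, the steps above look roughly like the following sketch with Flink's client API (an untested outline: the class and method names are from Flink 1.x, while the host, jar path, and parallelism are placeholder assumptions):

```java
Configuration conf = new Configuration();
conf.setString(RestOptions.ADDRESS, "localhost");      // assumed JobManager host

// 1-2. Package the user program and turn it into a JobGraph.
PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/your-job.jar")) // placeholder path
        .build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
        program, conf, 1 /* parallelism */, false /* suppressOutput */);

// 3-5. Submit through the REST client and poll the status by JobID.
try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
    JobID jobId = client.submitJob(jobGraph).get();
    JobStatus status = client.getJobStatus(jobId).get();
}
```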


Best,
Jiadong Lu

On 2023/5/13 2:00, Luke Xiong wrote:

Hi Weihua and Shammon,

Thanks for the pointers. I tried both; unfortunately, neither works.

By enabling "execution.attached", there doesn't seem to be any 
difference from the default settings: doSomeCleanupTasks() is called 
right away while the pipeline is still running, and 
env.executeAsync().getJobStatus() causes an exception:
     org.apache.flink.util.FlinkRuntimeException: The Job Status cannot 
be requested when in Web Submission.


FYI, I am using Flink 1.15 and the job is submitted with /jars/:jarid/run

Regards,
Luke

On Fri, May 12, 2023 at 1:32 AM Weihua Hu wrote:



Hi, Luke

You can enable "execution.attached", then env.execute() will wait
until the job is finished.


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached
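As an aside, attached vs. detached submission is essentially blocking vs. non-blocking; here is a plain-Java analogy with no Flink APIs (all names are illustrative only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AttachedVsDetached {
    // Simulates the difference between detached submission (executeAsync:
    // returns immediately) and attached execution (execute with
    // "execution.attached": blocks until the pipeline finishes).
    static String run() {
        StringBuilder log = new StringBuilder();
        // Detached style: submission returns before the pipeline is done,
        // so cleanup placed right after it would run too early.
        CompletableFuture<Void> job = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);  // the pipeline doing work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.append("pipeline-done;");
        });
        // Attached behaviour: block on the job before cleanup.
        job.join();
        log.append("cleanup;");
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints "pipeline-done;cleanup;"
    }
}
```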

Best,
Weihua


On Fri, May 12, 2023 at 8:59 AM Shammon FY wrote:

Hi Luke,

Maybe you can get a 'JobClient' after submitting the job and check the
job status with 'JobClient.getJobStatus()'

Best,
Shammon FY


On Fri, May 12, 2023 at 2:58 AM Luke Xiong wrote:

Hi,

My flink job needs to do something when the pipeline
execution has ended. The job code is like this:

createSomeStream().applySomeOperators();
env.execute(jobName);
doSomeCleanupTasks();

It looks like doSomeCleanupTasks() can be called while the
pipeline is still running. The job is for processing a
bounded stream, so it doesn't run forever. Is it possible to
achieve this so doSomeCleanupTasks is called only when the
pipeline has processed all the data? This happens when the
runtime mode is STREAMING. Would running it in BATCH mode
make any difference?

Regards,
Luke




Re: how to configure window of join operator in batch mode

2023-04-26 Thread Jiadong Lu

Hi Shammon,

Thank you for your quick response.

To give you some context, I am working on a project that involves 
joining two streams and performing some left/inner join operations based 
on certain keys. As for using batch mode, my intention is to have a 
unified approach that works for both stream and batch processing.


If I decide not to use Flink's join/coGroup API, I would need to 
implement a join operation manually by saving one stream's data and 
reading it from the other stream. This could potentially make the 
solution more complex, given the nature of this particular scenario.


Thank you for your time and I look forward to hearing from you soon.

Best,
Jiadong Lu

On 2023/4/26 18:13, Shammon FY wrote:

Hi Jiadong

Using a processing-time window in batch jobs seems a little strange to 
me. I would prefer to partition the data at the day level, and then have 
the batch job read from the different partitions instead of using a window.


Best,
Shammon FY

On Wed, Apr 26, 2023 at 12:03 PM Jiadong Lu wrote:


Hi, Shammon,
Thank you for your reply.

Yes, the window configured with `Time.days(1)` has no special meaning;
it is just used to group all data into the same global window.
I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
needs a `Trigger` like
`org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`
to trigger all the data for window processing.

So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good
solution for this scenario. What do you think?

As for your suggestion to "use join directly": I have no idea how to
use join without a window. Would you mind writing a demo of it?

Your help is greatly appreciated in advance.

    Best,
    Jiadong Lu

On 2023/4/26 09:53, Shammon FY wrote:
 > Hi Jiadong,
 >
 > I think it depends on the specific role the window plays for you. If
 > this window has no specific business meaning and is only used for
 > performance optimization, maybe you can consider using join directly.
 >
 > Best,
 > Shammon FY
 >
On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu wrote:

Hello everyone,

I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, and it works fine for me at present. I wonder if
this approach of using a processing-time window in batch mode is
appropriate, and does it have any problems? I want to use this solution
to solve my problem (joining two streams in batch mode).

```java
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Integer> s1 = env.fromCollection(
            Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
    DataStream<Integer> s2 = env.fromCollection(
            Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

    s1.coGroup(s2)
            .where(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .equalTo(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1)))
            .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    if (!second.iterator().hasNext()) {
                        for (Integer integer : first) {
                            out.collect(new Tuple2<>(integer, null));
                        }
                    } else {
                        for (Integer integer : first) {
                            for (Integer integer1 : second) {
                                out.collect(new Tuple2<>(integer, integer1));
                            }
                        }
                    }
                }
            }).printToErr();
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.execute();
}
```

Re: how to configure window of join operator in batch mode

2023-04-25 Thread Jiadong Lu

Hi, Shammon,
Thank you for your reply.

Yes, the window configured with `Time.days(1)` has no special meaning;
it is just used to group all data into the same global window.
I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also 
needs a `Trigger` like 
`org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger` 
to trigger all the data for window processing.


So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good 
solution for this scenario. What do you think?


As for your suggestion to "use join directly": I have no idea how to
use join without a window. Would you mind writing a demo of it?


Your help is greatly appreciated in advance.

Best,
Jiadong Lu

On 2023/4/26 09:53, Shammon FY wrote:

Hi Jiadong,

I think it depends on the specific role the window plays for you. If 
this window has no specific business meaning and is only used for 
performance optimization, maybe you can consider using join directly.


Best,
Shammon FY

On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu wrote:


Hello everyone,

I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, and it works fine for me at present. I wonder if
this approach of using a processing-time window in batch mode is
appropriate, and does it have any problems? I want to use this solution
to solve my problem (joining two streams in batch mode).

```java
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Integer> s1 = env.fromCollection(
            Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
    DataStream<Integer> s2 = env.fromCollection(
            Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

    s1.coGroup(s2)
            .where(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .equalTo(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1)))
            .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    if (!second.iterator().hasNext()) {
                        for (Integer integer : first) {
                            out.collect(new Tuple2<>(integer, null));
                        }
                    } else {
                        for (Integer integer : first) {
                            for (Integer integer1 : second) {
                                out.collect(new Tuple2<>(integer, integer1));
                            }
                        }
                    }
                }
            }).printToErr();
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.execute();
}
```

Thanks in advance.

-- 
Jiadong Lu




how to configure window of join operator in batch mode

2023-04-25 Thread Jiadong Lu

Hello everyone,

I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, and it works fine for me at present. I wonder if
this approach of using a processing-time window in batch mode is
appropriate, and does it have any problems? I want to use this solution
to solve my problem (joining two streams in batch mode).


```java
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Integer> s1 = env.fromCollection(
            Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
    DataStream<Integer> s2 = env.fromCollection(
            Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

    s1.coGroup(s2)
            .where(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .equalTo(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1)))
            .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    if (!second.iterator().hasNext()) {
                        for (Integer integer : first) {
                            out.collect(new Tuple2<>(integer, null));
                        }
                    } else {
                        for (Integer integer : first) {
                            for (Integer integer1 : second) {
                                out.collect(new Tuple2<>(integer, integer1));
                            }
                        }
                    }
                }
            }).printToErr();
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.execute();
}
```

Thanks in advance.

--
Jiadong Lu


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-29 Thread Jiadong Lu

Congratulations !!!

Best,
Jiadong Lu

On 2023/3/27 17:23, Yu Li wrote:

Dear Flinkers,


As you may have noticed, we are pleased to announce that Flink Table Store has 
joined the Apache Incubator as a separate project called Apache 
Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
streaming data lake platform for high-speed data ingestion, change data 
tracking and efficient real-time analytics, with the vision of supporting a 
larger ecosystem and establishing a vibrant and neutral open source community.


We would like to thank everyone for their great support and efforts for the 
Flink Table Store project, and warmly welcome everyone to join the development 
and activities of the new project. Apache Flink will continue to be one of the 
first-class citizens supported by Paimon, and we believe that the Flink and 
Paimon communities will maintain close cooperation.





Best Regards,

Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)




[1] https://paimon.apache.org/
[2] https://github.com/apache/incubator-paimon
[3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal