Re: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class

2022-05-12 Thread Fuyao Li
Hi Tejas,

Yes, you can write a type factory for the enum, but I would expect Flink to 
recognize enums by default.

Anyway, you can do something like this:

Types.ENUM(RuleType.class);

This returns a TypeInformation that can be used to construct a type factory.

By the way, could you take a look at my question in the email “How to define 
TypeInformation for Flink recursive resolved POJO”?

Thanks,
Fuyao




From: Tejas B 
Date: Thursday, May 12, 2022 at 16:32
To: Weihua Hu 
Cc: user 
Subject: [External] : Re: How to get flink to use POJO serializer when enum is 
present in POJO class
Hi Weihua,
This is the error I am getting:
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error, it looks like Flink is falling back to the Kryo serializer 
instead of the POJO serializer.

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu wrote:
Hi, Tejas

This code works in my IDEA environment.
Could you provide more error information or logs?


Best,
Weihua


On May 10, 2022, at 1:22 PM, Tejas B wrote:

Hi,
I am trying to get Flink schema evolution to work using the POJO serializer, 
but I found that if an enum is present in the POJO, the POJO serializer is not 
used. An example of my POJO follows:

import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;
    // Newly added field
    // int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        // this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    // public int getVal2() {
    //     return val2;
    // }

    // public void setVal2(int val2) {
    //     this.val2 = val2;
    // }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        // Objects.equals tolerates a null id (the no-arg constructor leaves it unset).
        return val == rule.val && Objects.equals(id, rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "id='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}

RuleType is an enum class, as follows:

public enum RuleType {
X,
Y,
Z

}

For the Rule class, schema evolution (adding a new field called val2) works 
only if I write a custom type factory for the class.

Is there a way to write a type factory for the enum class? Why does Flink not 
recognize an enum in a POJO class?



Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO

2022-05-12 Thread Fuyao Li
I updated the FieldDefinition class inline to avoid confusion. I am listing 
only a few of the fields in each class (not all). Everything follows the 
suggested POJO approach.

From: Fuyao Li 
Date: Thursday, May 12, 2022 at 09:46
To: Weihua Hu 
Cc: User 
Subject: Re: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi Weihua,

I am following all the standards mentioned here. The code structure is listed 
in the previous email.

@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;

    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}

@Data
class FieldDefinition {
    // Causes recursive resolution when type info is added.
    private Metadata parentMetadata;
}

public class StringFieldDefinitionMapTypeInfoFactory
        extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<>(
                TypeInformation.of(String.class), TypeInformation.of(FieldDefinition.class));
    }
}

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}



I am using Lombok’s @Data to generate the getters and setters. For List<>, 
Map<>, and Set<> fields, because of type erasure, I have to provide the type 
information through a type factory 
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory>
and type hints.

I have two problems:

  1.  The main blocker is the recursive structure described below.
  2.  The Set type hint doesn’t work well.

How can I define the TypeInformation for a Set? It does not seem to be a 
supported Java type.

I provided something like this and then annotated the field in the POJO with it.

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}

But I still get the following error:
Generic types have been disabled in the ExecutionConfig and type java.util.Set 
is treated as a generic type.


Thanks,
Fuyao


From: Weihua Hu 
Date: Thursday, May 12, 2022 at 07:24
To: Fuyao Li 
Cc: user 
Subject: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi, Fuyao

How did you define these classes? The Flink docs [1] list some requirements 
for POJOs:

  *   The class must be public.
  *   It must have a public constructor without arguments (default constructor).
  *   All fields are either public or must be accessible through getter and 
setter functions. For a field called foo the getter and setter methods must be 
named getFoo() and setFoo().
  *   The type of a field must be supported by a registered serializer.
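
The first three requirements above can be checked mechanically. The following is a minimal sketch using plain Java reflection; it does not call Flink, and PojoRuleCheck, GoodRule and BadRule are hypothetical names made up for illustration. It assumes static and transient fields are exempt, and it cannot check the fourth requirement (a registered serializer for each field type), which only Flink itself can decide.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: reports which of the quoted POJO rules a class violates. */
final class PojoRuleCheck {

    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();

        // Rule 1: the class must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }

        // Rule 2: a public constructor without arguments must exist.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }

        // Rule 3: every field is public or has getFoo()/setFoo(..) accessors.
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Assumption: static, transient and synthetic fields are not serialized.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            if (Modifier.isPublic(mods)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                clazz.getMethod("get" + suffix);
                clazz.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                problems.add("field '" + name + "' has no public getter/setter pair");
            }
        }
        return problems;
    }

    /** Example class that satisfies rules 1 to 3. */
    public static class GoodRule {
        private String id;
        public GoodRule() {}
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
    }

    /** Example class that breaks all three rules. */
    static class BadRule {
        private int val;
        BadRule(int val) { this.val = val; }
    }
}
```

Calling PojoRuleCheck.violations(SomeClass.class) returns an empty list when the three structural rules hold. An empty list does not guarantee that Flink picks the POJO serializer, because the field-type rule is not covered here.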

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos



Best,
Weihua







How to define TypeInformation for Flink recursive resolved POJO

2022-05-11 Thread Fuyao Li
Hi Community,

I have a POJO with a nested, recursively resolved structure. How should I 
define the @TypeInfo annotation to avoid a stack overflow exception when the 
application starts?

Basically:
class Metadata
    Map<String, FieldDefinition> fields

class FieldDefinition
    Metadata parentMetadata

The Metadata class is resolved recursively, which causes the stack overflow. I 
had to design it this way because that is how the metadata model is structured.

Is there any way to fix this issue? Or must I treat this as a generic type and 
omit the @TypeInfo annotation so that it falls back to Kryo? If I let it fall 
back to Kryo and remove the 
streamEnvironment.getConfig().disableGenericTypes(); statement, there is no 
problem during program startup.

Thanks,
Fuyao
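
To picture why a Metadata -> FieldDefinition -> Metadata cycle can overflow the stack, here is a minimal sketch in plain Java reflection. It is not Flink's type extraction, and TypeGraphWalk is a hypothetical name. The nested classes are stand-ins holding only the two fields from the message above. The walk follows field types and generic type arguments, and a visited set stops it from re-entering a class it has already seen. Without that guard, the same walk would recurse until a StackOverflowError.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class TypeGraphWalk {

    // Stand-ins for the classes in the thread (only the fields that matter).
    static class Metadata {
        Map<String, FieldDefinition> fields;
    }

    static class FieldDefinition {
        Metadata parentMetadata; // points back to Metadata, closing the cycle
    }

    /** Collects every user class reachable from root, visiting each class once. */
    static Set<Class<?>> reachable(Class<?> root) {
        Set<Class<?>> visited = new LinkedHashSet<>();
        visit(root, visited);
        return visited;
    }

    private static void visit(Type type, Set<Class<?>> visited) {
        if (type instanceof ParameterizedType) {
            // For example Map<String, FieldDefinition>: descend into the type arguments.
            for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
                visit(arg, visited);
            }
            return;
        }
        if (!(type instanceof Class)) {
            return; // wildcards and type variables are ignored in this sketch
        }
        Class<?> clazz = (Class<?>) type;
        // Only user classes are walked; primitives, arrays and JDK classes are skipped.
        if (clazz.isPrimitive() || clazz.isArray() || clazz.getName().startsWith("java.")) {
            return;
        }
        // The guard: a class that was already visited is not entered again.
        if (!visited.add(clazz)) {
            return;
        }
        for (Field field : clazz.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            visit(field.getGenericType(), visited);
        }
    }
}
```

TypeGraphWalk.reachable(TypeGraphWalk.Metadata.class) terminates and yields both classes. How to express the equivalent guard inside a Flink TypeInfoFactory is the open question of this thread and is not answered by the sketch.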




Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-18 Thread Fuyao Li
Hi Yun,

Thanks for the reply! This is very helpful.
For the Sink interface, I checked the ReducingUpsertSink 
<https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java>
implementation. If I use the new Sink interface, I think I can create 
placeholder classes for the committable- and transaction-related interfaces 
and leave them empty, as this example does. That should effectively achieve 
AT_LEAST_ONCE, right? Thanks.

Best regards,
Fuyao


From: Yun Gao 
Date: Wednesday, February 16, 2022 at 00:54
To: Fuyao Li , user 
Subject: Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,

Very sorry for the late reply.

For question 1, I think it would not cause data corruption. In Flink, a 
checkpoint is achieved by inserting barriers into the stream of normal 
records, and the snapshot is taken in the same thread as the record 
processing. Thus the snapshot of an operator is always taken at a record 
boundary.

For question 3, if the external system does not support transactions, there 
might be two other ways to implement exactly-once semantics:

1. If every record has a key and the external system supports deduplication, 
it might be possible to use AT_LEAST_ONCE sinks and let the external system 
deduplicate the records.
2. Another possible method, which reduces the requirements on the external 
system, is to use WAL sinks: the records are first written to some other 
external system (such as a file system) as a kind of log. Once a checkpoint 
succeeds, we can then write the records from before this checkpoint into the 
external system. Note that writing these records into the external system 
must also be retriable: the Flink job might still fail during writing, and 
after a restart the writing should resume exactly from the next record. This 
requires the external system to provide some way to query the offset of the 
records written so far.
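
The WAL idea in point 2 can be sketched in plain Java as follows. This is not a Flink sink. WalSinkSketch, ExternalStore and WalSink are hypothetical names, the in-memory list stands in for a durable log, and the sketch assumes the external system holds only records from this sink, so its reported offset lines up with positions in the log.

```java
import java.util.ArrayList;
import java.util.List;

final class WalSinkSketch {

    /** Hypothetical external system that can report how many records it holds. */
    static final class ExternalStore {
        private final List<String> records = new ArrayList<>();

        int writtenOffset() { return records.size(); }

        void append(String record) { records.add(record); }

        List<String> contents() { return new ArrayList<>(records); }
    }

    /** Logs records first and replays them only once a checkpoint has succeeded. */
    static final class WalSink {
        private final List<String> log = new ArrayList<>(); // stands in for a durable log
        private final ExternalStore store;
        private int committedUpTo = 0; // log position covered by the last successful checkpoint

        WalSink(ExternalStore store) { this.store = store; }

        /** Normal processing: the record only reaches the log. */
        void write(String record) { log.add(record); }

        /** Called when a checkpoint succeeds: everything logged so far may be flushed. */
        void onCheckpointSucceeded() { committedUpTo = log.size(); }

        /**
         * Replays committed records, resuming from the offset the store reports.
         * maxRecords lets a caller simulate a failure part-way through a flush.
         */
        int flush(int maxRecords) {
            int next = store.writtenOffset();
            int written = 0;
            while (next < committedUpTo && written < maxRecords) {
                store.append(log.get(next));
                next++;
                written++;
            }
            return written;
        }
    }
}
```

Because each flush starts from store.writtenOffset(), a flush that stops half-way can simply be called again after a restart without writing any record twice. Records logged after the last successful checkpoint stay in the log until the next one.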


For an AT_LEAST_ONCE sink, RichSinkFunction 
<https://github.com/apache/flink/blob/release-1.14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java>
should also work, but if possible we still recommend using the new sink API.


Best,
Yun



Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-14 Thread Fuyao Li
Hi Yun,

Please ignore my question 2. I think the sink is the decisive factor in 
ensuring end-to-end exactly-once.

If I want to implement an AT_LEAST_ONCE sink, which interface should I 
implement? Maybe 
RichSinkFunction<https://github.com/apache/flink/blob/release-1.14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java>
 would be enough? Any suggestions on this?

For question 3, maybe I can add some deduplication code on the consumer side 
to handle the messages produced by the AT_LEAST_ONCE sink. If OSS doesn’t 
support exactly-once semantics, it seems impossible for me to handle this on 
the Flink side.

Thanks,
Fuyao
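
Consumer-side deduplication of the kind mentioned above could look like the following minimal sketch. It is plain Java with hypothetical names (DedupConsumerSketch, Message, DedupConsumer). It assumes every message carries an id that stays the same when the message is replayed after a restart. The set of seen ids is unbounded here; a real consumer would need to bound or persist it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DedupConsumerSketch {

    /** Hypothetical message shape: the id must be stable across replays. */
    static final class Message {
        final String id;
        final String payload;

        Message(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    static final class DedupConsumer {
        private final Set<String> seenIds = new HashSet<>();
        private final List<String> applied = new ArrayList<>();

        /** Applies the message once; returns false when the id was already seen. */
        boolean accept(Message message) {
            if (!seenIds.add(message.id)) {
                return false; // duplicate delivered after a restart: ignore it
            }
            applied.add(message.payload);
            return true;
        }

        List<String> applied() { return new ArrayList<>(applied); }
    }
}
```

With this in place, an AT_LEAST_ONCE sink may deliver the same message twice and the consumer still applies it once, provided the id is derived deterministically from the record rather than generated at send time.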


Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-10 Thread Fuyao Li
Hello Yun,

Thanks for the quick response. This is really helpful.

I have confirmed with Oracle Streaming Service (OSS) that they currently don’t 
support EXACTLY_ONCE semantics; only AT_LEAST_ONCE works. They suggest adding 
a deduplication mechanism at the sink to mitigate the issue.

Question 1:
So the scenario should look like this:
When the Flink application restarts after a failure, it starts from the last 
checkpoint offset. Messages that were processed after that checkpoint but 
before the failure will be processed a second time after the restart. Is there 
any chance of data corruption here, for example breaking a window and sending 
out incomplete records? I am using session windows based on DataStream 
event-time timers.

Question 2:
For the 
KafkaSource<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-source>,
 I noticed that there is no place to configure the semantics. Maybe enabling 
checkpointing with EXACTLY_ONCE guarantees the source’s exactly-once 
semantics? Please correct me if I am wrong.

Question 3:
To guarantee end-to-end exactly-once, I think we must make sure the sink is 
exactly-once, right? Since OSS has this limitation, is it possible to achieve 
effectively EXACTLY_ONCE semantics through additional logic on the Flink side, 
given that I can’t do much on the OSS side? Or is it technically impossible?
If it is possible, I think I should implement the 
Sink<https://github.com/apache/flink/blob/release-1.14/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java>
 you mentioned.

Thank you very much for the help!
Fuyao


From: Yun Gao 
Date: Wednesday, February 9, 2022 at 23:17
To: Fuyao Li , user 
Subject: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,

Logically, if a system wants to support end-to-end exactly-once,
it should support transactions:
1. The transaction holds the records; it gets pre-committed when the state is
snapshotted and committed once the checkpoint succeeds.
2. The transaction should still be abortable after it has been pre-committed.
3. Once pre-committed, the transaction must be able to be committed: even if
the Flink job fails between pre-commit and commit, it must be possible to
commit the transaction again after the job restarts.
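
The three conditions can be read as a small state machine on the external system's side. The sketch below is plain Java, not a Flink API. TwoPhaseCommitSketch and TransactionalStore are hypothetical names, and keeping pending records in memory is an assumption for illustration; a real system would have to persist them so that a commit can be repeated after a crash.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TwoPhaseCommitSketch {

    enum State { OPEN, PRE_COMMITTED, COMMITTED, ABORTED }

    /** Hypothetical transactional external system, keyed by transaction id. */
    static final class TransactionalStore {
        private final Map<String, List<String>> pending = new HashMap<>();
        private final Map<String, State> states = new HashMap<>();
        private final List<String> visible = new ArrayList<>();

        void begin(String txnId) {
            pending.put(txnId, new ArrayList<>());
            states.put(txnId, State.OPEN);
        }

        /** Condition 1: the transaction holds the records until it is committed. */
        void write(String txnId, String record) {
            requireState(txnId, State.OPEN);
            pending.get(txnId).add(record);
        }

        /** Condition 1: pre-commit happens when the state is snapshotted. */
        void preCommit(String txnId) {
            requireState(txnId, State.OPEN);
            states.put(txnId, State.PRE_COMMITTED);
        }

        /** Condition 3: commit after pre-commit must succeed and be safe to repeat. */
        void commit(String txnId) {
            if (states.get(txnId) == State.COMMITTED) {
                return; // repeated commit after a restart: nothing left to do
            }
            requireState(txnId, State.PRE_COMMITTED);
            visible.addAll(pending.remove(txnId));
            states.put(txnId, State.COMMITTED);
        }

        /** Condition 2: abort is still possible after pre-commit. */
        void abort(String txnId) {
            if (states.get(txnId) == State.COMMITTED) {
                throw new IllegalStateException("already committed: " + txnId);
            }
            pending.remove(txnId);
            states.put(txnId, State.ABORTED);
        }

        List<String> visible() { return new ArrayList<>(visible); }

        private void requireState(String txnId, State expected) {
            if (states.get(txnId) != expected) {
                throw new IllegalStateException(
                        "transaction " + txnId + " is " + states.get(txnId) + ", expected " + expected);
            }
        }
    }
}
```

Records become visible only on commit, a second commit of the same transaction is a no-op, and an aborted transaction never reaches the visible list. A system that cannot offer these operations, as reported for OSS elsewhere in this thread, cannot play this role.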

If the external system meets these conditions, option b) (StatefulSink + 
TwoPhaseCommittingSink) is the recommended way to implement an exactly-once 
sink. However, these interfaces are newly added in the upcoming 1.15, and it 
might take about 1.5 months before it is released.

An earlier version of option b) is org.apache.flink.api.connector.sink.Sink. 
It is very similar to option b) and has been supported since 1.13. It will 
still be supported in the next several releases, and it can be migrated to 
option b) easily.

Best,
Yun



Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-09 Thread Fuyao Li
Hello Community,


I have two questions regarding a custom Flink sink with EXACTLY_ONCE semantics.


  1.  I have an SDK that can publish messages over HTTP (backed by Oracle 
Streaming Service, which is very similar to Kafka). This will be my Flink 
application’s sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE 
semantics? HTTP is stateless here… If it is possible, what would need to be 
added to the SDK to support EXACTLY_ONCE?
  2.  If the answer to question 1 is yes, then I need to implement a custom 
sink for this. Which option should I use?
 *   Option 1 (a): TwoPhaseCommitSinkFunction
 *   Option 2 (b): StatefulSink + TwoPhaseCommittingSink

The legacy FlinkKafkaProducer seems to use option (a). It will be removed from 
Flink in the future. The new KafkaSink seems to use option (b). Based on the 
comment in the code, it seems option (a) is recommended. Which one should I 
use? Please let me know if I am missing anything, or if there is a better 
solution for my case.


Thanks,
Fuyao







Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-02 Thread Fuyao Li
I tried to remove the Re: [EXTERNAL] label just now, but it seems to break the 
thread on my side. I guess I can’t do much about this; the tag is added to the 
email at the company level.

Anyway, I guess we can continue to discuss the issue in this thread.

Thanks,
Fuyao

From: Fuyao Li 
Date: Tuesday, November 2, 2021 at 14:14
To: David Morávek , nicolaus.weid...@ververica.com 

Cc: user , Yang Wang , Robert 
Metzger , tonysong...@gmail.com , 
Sandeep Sooryaprakash 
Subject: [External] : Re: Possibility of supporting Reactive mode for native 
Kubernetes application mode
Hi David, Nicolaus,

Thanks for the reply.


  1.  For your first question: yes, I want to use a checkpoint to stop and 
restart the application. I think this is similar to the Reactive mode strategy, 
right? (I don’t know the exact implementation behind Reactive mode.) From 
your description and Nicolaus’s reply, I gather that this checkpoint 
improvement will benefit both Reactive mode and the workflow I designed, 
rather than breaking my proposal, right?

  *   Nicolaus, after such a change in 1.15, do you mean that a checkpoint can 
no longer be used to restart a job? If so, my proposal may not work after 
1.15…
Please share the Jira link for this design if possible, and correct me if I am 
wrong.

Nicolaus’s suggestion of leveraging a retained checkpoint is exactly what I 
was trying to describe in my 3-step solution.

Quote from Nicolaus:
“
About your second question: You are right that taking and restoring from 
savepoints will incur a performance loss. They cannot be incremental, and 
cannot use native (low-level) data formats - for now. These issues are on the 
list of things to improve for Flink 1.15, so if the changes make it into the 
release, it may improve a lot.
You can restore a job from a retained checkpoint (provided you configured 
retained checkpoints, else they are deleted on job cancellation), see [1] 
(right below the part you linked). It should be possible to rescale using a 
retained checkpoint, despite the docs suggesting otherwise (it was uncertain 
whether this guarantee should/can be given, so it was not stated in the docs. 
This is also expected to change in the future as it is a necessity for further 
reactive mode development).

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint<https://urldefense.com/v3/__https:/ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/*resuming-from-a-retained-checkpoint__;Iw!!ACWV5N9M2RV99hQ!YHBsB0To67UNyuCoQmiojHORIU7clRLR8Gxw_wzaSKWbKzb9q5O3-41GnkmAygk$>

”


  1.  For using Standalone Kubernetes problem. I have development a Flink 
native Kubernetes operator upon 
https://github.com/wangyang0918/flink-native-k8s-operator<https://urldefense.com/v3/__https:/github.com/wangyang0918/flink-native-k8s-operator__;!!ACWV5N9M2RV99hQ!e9dKEfRelGum2Jq0GkwBi7aZDNaI-wdiPhAPErwh6CfAI_m4HCCFVTCVjPq5d6w$>
 . Right now, this operator can basically achieve everything Flink CLI could do 
for session mode and application mode and more. This includes features like 
rescaling with savepoint (stop with savepoint and start from savepoint), stop 
with savepoint, submit/stop/cancel session jobs etc. All of these are automated 
through a unified Kubernetes CRD. For sake of time, I don’t want to write 
another operator for standalone k8s operator… As a result, I am seeking to add 
the reactive scaling function into this operator. Nevertheless, I really 
appreciate the work for reactive mode in standalone Kubernetes.

Based on Nicolaus’s reply. I think if we configure the retain checkpoint 
policy. By default, I think only one checkpoint will be retained (please 
correct me if I am wrong) and we can capture the directory and rescale the 
application.
See 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints<https://urldefense.com/v3/__https:/ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/*retained-checkpoints__;Iw!!ACWV5N9M2RV99hQ!YHBsB0To67UNyuCoQmiojHORIU7clRLR8Gxw_wzaSKWbKzb9q5O3-41G4U5UMDo$>
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention<https://urldefense.com/v3/__https:/ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/*execution-checkpointing-externalized-checkpoint-retention__;Iw!!ACWV5N9M2RV99hQ!YHBsB0To67UNyuCoQmiojHORIU7clRLR8Gxw_wzaSKWbKzb9q5O3-41GT1eEC-s$>
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint<https://urldefense.com/v3/__https:/ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/*resuming-from-a-retained-checkpoint__;Iw!!ACWV5N9M2RV99hQ!YHBsB0To67UNyuCoQmiojHORIU7clRLR8Gxw_wzaSKWbKzb9q5O3-41GnkmAygk$>



  1.  I removed the [EXTERNAL] tag i

Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-02 Thread Fuyao Li
Hi David, Nicolaus,

Thanks for the reply.


  1.  For your first question: yes, I want to use the checkpoint to stop and 
restart the application. I think this is similar to the Reactive mode strategy, 
right? (I don’t know the exact implementation behind Reactive mode.) From 
your description and Nicolaus’s reply, I guess this checkpoint improvement 
will benefit both Reactive mode and the workflow I designed, rather than 
breaking this proposal, right?

  *   For Nicolaus, after such change in 1.15, do you mean the checkpoint can’t 
be used to restart a job? If this is the case, maybe my proposal will not work 
after 1.15…
Please share the Jira link to this design if possible and correct my statement 
if I am wrong.

Nicolaus’s suggestion of leveraging retained checkpoint is exactly what I am 
trying to describe in my 3-step solution.

Quote from Nicolaus:
“
About your second question: You are right that taking and restoring from 
savepoints will incur a performance loss. They cannot be incremental, and 
cannot use native (low-level) data formats - for now. These issues are on the 
list of things to improve for Flink 1.15, so if the changes make it into the 
release, it may improve a lot.
You can restore a job from a retained checkpoint (provided you configured 
retained checkpoints, else they are deleted on job cancellation), see [1] 
(right below the part you linked). It should be possible to rescale using a 
retained checkpoint, despite the docs suggesting otherwise (it was uncertain 
whether this guarantee should/can be given, so it was not stated in the docs. 
This is also expected to change in the future as it is a necessity for further 
reactive mode development).

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint

”


  2.  Regarding standalone Kubernetes: I have developed a Flink native 
Kubernetes operator based on 
https://github.com/wangyang0918/flink-native-k8s-operator. Right now, this 
operator can do basically everything the Flink CLI can do for session mode and 
application mode, and more. This includes rescaling with a savepoint (stop with 
savepoint and start from savepoint), stop with savepoint, and 
submit/stop/cancel of session jobs. All of these are automated through a 
unified Kubernetes CRD. For the sake of time, I don’t want to write another 
operator for standalone k8s, so I am seeking to add the reactive scaling 
function to this operator. Nevertheless, I really appreciate the work on 
reactive mode in standalone Kubernetes.

Based on Nicolaus’s reply, I think this works if we configure the retained 
checkpoint policy. By default, I think only one checkpoint will be retained 
(please correct me if I am wrong), so we can capture its directory and rescale 
the application from it.
See 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
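
To make "capture the directory" concrete, here is a minimal sketch of how an 
operator could pick the newest retained checkpoint. It assumes the documented 
layout <checkpoint-dir>/<job-id>/chk-<n>/_metadata and a directory that is 
reachable through java.nio (for example a mounted volume); object stores such 
as S3 would need their own listing API. The class and method names are 
hypothetical and not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical helper, not a Flink API: finds the newest completed checkpoint. */
final class RetainedCheckpoints {

    private RetainedCheckpoints() {}

    /** Extracts n from a directory named "chk-n". */
    private static long checkpointId(Path chkDir) {
        return Long.parseLong(chkDir.getFileName().toString().substring("chk-".length()));
    }

    /**
     * Returns the chk-n directory with the highest n that contains a _metadata
     * file, or empty if the job directory holds no completed checkpoint.
     */
    static Optional<Path> latest(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    // skip siblings such as shared/ and taskowned/
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    // a checkpoint without _metadata is incomplete and cannot be restored
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(RetainedCheckpoints::checkpointId));
        }
    }
}
```

The returned path is what would be passed to the restore option (the -s flag 
of the Flink CLI) when the application is brought up again with a new 
parallelism.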



  3.  I removed the [EXTERNAL] tag in the email. This is something 
automatically added by the company’s email box. Sorry for the confusion.


Best Regards,
Fuyao

From: David Morávek 
Date: Tuesday, November 2, 2021 at 05:53
To: Fuyao Li 
Cc: user , Yang Wang , Robert 
Metzger , tonysong...@gmail.com , 
Sandeep Sooryaprakash 
Subject: Re: [External] : Re: Possibility of supporting Reactive mode for 
native Kubernetes application mode
Similar to Reactive mode, checkpoint must be enabled to support such 
functionality. ...

Wouldn't that mean tearing down the whole Flink cluster in order to re-scale? 
That could be quite costly. We're aiming to speed-up the recovery process for 
the reactive mode and this would most likely block you from leveraging these 
efforts.

Maybe let's take a step back, is there anything keeping you from using 
standalone deployment here? If you already have a k8s operator, you could 
simply spawn a new JM / TM deployment without delegating this responsibility to 
Flink. Then the autoscaling could be as simple as creating a custom k8s metrics 
server [1] (or simply use the prometheus based one) and setting up the HPA for 
task managers.

Also I think it will be tricky to simply stop a job and retrieve the latest 
retained checkpoint in your operator. We'll try to make this easier to achieve 
in 1.15 release, but AFAIK there is currently no easy way to do so.

PS: Can you please do something about the "[EXTERNAL]" labels in the email 
subject? It breaks email threading and it makes it harder to keep track of the 
on-going conversati

Re: [External] : Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-01 Thread Fuyao Li
Hello David,

Thanks for the detailed explanation. This is really helpful!

I also had an offline discussion with Yang Wang. He also told me this could be 
done in the future, but not part of the recent plan.

As suggested, I think I can do the following things to build an auto-scaling 
component into the Flink operator.


  1.  Scrape metrics from Kubernetes (POD CPU usage/Memory usage/…) and Flink 
metrics system (For example, leveraging Prometheus metric exporter[1] in Flink)
  2.  With metrics in step(1), implement logic in operator to support 
customized scaling policy based on user definition and requirements. All these 
things should be configurable by FlinkApplication CRD.
  3.  Similar to Reactive mode, checkpointing must be enabled to support such 
functionality. When a certain criterion is met, the Flink operator will 
automatically trigger a stop command without manual intervention (the 
checkpoint is retained) and bring up the Flink application from the last 
checkpoint with the new configuration it calculates based on the policy. I 
believe this is similar to the failover process: if the application guarantees 
EXACTLY_ONCE semantics [2], I think the restarted application should still 
follow exactly-once semantics.
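
As a rough illustration of step 2, the decision logic could look like the 
sketch below. Everything in it is an assumption made for illustration: the 
class name, the CPU thresholds, and the double/halve rule are hypothetical, and 
a real policy would be driven by whatever the FlinkApplication CRD exposes.

```java
/** Hypothetical scaling policy; names, thresholds and the doubling rule are illustrative. */
final class ScalingPolicy {
    private final double scaleUpCpu;    // scale up when average CPU is above this (e.g. 0.80)
    private final double scaleDownCpu;  // scale down when average CPU is below this (e.g. 0.30)
    private final int minParallelism;
    private final int maxParallelism;

    ScalingPolicy(double scaleUpCpu, double scaleDownCpu, int minParallelism, int maxParallelism) {
        if (scaleDownCpu >= scaleUpCpu) {
            throw new IllegalArgumentException("scaleDownCpu must be below scaleUpCpu");
        }
        if (minParallelism < 1 || maxParallelism < minParallelism) {
            throw new IllegalArgumentException("invalid parallelism bounds");
        }
        this.scaleUpCpu = scaleUpCpu;
        this.scaleDownCpu = scaleDownCpu;
        this.minParallelism = minParallelism;
        this.maxParallelism = maxParallelism;
    }

    /** Parallelism to restart with; a result equal to {@code current} means "do nothing". */
    int targetParallelism(int current, double avgCpu) {
        int target = current;
        if (avgCpu > scaleUpCpu) {
            target = current * 2;   // under pressure: double
        } else if (avgCpu < scaleDownCpu) {
            target = current / 2;   // mostly idle: halve
        }
        // clamp to the configured bounds
        return Math.max(minParallelism, Math.min(maxParallelism, target));
    }

    /** True when the operator should stop the job and restart it from the retained checkpoint. */
    boolean shouldRescale(int current, double avgCpu) {
        return targetParallelism(current, avgCpu) != current;
    }
}
```

The gap between the two thresholds acts as a dead band, so a job hovering 
around one utilisation level is not stopped and restarted repeatedly; each 
restart from a checkpoint has a cost.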

In general, I think it should work. If you think there are any issues with this 
proposal, please point them out in this thread. Thanks!

Reference:
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing

Best,
Fuyao



From: David Morávek 
Date: Friday, October 29, 2021 at 23:11
To: Fuyao Li 
Cc: user , Yang Wang , Robert 
Metzger , tonysong...@gmail.com 
Subject: [External] : Re: Possibility of supporting Reactive mode for native 
Kubernetes application mode
Hi Fuyao,

this is a great question ;)

1) First let's be clear on what the reactive mode actually is.

Reactive Mode is related to how Flink makes use of newly available 
resources. It greedily uses all of the resources that are available in your 
Flink cluster (if a new task manager joins, it re-scales).

You as a Flink operator are responsible for adding / removing the task 
managers. This can be done manually or by some kind of horizontal auto-scaler 
(from what I understand this is what you want to achieve).

2) K8s native integration leverages an active resource management.

The standalone deployment mode uses the passive resource management, which 
means that YOU are responsible for the resource allocation (manually starting 
up processes). K8s native integration on the other hand allocates resources for 
you based on the static configuration it needs.

Right now there is no mechanism that would be able to make a decision to add / 
remove task managers (eg. based on some metric). -> This is most likely the 
missing part.

3) Summing up the above...

In theory "reactive mode" could be implemented for the application mode with 
active resource management (k8s native / yarn), but this would require either 
Flink to provide a generic auto-scaler component embedded within the JobMaster, 
or letting the user provide their own. This would require some design 
discussion, as the ideas we have about this are still on the "theoretical 
level".

Even though this may be a future development, there are currently no plans on 
doing that. Main focus in this area is now on making the reactive mode / 
adaptive scheduler production ready (user metrics, fixing the UI, faster TM 
loss / disconnect detection, local recovery) and speeding up the Flink recovery 
mechanism so the re-scaling experience is much smoother.

Best,
D.


Possibility of supporting Reactive mode for native Kubernetes application mode

2021-10-27 Thread Fuyao Li
Hello Community,

I am checking the reactive mode for Flink deployment. I noticed that this is 
supported in Kubernetes environment, but only for standalone Kubernetes as of 
now. I have read some previous discussion threads regarding this issue. See 
[1][2][3][4][5][6].

Question 1:
It seems that due to some interface and design considerations [4] mentioned by 
Robert and Xintong and official doc[5], this feature is only for standalone k8s 
and it is not available for native Kubernetes now. However, I believe in 
theory, it is possible to be added to native Kubernetes, right? Will this be 
part of the future plan? If not, what is the restriction and is it a hard 
restriction?

Question 2:
I have built a native Kubernetes operator on top of Yang’s work [7], supporting 
various state transfers in native k8s application mode and session mode. Right 
now, I am looking to add features similar to reactive scaling for native k8s. 
From my perspective, what I can do is enable periodic savepoints and scale 
up/down based on certain metrics we collect inside the Flink application. Some 
additional resource considerations need to be added to implement such a 
feature, similar to the adaptive scheduler concept in [9][10] (I didn’t dive 
deep into that; I guess I just need to calculate whether the new TMs will be 
offered sufficient k8s resources if the rescale happens?)
I think that, as a user/operator, I am not supposed to be able to 
recover/restart a job from a checkpoint [8].
I guess this might cause some performance loss, since savepoints are more 
expensive and the Flink application must take both savepoints and checkpoints 
periodically… Is there any way for a user to also use checkpoints to restart 
and recover? If Question 1 will be part of the future plan, I guess I won’t 
need to do much work here.

Reference:
[1] Reactive mode blog: https://flink.apache.org/2021/05/06/reactive-mode.html
[2] example usage of reactive scaling: 
https://github.com/rmetzger/flink-reactive-mode-k8s-demo
[3] FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[4] Discussion thread: 
https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
[5] Flink doc: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
[6] Flink Jira: 
https://issues.apache.org/jira/browse/FLINK-10407
[7] https://github.com/wangyang0918/flink-native-k8s-operator
[8] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints
[9] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
[10] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler

Thanks,
Fuyao


Re: [External] : Re: How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes

2021-10-25 Thread Fuyao Li
Thanks! I got your point. Will try it out.

From: Chesnay Schepler 
Date: Tuesday, October 19, 2021 at 01:44
To: Fuyao Li , user 
Cc: Rohit Gupta 
Subject: Re: [External] : Re: How to enable customize logging library based on 
SLF4J for Flink deployment in Kubernetes
1) Adding it as a dependency to the Flink application does not work with an 
actual Flink cluster, because said dependency must be available when the 
cluster is started. It works in the IDE because there everything is put onto 
the same classpath.

2) The folder structure shouldn't be relevant. So long as the jar is somewhere 
in the lib directory of the Flink distribution, it should be picked up.

3) Put the jar into the lib directory, do not add the config map, start the 
cluster and check the logs. The classpath will be logged very early in the 
startup procedure.
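
Besides reading the classpath from the startup log, a small check like the one 
below can confirm whether a given class is actually visible to the JVM. This is 
only a sketch: ClasspathCheck is a hypothetical helper, and the fully qualified 
name of the layout class is not known from this thread, so the caller has to 
supply it.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical diagnostic helper; not part of Flink or log4j. */
final class ClasspathCheck {

    private ClasspathCheck() {}

    /** True if the named class can be found by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** The JVM classpath split into individual entries. */
    static List<String> classpathEntries() {
        String cp = System.getProperty("java.class.path", "");
        return Arrays.asList(cp.split(Pattern.quote(File.pathSeparator)));
    }

    public static void main(String[] args) {
        // Pass the fully qualified class name to check as the first argument.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " loadable: " + isLoadable(name));
        classpathEntries().forEach(System.out::println);
    }
}
```

If the class is loadable but log4j still reports that it cannot locate the 
plugin, the problem is more likely plugin discovery than a missing jar.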


Re: [External] : Timeout settings for Flink jobs?

2021-10-18 Thread Fuyao Li
I don’t know of any out-of-the-box solution for the use case you mentioned. You 
can add an operator to orchestrate your Flink clusters; when certain conditions 
are met, triggering a stop with savepoint will achieve something like what you 
described. Maybe Arvid can share more information.

From: Sharon Xie 
Date: Monday, October 18, 2021 at 13:34
To: Arvid Heise 
Cc: Fuyao Li , user@flink.apache.org 

Subject: Re: [External] : Timeout settings for Flink jobs?
It's promising that I can use #isEndOfStream at the source. Is there a way I can 
terminate a job from the sink side instead? We want to terminate a job based on 
a few conditions (either hit the timeout limit or the output count limit).

On Mon, Oct 18, 2021 at 2:22 AM Arvid Heise <ar...@apache.org> wrote:
Unfortunately, DeserializationSchema#isEndOfStream is only ever supported for 
KafkaConsumer. It's going to be removed entirely, once we drop the 
KafkaConsumer.

For newer applications, you can use KafkaSource, which allows you to specify an 
end offset explicitly.



Re: [External] : Re: How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes

2021-10-18 Thread Fuyao Li
Hi Chesnay,
Thanks for the reply.

  1.  The internal logging framework is built upon slf4j/log4j2 (the same one 
Flink uses, but it comes with an additional POM dependency). I have added this 
dependency to the Flink application POM file, but it only seems to work locally 
in the IDE. In the Flink cluster environment, it doesn’t work.
  2.  I tried adding only the configmap and putting a single jar into the lib/ 
folder, and it seems the jar still isn’t found on the classpath. How should I 
organize the folder structure? /lib/internal-logging/xxx.jar, or must the jar 
file be directly under /lib, something like /lib/xxx.jar?
  3.  I got your point. I guess it is still using the default Flink logging 
classpath, and that causes the internal framework not to be recognized? How do 
I check the classpath of the Flink logging? Could you share some blogs with 
me? I am not familiar with this.

Best,
Fuyao

From: Chesnay Schepler 
Date: Tuesday, September 28, 2021 at 07:06
To: Fuyao Li , user 
Cc: Rohit Gupta 
Subject: [External] : Re: How to enable customize logging library based on 
SLF4J for Flink deployment in Kubernetes
Could you clarify whether this internal framework uses a custom slf4j/log4j2 
version, or is it just using what Flink comes with?

Did you only add the configmap and put a single jar into lib, or did you make 
other changes in Flink?

Can you remove just the configmap, start the cluster, and provide us with the 
classpath that Flink is logging?



Re: [External] : Timeout settings for Flink jobs?

2021-10-15 Thread Fuyao Li
Hi Sharon,

I think for DataStream API, you can override the isEndOfStream() method in the 
DeserializationSchema to control the input data source to end and thus end the 
workflow.
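
A minimal sketch of the stopping rule such an override could delegate to is 
shown below. StopCondition is a hypothetical helper, not a Flink class; the 
idea is that isEndOfStream(nextElement) would return recordAndCheck(). Whether 
the source honours that return value depends on the connector, and each 
parallel source instance would keep its own counter.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: signals "end" after a time limit or a record-count limit. */
final class StopCondition {
    private final Clock clock;
    private final Instant deadline;
    private final long maxRecords;
    private long seen;

    StopCondition(Clock clock, Duration timeout, long maxRecords) {
        this.clock = clock;
        this.deadline = clock.instant().plus(timeout); // limit is measured from construction
        this.maxRecords = maxRecords;
    }

    /** Call once per record; returns true once either limit has been reached. */
    boolean recordAndCheck() {
        seen++;
        boolean countReached = seen >= maxRecords;
        boolean timeReached = !clock.instant().isBefore(deadline);
        return countReached || timeReached;
    }
}
```

Note that the time limit is only evaluated when a record arrives, so an idle 
input would never trip it; a hard wall-clock timeout still needs something 
outside the job, such as an external stop command.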

Thanks,
Fuyao

From: Sharon Xie 
Date: Monday, October 11, 2021 at 12:43
To: user@flink.apache.org 
Subject: [External] : Timeout settings for Flink jobs?
Hi there,

We have a use case where we want to terminate a job when a time limit is 
reached. Is there a Flink setting that we can use for this use case?


Thanks,
Sharon


How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes

2021-09-24 Thread Fuyao Li
Hi Flink Community,

I am trying to enable a company-internal logging framework built upon SLF4J and 
log4j. This logging framework has a separate jar and specific logging 
configurations. After debugging, I am able to run the Flink application 
correctly in the local IDE with the internal logging framework after adding 
the related SLF4J, log4j, and logging framework dependencies.

However, I still run into errors when I deploy this into the Kubernetes 
environment. I tried to add the logging framework jar to /opt/flink/lib/ 
folder, but it doesn’t help much. I am not sure which part I am missing here. I 
have attached relevant information below. Thanks for your help.

This is the log4j2-console.properties I proposed. I have injected it as a 
configmap (mounted to /opt/flink/conf inside the pod using a Flink native 
Kubernetes Operator I built).
This configuration runs correctly in the local IDE and generates logs in the 
shape the internal logging framework expects. (I renamed it to 
log4j2.properties and put it into the resources/ folder for local debugging.)

packages = oracle.spectra.logging.base
status = WARN
monitorInterval = 30
shutdownHook = disable

rootLogger.level = ${sys:spectra-log-level:-INFO}
rootLogger.appenderRef.asyncC.ref = AsyncCAppender
rootLogger.appenderRef.asyncF.ref = AsyncFAppender

appender.asyncC.name = AsyncCAppender
appender.asyncC.type = Async
appender.asyncC.bufferSize = 256
appender.asyncC.appenderRef.type = AppenderRef
appender.asyncC.appenderRef.ref = JSONLogConsoleAppender

# Log all infos to the console
appender.console.name = JSONLogConsoleAppender
appender.console.target = SYSTEM_OUT
appender.console.type = Console
appender.console.layout.type = SpectraJsonLayout
appender.console.layout.compact = true
appender.console.layout.eventEol = true

appender.asyncF.name = AsyncFAppender
appender.asyncF.type = Async
appender.asyncF.bufferSize = 256
appender.asyncF.appenderRef.type = AppenderRef
appender.asyncF.appenderRef.ref = RollingFileAppender

# Log all infos in the given rolling file
appender.rolling.type = RollingFile
appender.rolling.name = RollingFileAppender
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = SpectraJsonLayout
appender.rolling.layout.compact = false
appender.rolling.layout.eventEol = true
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF


This is the error I got from the JobManager pod in Kubernetes.
sed: couldn't open temporary file /opt/flink/conf/sedAHNLHl: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedBkNR6o: Read-only file 
system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedMGJAkn: Read-only file 
system
/docker-entrypoint.sh: line 86: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Starting kubernetes-application as a console application on host 
faw-poc-demo-67864b696b-9zbbc.
2021-09-24 23:42:53,579 main ERROR Unable to locate plugin type for 
SpectraJsonLayout
2021-09-24 23:42:53,582 main ERROR Unable to locate plugin type for 
SpectraJsonLayout
2021-09-24 23:42:53,675 main ERROR Unable to locate plugin for SpectraJsonLayout
2021-09-24 23:42:53,689 main ERROR Could not create plugin of type class 
org.apache.logging.log4j.core.appender.RollingFileAppender for element 
RollingFile: java.lang.NullPointerException java.lang.NullPointerException
at 
org.apache.logging.log4j.core.config.plugins.visitors.PluginElementVisitor.findNamedNode(PluginElementVisitor.java:104)
at 
org.apache.logging.log4j.core.config.plugins.visitors.PluginElementVisitor.visit(PluginElementVisitor.java:88)
at 
org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.injectFields(PluginBuilder.java:185)
at 

Re: Questions on reading JDBC data with Flink Streaming API

2021-08-10 Thread Fuyao Li
Hello James,

To stream real-time data out of the database, you need to spin up a CDC 
instance, for example Debezium [1]. The CDC engine streams the changed data 
out to Kafka (for example), and you can consume the messages from Kafka using 
FlinkKafkaConsumer.
Historical data can be treated as bounded data stream processing using the 
JDBC connector [2].
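
To make the split between history and live data concrete, here is a minimal 
plain-Java sketch (deliberately not Flink API). Rows read once over JDBC form 
the bounded part, change events from the CDC topic form the live part, and a 
cutoff timestamp decides which side owns a record so that nothing is processed 
twice. The Event class, the cutoff rule and the sample values are assumptions 
made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the "bounded history, then live changes" handover
// in plain Java. Event and the cutoff rule are assumptions, not Flink API.
class HistoryThenLive {

    static final class Event {
        final long timestamp; // event time, e.g. epoch millis
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // History owns everything strictly before the cutoff; the live stream owns
    // everything at or after it, so an overlapping record is emitted only once.
    static List<String> merge(List<Event> history, List<Event> live, long cutoff) {
        List<String> out = new ArrayList<>();
        for (Event e : history) {
            if (e.timestamp < cutoff) {
                out.add(e.payload);
            }
        }
        for (Event e : live) {
            if (e.timestamp >= cutoff) {
                out.add(e.payload);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> history = List.of(new Event(1, "a"), new Event(2, "b"), new Event(3, "c"));
        List<Event> live = List.of(new Event(3, "c"), new Event(4, "d"));
        System.out.println(merge(history, live, 3)); // prints [a, b, c, d]
    }
}
```

In a real job the same idea would be expressed with the connectors referenced 
in this reply; the sketch only shows why a single agreed cutoff avoids 
duplicates at the boundary.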

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/

Thanks,
Fuyao

From: James Sandys-Lumsdaine 
Date: Tuesday, August 10, 2021 at 07:58
To: user@flink.apache.org 
Subject: [External] : Questions on reading JDBC data with Flink Streaming API

Hello,



I'm starting a new Flink application to allow my company to perform lots of 
reporting. We have an existing legacy system with most of the data we need 
held in SQL Server databases. We will need to consume data from these 
databases initially before starting to consume more data from newly deployed 
Kafka streams.



I've spent a lot of time reading the Flink book and web pages but I have some 
simple questions and assumptions I hope you can help with so I can progress.



Firstly, I am wanting to use the DataStream API so we can both consume historic 
data and also realtime data. I do not think I want to use the DataSet API but I 
also don't see the point in using the SQL/Table apis as I would prefer to write 
my functions in Java classes. I need to maintain my own state and it seems 
DataStream keyed functions are the way to go.



Now I am trying to actually write code against our production databases I need 
to be able to read in "streams" of data with SQL queries - there does not 
appear to be a JDBC source connector so I think I have to make the JDBC call 
myself and then possibly create a DataSource using env.fromElements(). 
Obviously this is a "bounded" data set but how else am I meant to get historic 
data loaded in? In the future I want to include a Kafka stream as well which 
will only have a few weeks worth of data so I imagine I will sometimes need to 
merge data from a SQL Server/Snowflake database with a live stream from a Kafka 
stream. What is the best practice for this as I don't see examples discussing 
this.



With retrieving data from a JDBC source, I have also seen some examples using a 
StreamingTableEnvironment - am I meant to use this somehow instead to query 
data from a JDBC connection into my DataStream functions etc? Again, I want to 
write my functions in Java not some Flink SQL. Is it best practice to use a 
StreamingTableEnvironment to query JDBC data if I'm only using the DataStream 
API?



Thanks in advance - I'm sure I will have plenty more high-level questions like 
this.



Re: [External] : Big data architecture

2021-08-09 Thread Fuyao Li
Hello Aissa,

I guess you might be interested in this video: 
https://www.youtube.com/watch?v=X3L75Rz64Ns&list=PL2oL9cdRCATGOSFvG3O5QbSuAcvkmr_KV&index=19

Thanks,
Fuyao

From: Aissa Elaffani 
Date: Thursday, July 15, 2021 at 03:55
To: user@flink.apache.org 
Subject: [External] : Big data architecture
Hello Guys,

I'm sorry for asking this question, as it has no connection with Apache 
Flink, but if someone can help I would be very grateful. I want to build a big 
data architecture for batch processing. We have a lot of data generated every 
day, and we receive it from many sources, especially 2 Oracle systems and 
other external data (scraping, external files, ...). We want to build a big 
data architecture for storing and processing this data; usually we process 
yesterday's data (J-1). I've already worked on a big data project, but it was 
for real-time streaming: I used Kafka as a distributed messaging system and 
Apache Flink for consuming data. So in this case I'm a bit lost, especially 
since it is my first batch processing project.
If anyone can help and point me to the technologies I can use for storage and 
processing, I would be very grateful. I'm sorry again for asking a question 
that has no connection with Flink, but I know there are some brilliant senior 
big data architects here who can help me build this architecture.

Thank you so much, I wish you all LUCK & SUCCESS!



Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-11 Thread Fuyao Li
Hello All,

I have solved the problem. In case someone needs a reference in the future, 
I will share my solution here.

Problem 1: Log issue
Flink 1.12 changed the default log4j configuration file name from 
log4j.properties to log4j-console.properties. From the operator's perspective, 
the operator pods' /opt/flink/conf directory must contain 
log4j-console.properties for logging to work properly. 
log4j.properties won't be recognized.
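
A quick way to confirm this inside a pod is to check which logging files 
actually exist in the conf directory. The sketch below is a plain-Java check, 
not part of Flink or the operator. The class name is hypothetical, and the 
list of expected files is an assumption taken from the ls output of the 
CLI-provisioned pod elsewhere in this thread.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: reports which logging config files are absent from a
// Flink conf directory. Names are assumptions based on the thread's ls output.
class ConfDirCheck {

    // Files seen next to flink-conf.yaml in the pod created by the Flink CLI.
    static final List<String> EXPECTED =
            List.of("log4j-console.properties", "logback-console.xml");

    static List<String> missing(File confDir) {
        List<String> missing = new ArrayList<>();
        for (String name : EXPECTED) {
            if (!new File(confDir, name).isFile()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Defaults to the conf path used throughout this thread.
        File confDir = new File(args.length > 0 ? args[0] : "/opt/flink/conf");
        System.out.println("Missing logging config files: " + missing(confDir));
    }
}
```

An empty list means both files are in place; in the operator-created pods 
described here, both names would be reported as missing.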

Problem 2:
stopWithSavepoint doesn't work, and the Flink CLI stop/cancel/savepoint 
commands don't work with native Kubernetes.

For the CLI commands, I need to add --target=kubernetes-application 
-Dkubernetes.cluster-id= to all application mode Flink CLI commands 
to make them work. For the stop/cancel/savepoint commands, I was directly 
following the doc here [1] without adding those configuration parameters. The 
Flink documentation doesn't point this out explicitly, and I was confused by 
it earlier.
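
For an operator that shells out to the CLI, the extra options can be added in 
one place. This is only a sketch: the class and method names are hypothetical, 
the cluster id and job id are placeholders, and it simply assembles the 
argument list rather than running anything.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper that assembles a Flink CLI call for application mode
// on native Kubernetes. Class and method names are made up for this sketch.
class FlinkCliArgs {

    static List<String> command(String action, String clusterId, String jobId) {
        List<String> cmd = new ArrayList<>();
        cmd.add("./bin/flink");
        cmd.add(action); // "stop", "cancel" or "savepoint"
        cmd.add("--target");
        cmd.add("kubernetes-application");
        cmd.add("-Dkubernetes.cluster-id=" + clusterId);
        cmd.add(jobId);
        return cmd;
    }

    public static void main(String[] args) {
        // "my-cluster-id" and "<job-id>" are placeholders, not real values.
        System.out.println(String.join(" ", command("cancel", "my-cluster-id", "<job-id>")));
    }
}
```

Keeping the target and cluster id in a single helper avoids forgetting them on 
one of the three commands.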

Maybe the doc can add a note here to be more informative?

As for the stop command not working with my code, it was because my Kafka 
client version was too low. I was using kafka-clients 1.1.0 (a very old 
version), which otherwise works fine with my Flink application. Because of 
the log issue, I didn't manage to notice the error earlier. I actually got 
the following error when executing the stop command.

java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: 'void 
org.apache.kafka.clients.producer.KafkaProducer.close(java.time.Duration)'

The stop command introduces better semantics for restarting a job, and it 
calls this method in the Flink application, so a low Kafka client version 
runs into this failure. The cancel command does not have this issue. I didn't 
look deeply into the source code for this; maybe you can share more insights 
about this.
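
To check up front whether the Kafka client on the classpath is new enough, a 
small reflection probe is sufficient. The sketch below is illustrative only: 
MethodProbe and hasMethod are hypothetical names, and the probe also returns 
false when kafka-clients is not on the classpath at all.

```java
import java.time.Duration;

// Illustrative probe: checks via reflection whether a class exposes a public
// method with the given signature, without linking against that class.
class MethodProbe {

    static boolean hasMethod(String className, String method, Class<?>... paramTypes) {
        try {
            Class.forName(className).getMethod(method, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // Class missing, method missing, or class could not be linked.
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = hasMethod(
                "org.apache.kafka.clients.producer.KafkaProducer", "close", Duration.class);
        System.out.println("KafkaProducer.close(Duration) available: " + ok);
    }
}
```

Running this with the application's classpath before triggering a stop would 
surface the mismatch without needing the job logs.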

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/cli/

Thanks,
Fuyao

From: Yang Wang 
Date: Friday, May 7, 2021 at 20:45
To: Fuyao Li 
Cc: Austin Cawley-Edwards , matth...@ververica.com 
, user 
Subject: Re: [External] : Re: StopWithSavepoint() method doesn't work in Java 
based flink native k8s operator
Since your problem is about the flink-native-k8s-operator, let's move the 
discussion there.

Best,
Yang

Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-07 Thread Fuyao Li
Hi Austin, Yang, Matthias,

I am following up to see if you have any ideas regarding this problem.

I also created an issue here to describe the problem. [1]

After looking into the source code [2], I believe that for native k8s, three 
configuration files should be added to the flink-config- configmap 
automatically. However, the Flink application created by the operator has 
only flink-conf.yaml. That also causes the difference in the start command 
mentioned in the issue.


Native k8s using Flink CLI:
Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 
-Xms1073741824 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/opt/flink/log/jobmanager.log 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=201326592b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=1073741824b -D 
jobmanager.memory.jvm-overhead.max=201326592b


Operator flink app:
Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 
-Xms3462817376 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=429496736b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=3462817376b -D 
jobmanager.memory.jvm-overhead.max=429496736b
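
The difference between the two commands can also be extracted mechanically. 
The following is a small plain-Java sketch that lists the -D system 
properties present in one command line but absent from the other; the class 
and method names are hypothetical, and only the JVM option part of each 
command above is used as input.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative diff of JVM system properties between two start commands.
class StartCommandDiff {

    // Key of a -Dname=value token, e.g. "-Dlog.file"; null for anything else
    // (including a bare "-D" that is followed by a separate token).
    static String propertyKey(String token) {
        if (!token.startsWith("-D") || token.length() == 2) {
            return null;
        }
        int eq = token.indexOf('=');
        return eq < 0 ? token : token.substring(0, eq);
    }

    // Properties set in the reference command but not in the candidate.
    static List<String> missingProperties(String reference, String candidate) {
        Set<String> present = new HashSet<>();
        for (String token : candidate.trim().split("\\s+")) {
            String key = propertyKey(token);
            if (key != null) {
                present.add(key);
            }
        }
        List<String> missing = new ArrayList<>();
        for (String token : reference.trim().split("\\s+")) {
            String key = propertyKey(token);
            if (key != null && !present.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String cli = "-Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 "
                + "-Dlog.file=/opt/flink/log/jobmanager.log "
                + "-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml "
                + "-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties "
                + "-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties";
        String operator = "-Xmx3462817376 -Xms3462817376 -XX:MaxMetaspaceSize=268435456";
        // prints [-Dlog.file, -Dlogback.configurationFile, -Dlog4j.configuration, -Dlog4j.configurationFile]
        System.out.println(missingProperties(cli, operator));
    }
}
```

All four missing entries are logging-related, which matches the missing 
logging files in the operator-created pods.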

Please share your opinion on this. Thanks!

[1] https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
[2] 
https://github.com/apache/flink/blob/release-1.12/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java

Have a good weekend!
Best,
Fuyao



Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello All,

I also checked the configmap that native k8s generates automatically. It 
contains only flink-conf.yaml, not log4j.properties. I am not very familiar 
with the implementation details behind native k8s.

That should be the root cause. Could you check the implementation and help me 
locate the potential problem?
Yang’s initial code: 
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
My modified version: 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java

Thank you so much.

Best,
Fuyao


Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello Austin, Yang,

For the logging issue, I think I have found something worth noting.

They are all based on base image flink:1.12.1-scala_2.11-java11

Dockerfile: https://pastebin.ubuntu.com/p/JTsHygsTP6/

In the JM and TM provisioned by the k8s operator, there is only 
flink-conf.yaml in the $FLINK_HOME/conf directory, even though I tried to add 
these configurations to the image in advance. It seems the operator is 
overriding the directory and removing all other log4j configurations. As a 
result, the logs can't be printed correctly.
root@flink-demo-5fc78c8cf-hgvcj:/opt/flink/conf# ls
flink-conf.yaml


However, the pods provisioned by the Flink native k8s CLI do contain some 
log4j-related configuration files.

root@test-application-cluster-79c7f9dcf7-44bq8:/opt/flink/conf# ls
flink-conf.yaml  log4j-console.properties  logback-console.xml


The native Kubernetes operator pod can print logs correctly because it has the 
log4j.properties file mounted to the /opt/flink/conf/ directory. [1]
For the Flink pods, it seems that only flink-conf.yaml is injected there. 
[2][3] No log4j-related configmap is configured, which makes the logs in 
those pods unavailable.

I am not sure how to inject something similar into the Flink pods. Maybe by 
adding a structure similar to the one in [1] to cr.yaml, so that such a 
configmap makes log4j.properties available to the Flink CRD?

I am confused about how to implement this. The deployment is a one-step 
operation in [4], and I don't know how to make a configmap available to it. 
Maybe I can only use the new pod template feature in Flink 1.13 to do this?



[1] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/flink-native-k8s-operator.yaml#L58
[2] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/cr.yaml#L21
[3] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java#L83
[4] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L176

Best,
Fuyao


Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello All,

For the logging issue:
Hi Austin,

This is the link to my full code implementation [1]; I only removed the 
credential-related parts. The operator definition can be found here [2]. You 
can also check other parts if you find any problem.
The operator uses log4j2, and you can see there is a log appender for the 
operator [3]; the operator pod can indeed print out logs. But for the Flink 
application JM and TM pods, I see the errors mentioned earlier: the sed error 
and ERROR StatusLogger No Log4j 2 configuration file found.

I used to use log4j for the Flink application. To avoid potential 
incompatibility issues, I have already upgraded the POM of the Flink 
application to use log4j2, but the logging problem still exists.

This is my log4j2.properties file in the Flink application [6]. These are the 
logging-related POM dependencies for the Flink application [7].

Logs are printed during normal native k8s deployment and IDE debugging, but 
not when the application is deployed through the operator. Could this be 
caused by a class namespace conflict, since I introduced the presto jar in 
the Flink distribution? This is my Dockerfile to build the Flink application 
jar [5].

Please share your ideas on this.

For the stopWithSavepoint issue:
Just to note, I understand that the cancel command (cancelWithSavepoint()) is 
a deprecated feature and may not guarantee exactly-once semantics, so it can 
produce inconsistent results, for example with timer-related state? Please 
correct me if I am wrong. The code that works with cancelWithSavepoint() is 
shared in [4] below.


[1] https://github.com/FuyaoLi2017/flink-native-kubernetes-operator
[2] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/flink-native-k8s-operator.yaml
[3] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/resources/log4j2.properties
[4] https://pastebin.ubuntu.com/p/tcxT2FwPRS/
[5] https://pastebin.ubuntu.com/p/JTsHygsTP6/
[6] https://pastebin.ubuntu.com/p/2wgdcxVfSy/
[7] https://pastebin.ubuntu.com/p/Sq8xRjQyVY/


Best,
Fuyao

From: Austin Cawley-Edwards 
Date: Tuesday, May 4, 2021 at 14:47
To: matth...@ververica.com 
Cc: Fuyao Li , user , Yang Wang 
, Austin Cawley-Edwards 
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink native k8s operator
Hey all,

Thanks for the ping, Matthias. I'm not super familiar with the details of 
@Yang Wang's operator, to be honest :(. Can you share some of your 
FlinkApplication specs?

For the `kubectl logs` command, I believe that just reads stdout from the 
container. Which logging framework are you using in your application and how 
have you configured it? There's a good guide for configuring the popular ones 
in the Flink docs[1]. For instance, if you're using the default Log4j 2 
framework you should configure a ConsoleAppender[2].

Hope that helps a bit,
Austin

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/advanced/logging/
[2]: 
https://logging.apache.org/log4j/2.x/manual/appenders.html#ConsoleAppender

On Tue, May 4, 2021 at 1:59 AM Matthias Pohl <matth...@ververica.com> wrote:
Hi Fuyao,
sorry for not replying earlier. The stop-with-savepoint operation shouldn't 
only suspend but terminate the job. Is it that you might have a larger state 
that makes creating the savepoint take longer? Even though, considering that 
you don't experience this behavior with your 2nd solution, I'd assume that we 
could ignore this possibility.

I'm gonna add Austin to the conversation as he worked with k8s operators as 
well already. Maybe, he can also give you more insights on the logging issue 
which would enable us to dig deeper into what's going on with 
stop-with-savepoint.

Best,
Matthias


Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-03 Thread Fuyao Li
Hello,

Update:
I think stopWithSavepoint() only suspends the job; it doesn't actually 
terminate it the way ./bin/flink cancel does. I switched to 
cancelWithSavepoint() and it works here.

Maybe stopWithSavepoint() should only be used to update configurations like 
parallelism? For updating the image it does not seem suitable; please correct 
me if I am wrong.

For the log issue, I am still a bit confused. Why are the logs not available 
in kubectl logs? How should I get access to them?

Thanks.
Best,
Fuyao


Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-02 Thread Fuyao Li
Hello,

I noticed that first triggering a savepoint and then deleting the deployment 
might cause duplicate data, which could hurt semantic correctness. Please give 
me some hints on how to make stopWithSavepoint() work correctly with the 
Fabric8io Java k8s client to perform this image update operation. Thanks!

Best,
Fuyao



From: Fuyao Li 
Date: Friday, April 30, 2021 at 18:03
To: user , Yang Wang 
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink native k8s operator
Hello Community, Yang,

I have one more question about logging. I noticed that when I run kubectl logs 
against the JM, the pods provisioned by the operator don't print the internal 
Flink logs. I only get something like the output below; no actual Flink logs 
are printed. Where can I find the path to the logs? Should I use a sidecar 
container to get them out? How can I get the logs without checking the Flink 
WebUI? The sed errors also confuse me. In fact, the application is already up 
and running correctly if I access the WebUI through Ingress.

Reference: https://github.com/wangyang0918/flink-native-k8s-operator/issues/4


[root@bastion deploy]# kubectl logs -f flink-demo-594946fd7b-822xk

sed: couldn't open temporary file /opt/flink/conf/sedh1M3oO: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sed8TqlNR: Read-only file 
system
/docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedvO2DFU: Read-only file 
system
/docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 
-Xms3462817376 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=429496736b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=3462817376b -D 
jobmanager.memory.jvm-overhead.max=429496736b
ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user programmatically 
provided configurations. Set system property 'log4j2.debug' to show Log4j 2 
internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html
 for instructions on how to configure Log4j 2
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/opt/flink/lib/flink-dist_2.11-1.12.1.jar) to field 
java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release


----- The logs stop here; Flink application logs are no longer printed -----

^C
[root@bastion deploy]# kubectl logs -f flink-demo-taskmanager-1-1
sed: couldn't open temporary file /opt/flink/conf/sedaNDoNR: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/seddze7tQ: Read-only file 
system
/docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedYveZoT: Read-only file 
system
/docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx697932173 
-Xms697932173 -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=166429984b -D 
taskmanager.memory.network.min=166429984b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=665719939b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=563714445b -D 
taskmanager.memory.task.off-heap.size=0b --configDir /opt/flink/conf 
-Djobmanager.memory.jvm-overhead.min='429496736b' 
-Dpipeline.classpaths='file:usrlib/quickstart-0.1.jar' 
-Dtaskmanager.resource-id='flink-demo-tas
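
The memory flags in the two start commands above can be cross-checked with a 
little arithmetic. The sketch below is only an illustration: the class and 
method names are made up, the byte values are copied from the -D options in the 
logs, and the grouping follows my understanding of Flink's memory model 
(JobManager process memory = heap + off-heap + metaspace + JVM overhead; 
TaskManager -Xmx = framework heap + task heap; -XX:MaxDirectMemorySize = 
framework off-heap + task off-heap + network).

```java
// Hypothetical helper; not part of Flink. All values are in bytes and are
// copied from the -D options printed in the start commands above.
final class FlinkMemoryCheck {
    // JobManager components (jobmanager.memory.*)
    static final long JM_HEAP = 3_462_817_376L;
    static final long JM_OFF_HEAP = 134_217_728L;
    static final long JM_METASPACE = 268_435_456L;
    static final long JM_OVERHEAD = 429_496_736L;

    // TaskManager components (taskmanager.memory.*)
    static final long TM_FRAMEWORK_HEAP = 134_217_728L;
    static final long TM_TASK_HEAP = 563_714_445L;
    static final long TM_FRAMEWORK_OFF_HEAP = 134_217_728L;
    static final long TM_TASK_OFF_HEAP = 0L;
    static final long TM_NETWORK = 166_429_984L;

    // Assumed model: total JobManager process memory is the sum of all four parts.
    static long jobManagerProcessBytes() {
        return JM_HEAP + JM_OFF_HEAP + JM_METASPACE + JM_OVERHEAD;
    }

    // Assumed model: the TaskManager JVM heap (-Xmx) is framework heap plus task heap.
    static long taskManagerHeapBytes() {
        return TM_FRAMEWORK_HEAP + TM_TASK_HEAP;
    }

    // Assumed model: direct memory is framework off-heap + task off-heap + network.
    static long taskManagerDirectBytes() {
        return TM_FRAMEWORK_OFF_HEAP + TM_TASK_OFF_HEAP + TM_NETWORK;
    }

    public static void main(String[] args) {
        System.out.println("JM process memory: " + jobManagerProcessBytes());
        System.out.println("TM heap (-Xmx):    " + taskManagerHeapBytes());
        System.out.println("TM direct memory:  " + taskManagerDirectBytes());
    }
}
```

If the grouping assumption holds, the JobManager components sum to 4294967296 
bytes (4 GiB), and the two TaskManager sums match the -Xmx697932173 and 
-XX:MaxDirectMemorySize=300647712 flags in the start command, so the memory 
configuration itself looks consistent.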

Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-04-30 Thread Fuyao Li
 
(file:/opt/flink/lib/flink-dist_2.11-1.12.1.jar) to method 
java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Apr 29, 2021 12:58:34 AM oracle.simplefan.impl.FanManager configure
SEVERE: attempt to configure ONS in FanManager failed with 
oracle.ons.NoServersAvailable: Subscription time out


----- The logs stop here; Flink application logs are no longer printed -----


Best,
Fuyao




Re: Protobuf support with Flink SQL and Kafka Connector

2021-04-30 Thread Fuyao Li
Hello Shipeng,

I am not an expert in Flink, just want to share some of my thoughts. Maybe 
others can give you better ideas.
I don't think Flink SQL has built-in Protobuf support. However, you can write 
a user-defined format to support it [1].
If you use the DataStream API, you can use the Kryo serializer to serialize 
and deserialize Protobuf messages [2]; that page describes an out-of-the-box 
integration for Protobuf. You will need to convert the stream to Flink SQL 
after data ingestion.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#user-defined-sources-sinks
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html

Best,
Fuyao


From: Shipeng Xie 
Date: Friday, April 30, 2021 at 14:58
To: user@flink.apache.org 
Subject: [External] : Protobuf support with Flink SQL and Kafka Connector
Hi,

The formats page at 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/ 
does not mention a protobuf format. Does Flink SQL support protobuf? If not, is 
there any plan to support it in the near future?
Thanks!



StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-04-30 Thread Fuyao Li
Hello Community, Yang,

I am trying to extend the flink native Kubernetes operator by adding some new 
features based on the repo [1]. I wrote a method that implements the image 
update functionality [2]. I added the call

triggerImageUpdate(oldFlinkApp, flinkApp, effectiveConfig);

below the existing call

triggerSavepoint(oldFlinkApp, flinkApp, effectiveConfig);


I wrote a function to accommodate the image change behavior.[2]

Solution 1:
I want to use the stopWithSavepoint() method to complete the task. However, I 
found that it gets stuck and never completes. Even if I call get() on the 
CompletableFuture, it always times out and throws an exception. See the 
solution 1 logs [3].

Solution 2:
I tried triggering a savepoint, then deleting the deployment in the code, and 
then creating a new application with the new image. This seems to work fine. 
Log link: [4]
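
The hang described in Solution 1 can at least be bounded on the caller's side. 
The following is a minimal sketch, not the operator's actual code: 
SavepointFallback and its method are hypothetical, and the two Supplier 
arguments stand in for calls such as stopWithSavepoint() and 
cancelWithSavepoint(), assuming each returns a CompletableFuture<String> that 
holds the savepoint path. It waits for the first call with a timeout and, if 
that expires, tries the second one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper; the suppliers are placeholders for real client calls.
final class SavepointFallback {

    // Waits up to timeoutMillis for the primary call. On timeout it abandons
    // that future and waits (again up to timeoutMillis) for the fallback call.
    static String savepointWithFallback(
            Supplier<CompletableFuture<String>> primary,
            Supplier<CompletableFuture<String>> fallback,
            long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> first = primary.get();
        try {
            return first.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // This only cancels the local future. It does not undo anything
            // the cluster may already have started for the first request.
            first.cancel(true);
            return fallback.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins: a future that never completes, and one that is already done.
        String path = savepointWithFallback(
                () -> new CompletableFuture<String>(),
                () -> CompletableFuture.completedFuture("fallback-savepoint-path"),
                200);
        System.out.println(path);
    }
}
```

A bounded wait like this does not explain why the first call hangs; it only 
keeps the operator's reconcile loop from blocking forever while the root cause 
is investigated.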

My questions:

  1.  Why does solution 1 get stuck? The CompletableFuture from 
triggerSavepoint() completes fine here, so why does stopWithSavepoint() always 
hang or time out? I am very confused.
  2.  I am still new to the Fabric8io library. Did I do anything wrong in the 
implementation? Maybe I should update the jobStatus? Please give me some 
suggestions.
  3.  For the workaround in solution 2, are there any downsides I didn’t notice?


[1] https://github.com/wangyang0918/flink-native-k8s-operator
[2] https://pastebin.ubuntu.com/p/tQShjmdcJt/
[3] https://pastebin.ubuntu.com/p/YHSPpK4W4Z/
[4] https://pastebin.ubuntu.com/p/3VG7TtXXfh/

Best,
Fuyao


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-16 Thread Fuyao Li
Hello Yang,

Please take a look at the PR when you are free.
https://github.com/apache/flink/pull/15602

Should be a simple change. Thanks!

Best,
Fuyao


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-13 Thread Fuyao Li
Hello Yang,

I also created a PR for this issue. Please take a look.
Refer to https://github.com/apache/flink/pull/15602

Thanks,
Fuyao



Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-13 Thread Fuyao Li
Hello Yang,

I created a ticket: https://issues.apache.org/jira/browse/FLINK-22264
I just registered as a user and I can’t find a place to assign the task to 
myself… Any idea how to do this in Jira?

Thanks.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 13, 2021 at 03:01
To: Fuyao Li 
Cc: user , Yan Wang 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
I think it makes sense to have such a simple fix.

Could you please create a ticket and attach a PR?

Best,
Yang



Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-13 Thread Fuyao Li
Hello Yang,

It is very kind of you to give such a detailed explanation! Thanks for the 
clarification.

For the small document fix I mentioned, what do you think?

Best,
Fuyao

From: Yang Wang 
Date: Monday, April 12, 2021 at 23:03
To: Fuyao Li 
Cc: user , Yan Wang 
Subject: [External] : Re: Conflict in the document - About native Kubernetes 
per job mode
Hi Fuyao,

Currently, Flink only supports per-job mode on YARN. The standalone job 
cluster has been replaced with standalone application mode after FLIP-85 [1]. 
Neither standalone Flink on K8s nor the native K8s integration supports 
per-job mode.

The video you attached shows a PoC implementation made for the presentation. 
We introduced the Kubernetes application mode in release 1.11 and agreed not 
to support per-job clusters on Kubernetes. The only reason is that it is not 
very convenient to ship the job graphs, user jars, and artifacts in a 
Kubernetes environment.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode

Best,
Yang

On Tue, Apr 13, 2021 at 8:10 AM Fuyao Li <fuyao...@oracle.com> wrote:
Hello Community, Yang,

I noticed a conflict in the documentation regarding per-job mode support on 
Kubernetes. The doc here [1] says:
"in a Flink Job Cluster, the available cluster manager (like YARN or 
Kubernetes) is used to spin up a cluster for each submitted job and this 
cluster is available to that job only."
This implies that per-job mode is supported on Kubernetes.

However, the docs [2] and [3] clearly state that per-job mode is not 
supported on Kubernetes.

These statements conflict, which is misleading. If needed, I can create an MR 
to remove the Kubernetes mention from [1]. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing 
a command with the -e kubernetes-per-job flag. I tried it and found that this 
option is not supported in the Flink distribution at all. I noticed the version 
you are using during the demo is 1.11-snapshot. Did you modify the source code 
and build an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] https://www.youtube.com/watch?v=pdFPr_VOWTU=833s

Best,
Fuyao


Conflict in the document - About native Kubernetes per job mode

2021-04-12 Thread Fuyao Li
Hello Community, Yang,

I noticed a conflict in the document for per-job mode support for Kubernetes.
In the doc here [1], it mentions
in a Flink Job Cluster, the available cluster manager (like YARN or Kubernetes) 
is used to spin up a cluster for each submitted job and this cluster is 
available to that job only.
It implies per job mode is supported in Kubernetes.

However, the docs [2] and [3] clearly state that per-job mode is not 
supported in Kubernetes.

These statements conflict, which is misleading. If needed, I can create 
an MR to delete the statement about Kubernetes in [1]. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing 
a command with the -e kubernetes-per-job flag. I tried it and found that this 
command is not supported in the Flink distribution at all. I noticed the version 
you are using during the demo is 1.11-snapshot. Did you modify the source code 
and build an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] https://www.youtube.com/watch?v=pdFPr_VOWTU=833s

Best,
Fuyao


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-06 Thread Fuyao Li
Hi Yang,

Thanks for the reply, that information is very helpful.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 6, 2021 at 01:11
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Sorry for the late reply.

It is not very hard to develop your own deployer. Actually, I spent 3 days 
developing the PoC version of flink-native-k8s-operator. So 
if you want a fully functional K8s operator, maybe two weeks is enough. 
But if you want to put it into production, you may need 
some more time to polish it for easier use.

Flink native K8s integration is not going to replace the standalone mode. 
First, not all the Flink standalone clusters are running on the K8s.
And standalone mode could work really well with reactive mode[1].


Flink native K8s integration is not going to replace the K8s operator. 
Actually, the Flink K8s operator is not on the same level as the Flink native 
integration. The Flink K8s operator is responsible for managing the lifecycle 
of a Flink application. It also makes job submission more K8s-style.
The Google and Lyft Flink K8s operators could support native mode. They just do 
not have that support right now.


Kubernetes HA could work both for standalone mode and native mode. You could 
find the configuration here[2]. However, you might
need some changes on the Flink k8s operator to make it work. Because we need to 
add more args(e.g. --host) to the JobManager start commands.


[1]. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2]. 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes

Best,
Yang


Fuyao Li <fuyao...@oracle.com> wrote on Mon, Apr 5, 2021 at 1:33 PM:
Hello Yang,

I am just following up on the previous email to see if you have had time to reply.
I also took a deeper look into the Lyft K8s operator recently. It seems it doesn’t 
support HA natively. It still needs the help of ZooKeeper. In this respect, 
native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao

From: Fuyao Li <fuyao...@oracle.com>
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang <danrtsey...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t use telnet in the container. I tried curl 
144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This shows 
that the public IP is reachable from inside the cluster. As you mentioned, there 
might still be some network issues with the cluster. I will check further.

For problem 2:
I created a new K8S cluster with a bastion server that has a public IP assigned 
to it. Finally, I can see something valid from my browser. (There are still some 
problems with connecting to some databases, but I think these network problems 
are not directly related to Flink. I can investigate them later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it would 
take to develop a deployer. I understand it depends on proficiency, but could 
you give a rough estimate? If it is too complicated and the other options 
are not significantly inferior to native Kubernetes, I might prefer to choose 
one of them. I am currently comparing different options for deploying on 
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented [3]. I noticed you mentioned that 
native K8S is not going to replace the other two options. I still don’t fully 
understand your point from the limited explanation in the demo. Could you 
compare the tradeoffs a little? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2]  https://github.com/lyft/flinkk8soperator
[3] 

Re: [External] : Union of more then two streams

2021-04-05 Thread Fuyao Li
Hello BB,


  1.  For the DataStream approach, you can use the broadcast pattern to build 
state that enriches your data, instead of a join.
 *   You can define something like this:
class CodebookData {
 private Currency currency;
 private OrganizationUnit organizationUnit;
 ...
}


 *   You can leverage a broadcast stream [1], since you mentioned that your code 
book streams don’t have much data. This is a good use case for the broadcast 
pattern. Connect the wrapper class datastream with the main stream and simply 
enrich it with the state you built. I am not sure if this fits your use case. 
Please check.
  2.  I am not sure. A lateral table join (temporal join) is designed to handle 
this kind of data enrichment workload. You have a main table and a probe-side 
table. I suppose there is some kind of optimization, but maybe I am wrong. In 
theory, it is still based on a join, so maybe forget about that part. Anyway, 
Flink SQL will make the join easier.
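
To make the "which key do I use" point concrete, here is a minimal plain-Java 
sketch of the lookup logic only. It is not Flink code: the class and method 
names (CodebookState, putCurrency, enrich) are made up for illustration, and 
code book values are simplified to strings. In a real job the maps would 
presumably live in broadcast state (for example one MapStateDescriptor per code 
book inside a BroadcastProcessFunction), but that wiring is an assumption, not 
something tested here.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for broadcast state: one lookup map per code book. */
final class CodebookState {
    // Each code book keeps its own key space, so no shared join key is needed.
    private final Map<String, String> currencyById = new HashMap<>();
    private final Map<String, String> organizationUnitById = new HashMap<>();

    /** Would be called for every currency code book update (broadcast side). */
    void putCurrency(String currencyId, String name) {
        currencyById.put(currencyId, name);
    }

    /** Would be called for every organization unit update (broadcast side). */
    void putOrganizationUnit(String organizationId, String name) {
        organizationUnitById.put(organizationId, name);
    }

    /** Would be called for every main-stream event; "?" marks a missing entry. */
    String enrich(String eventId, String currencyId, String organizationId) {
        String currency = currencyById.getOrDefault(currencyId, "?");
        String unit = organizationUnitById.getOrDefault(organizationId, "?");
        return eventId + "[currency=" + currency + ", organizationUnit=" + unit + "]";
    }
}
```

The point of the sketch is that the main-stream event carries both foreign keys, 
so each code book is looked up by its own key and the streams never need a 
common key.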


Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Best,
Fuyao

From: B.B. 
Date: Monday, April 5, 2021 at 06:27
To: Fuyao Li 
Subject: Re: [External] : Union of more then two streams
Hi Fuyao,
thanks for your input.
I have follow-up questions regarding your advice.

In case a) of your suggested DataStream solution, could you elaborate a little 
bit more? When you create that kind of generalized type, how would you join it 
with the main stream? Which key would you use?
I was thinking of creating a wrapper class that holds all the data 
from the code books. For example:
class CodebookData {
 private Currency currency;
 private OrganizationUnit organizationUnit;
 ...
}
But then I have the problem of which key to use to join with the main stream, 
because currency has its own key currencyId, organization unit also has its own 
key organizationId, and so on.

Regarding your second suggested solution with Flink SQL, what do you mean by
“For such join, there should be some internal optimization and might get rid 
of some memory consumption issues”?

Thx in advance

BB


On Mon, 5 Apr 2021 at 07:29, Fuyao Li <fuyao...@oracle.com> wrote:
Hello BB,

Just want to share some of my immature ideas. Maybe some experts can give 
you better solutions and advice.

  1.  DataStream based solution:

 *   To do a union, as you already know, the datastreams must be 
of the same type. Otherwise, you can’t do it. There is a workaround for 
your problem. You can ingest the datastreams with a DeserializationSchema and 
map the different code book streams to the same Java type. One field holds the 
foreign key value (the codebook_fk1, codebook_fk2 values are all stored here), 
and another field just contains the name of the foreign key (e.g. codebook_fk1). 
All other fields should also be generalized into this Java type. After that, 
you can do a union of these different code book streams and join it with the 
main stream.
 *   Cascading connected streams is, I guess, not a suggested approach. 
In addition to memory, I think it will also make the watermarks hard to 
coordinate.

  2.  Flink SQL approach:

You can try to use a Flink temporal table join to do the join work here [1][2]. 
With this approach, you cascade the joins to enrich the main stream. This 
seems to fit your use case, since your enrichment streams don’t change 
very often and contain something like currency. For such a join, there should be 
some internal optimization that might get rid of some memory consumption issues, 
I guess? Maybe I am wrong, but it is worth taking a look.




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#event-time-temporal-join

Best,
Fuyao



From: B.B. <bijela.vr...@gmail.com>
Date: Friday, April 2, 2021 at 01:41
To: user@flink.apache.org <user@flink.apache.org>
Subject: [External] : Union of more then two streams
Hi,

I have an architecture question regarding the union of more than two streams in 
Apache Flink.

We have three, and sometimes more, streams that act as code books 
with which we have to enrich the main stream.
The code book streams are compacted Kafka topics. Code books are something that 
doesn't change very often, e.g. currency. The main stream is a fast event stream.

The idea is to make a union of all code books, then join it with the main stream 
and store the enrichment data as managed,

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-04 Thread Fuyao Li
Hello Yang,

I am just following up on the previous email to see if you have had time to reply.
I also took a deeper look into the Lyft K8s operator recently. It seems it doesn’t 
support HA natively. It still needs the help of ZooKeeper. In this respect, 
native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao

From: Fuyao Li 
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t use telnet in the container. I tried curl 
144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This shows 
that the public IP is reachable from inside the cluster. As you mentioned, there 
might still be some network issues with the cluster. I will check further.

For problem 2:
I created a new K8S cluster with a bastion server that has a public IP assigned 
to it. Finally, I can see something valid from my browser. (There are still some 
problems with connecting to some databases, but I think these network problems 
are not directly related to Flink. I can investigate them later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it would 
take to develop a deployer. I understand it depends on proficiency, but could 
you give a rough estimate? If it is too complicated and the other options 
are not significantly inferior to native Kubernetes, I might prefer to choose 
one of them. I am currently comparing different options for deploying on 
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented [3]. I noticed you mentioned that 
native K8S is not going to replace the other two options. I still don’t fully 
understand your point from the limited explanation in the demo. Could you 
compare the tradeoffs a little? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2]  https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao


From: Yang Wang 
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for sharing the progress.

1. The Flink client is able to list/cancel jobs. Based on the logs shared above, 
I should be able to ping 144.25.13.78, so why can I still NOT ping this address?

I think this is an environment problem. Actually, not every IP address can be 
tested with the "ping" command. I suggest you use "telnet 144.25.13.78 8081" 
to check the network connectivity.

2. Why is 144.25.13.78:8081 not accessible from outside, I mean from my laptop’s 
browser? I am within the company’s VPN and such a public load balancer should 
expose the Flink Web UI, right? I tried to debug the network configuration but 
failed to find a reason. Could you give me some hints?

As in my answer above, I think you need to check the network connectivity 
via "telnet 144.25.13.78 8081". Maybe the firewall does not allow connections 
from your local machine (e.g. your local IP is not in the whitelist of the 
LoadBalancer IP).

In production, what is the suggested approach to list and cancel jobs? The 
current manual work of “kubectl exec” into pods is not very reliable. How can I 
automate this process and integrate it with CI/CD? Please share some blogs if 
there are any, thanks.

I think in a production environment, you should have your own deployer, which 
takes care of submitting, listing, and cancelling the jobs. The deployer 
could even help with triggering savepoints and managing the whole lifecycle of 
Flink applications. I once developed a PoC of native-flink-k8s-operator[1]. It 
could be a starting point for your own deployer if you want to develop it in Java.

[1]. 
https://github.com/wangyang0918/flink-native-k8s-operator

Re: [External] : Union of more then two streams

2021-04-04 Thread Fuyao Li
Hello BB,

Just want to share some of my immature ideas. Maybe some experts can give 
you better solutions and advice.

  1.  DataStream based solution:
 *   To do a union, as you already know, the datastreams must be 
of the same type. Otherwise, you can’t do it. There is a workaround for 
your problem. You can ingest the datastreams with a DeserializationSchema and 
map the different code book streams to the same Java type. One field holds the 
foreign key value (the codebook_fk1, codebook_fk2 values are all stored here), 
and another field just contains the name of the foreign key (e.g. codebook_fk1). 
All other fields should also be generalized into this Java type. After that, 
you can do a union of these different code book streams and join it with the 
main stream.
 *   Cascading connected streams is, I guess, not a suggested approach. 
In addition to memory, I think it will also make the watermarks hard to 
coordinate.
  2.  Flink SQL approach:

You can try to use a Flink temporal table join to do the join work here [1][2]. 
With this approach, you cascade the joins to enrich the main stream. This 
seems to fit your use case, since your enrichment streams don’t change 
very often and contain something like currency. For such a join, there should be 
some internal optimization that might get rid of some memory consumption issues, 
I guess? Maybe I am wrong, but it is worth taking a look.
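
As an illustration of the generalized-type workaround from the DataStream based 
solution above, here is a minimal plain-Java sketch. It is not Flink code: 
CodebookRecord, CodebookUnion, and the string payload are hypothetical names and 
simplifications, and the lists only stand in for streams that have already been 
mapped to the common type.

```java
import java.util.ArrayList;
import java.util.List;

/** One generalized record type that every code book stream is mapped to. */
final class CodebookRecord {
    final String fkName;   // which foreign key this is, e.g. "codebook_fk1"
    final String fkValue;  // the foreign key value itself
    final String payload;  // remaining fields, generalized (here: a plain string)

    CodebookRecord(String fkName, String fkValue, String payload) {
        this.fkName = fkName;
        this.fkValue = fkValue;
        this.payload = payload;
    }

    /** Composite key, so equal values from different code books never collide. */
    String key() {
        return fkName + ":" + fkValue;
    }
}

final class CodebookUnion {
    /** Stand-in for a stream union: concatenates already-normalized inputs. */
    @SafeVarargs
    static List<CodebookRecord> union(List<CodebookRecord>... streams) {
        List<CodebookRecord> all = new ArrayList<>();
        for (List<CodebookRecord> stream : streams) {
            all.addAll(stream);
        }
        return all;
    }
}
```

Keeping the foreign key name next to its value is what lets a single downstream 
operator tell the code books apart after the union.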




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#event-time-temporal-join

Best,
Fuyao



From: B.B. 
Date: Friday, April 2, 2021 at 01:41
To: user@flink.apache.org 
Subject: [External] : Union of more then two streams
Hi,

I have an architecture question regarding the union of more than two streams in 
Apache Flink.

We have three, and sometimes more, streams that act as code books 
with which we have to enrich the main stream.
The code book streams are compacted Kafka topics. Code books are something that 
doesn't change very often, e.g. currency. The main stream is a fast event stream.

The idea is to make a union of all code books, then join it with the main stream 
and store the enrichment data as managed, keyed state (so when compacted events 
in Kafka expire, I have the code books saved in state).

The problem is that the foreign key of every code book is different. 
E.g. codebook_1 has foreign key codebook_fk1, codebook_2 has foreign key 
codebook_fk2, and so on, each connecting to the main stream.
This means I cannot use keyBy with a CoProcessFunction.

Is this doable with union, or should I cascade a series of connected streams 
with the main stream, e.g. mainstream.connect(codebook_1) -> 
mainstreamWithCodebook1.connect(codebook_2) -> 
mainstreamWithCodebook1AndCodebook2.connect(codebook_3) -> ...?
I read somewhere that this latter approach is not memory friendly.

Thx.

BB.


Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-01 Thread Fuyao Li
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t use telnet in the container. I tried curl 
144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This shows 
that the public IP is reachable from inside the cluster. As you mentioned, there 
might still be some network issues with the cluster. I will check further.
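
Where telnet is not available in a container, a plain TCP connect check can 
stand in for it. The sketch below is only an illustration: PortCheck and 
isReachable are hypothetical names, and it tests nothing beyond whether a TCP 
connection to host:port can be opened within a timeout. Unlike ping, it does not 
depend on ICMP, which load balancers and firewalls often do not answer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port opens within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not routable.
            return false;
        }
    }
}
```

For the address in this thread, the call would be 
PortCheck.isReachable("144.25.13.78", 8081, 3000).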

For problem 2:
I created a new K8S cluster with a bastion server that has a public IP assigned 
to it. Finally, I can see something valid from my browser. (There are still some 
problems with connecting to some databases, but I think these network problems 
are not directly related to Flink. I can investigate them later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it would 
take to develop a deployer. I understand it depends on proficiency, but could 
you give a rough estimate? If it is too complicated and the other options 
are not significantly inferior to native Kubernetes, I might prefer to choose 
one of them. I am currently comparing different options for deploying on 
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented [3]. I noticed you mentioned that 
native K8S is not going to replace the other two options. I still don’t fully 
understand your point from the limited explanation in the demo. Could you 
compare the tradeoffs a little? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2]  https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao


From: Yang Wang 
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for sharing the progress.

1. The Flink client is able to list/cancel jobs. Based on the logs shared above, 
I should be able to ping 144.25.13.78, so why can I still NOT ping this address?

I think this is an environment problem. Actually, not every IP address can be 
tested with the "ping" command. I suggest you use "telnet 144.25.13.78 8081" 
to check the network connectivity.

2. Why is 144.25.13.78:8081 not accessible from outside, I mean from my laptop’s 
browser? I am within the company’s VPN and such a public load balancer should 
expose the Flink Web UI, right? I tried to debug the network configuration but 
failed to find a reason. Could you give me some hints?

As in my answer above, I think you need to check the network connectivity 
via "telnet 144.25.13.78 8081". Maybe the firewall does not allow connections 
from your local machine (e.g. your local IP is not in the whitelist of the 
LoadBalancer IP).

In production, what is the suggested approach to list and cancel jobs? The 
current manual work of “kubectl exec” into pods is not very reliable. How can I 
automate this process and integrate it with CI/CD? Please share some blogs if 
there are any, thanks.

I think in a production environment, you should have your own deployer, which 
takes care of submitting, listing, and cancelling the jobs. The deployer 
could even help with triggering savepoints and managing the whole lifecycle of 
Flink applications. I once developed a PoC of native-flink-k8s-operator[1]. It 
could be a starting point for your own deployer if you want to develop it in Java.

[1]. 
https://github.com/wangyang0918/flink-native-k8s-operator
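
As a rough idea of what the list/cancel part of such a deployer could look like 
without "kubectl exec", the sketch below builds requests against the JobManager 
REST API. It assumes the REST endpoint is reachable from wherever the deployer 
runs, and it relies on the REST paths /jobs/overview (GET) and 
/jobs/<jobid>?mode=cancel (PATCH) as described in the Flink REST API 
documentation. FlinkRestRequests and its method names are hypothetical, and this 
is not taken from the operator repo above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class FlinkRestRequests {
    /** Builds the request that lists jobs with their current status. */
    static HttpRequest listJobs(String restBaseUrl) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/overview"))
                .GET()
                .build();
    }

    /** Builds the request that asks the JobManager to cancel one job. */
    static HttpRequest cancelJob(String restBaseUrl, String jobId) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/" + jobId + "?mode=cancel"))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

A request built this way can be sent with 
HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) 
from java.net.http, for example from a CI/CD step.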


Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Wed, Mar 31, 2021 at 6:37 AM:
Hello Yang,

Thank you so much for providing me the flink-client.yaml. I was able to make 
some progress. I didn’t realize I should create a new flink-client pod to 
list/cancel jobs. I was trying to do that from my local laptop. Maybe 
that is the reason why it didn’t work. However, I still have several questions.

I created the deployment based on your flink-client.yaml.
For the LoadBalancer mode:

After applying the cluster role YAML below:

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# 
https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-servi

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-30 Thread Fuyao Li
Hello Yang,

Thank you so much for providing me the flink-client.yaml. I was able to make 
some progress. I didn’t realize I should create a new flink-client pod to 
list/cancel jobs. I was trying to do that from my local laptop. Maybe 
that is the reason why it didn’t work. However, I still have several questions.

I created the deployment based on your flink-client.yaml.
For the LoadBalancer mode:

After applying the cluster role YAML below:

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# 
https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: service-reader
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

And execute the command:
kubectl create clusterrolebinding service-reader-pod  
--clusterrole=service-reader  --serviceaccount=default:default

I am able to exec in the flink-client pod and list/cancel jobs.

$ kubectl exec -it flink-client-776886cf4f-9h47f bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future 
version. Use kubectl exec [POD] -- [COMMAND] instead.
root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target 
kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
2021-03-30 21:53:14,513 INFO  
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
flink cluster my-first-application-cluster successfully, JobManager Web 
Interface: http://144.25.13.78:8081
Waiting for response...
-- Running/Restarting Jobs ---
24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java 
API Skeleton (RUNNING)
--
No scheduled jobs.
root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78
PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.

^C
--- 144.25.13.78 ping statistics ---
31 packets transmitted, 0 received, 100% packet loss, time 772ms

Question:

  1.  The Flink client is able to list/cancel jobs. Based on the logs shared 
above, I should be able to ping 144.25.13.78, so why can I still NOT ping this 
address?
  2.  Why is 144.25.13.78:8081 not accessible from outside, I mean from my 
laptop’s browser? I am within the company’s VPN and such a public load balancer 
should expose the Flink Web UI, right? I tried to debug the network 
configuration but failed to find a reason. Could you give me some hints?
  3.  In production, what is the suggested approach to list and cancel jobs? 
The current manual work of “kubectl exec” into pods is not very reliable. How 
can I automate this process and integrate it with CI/CD? Please share some blogs 
if there are any, thanks.


Best,
Fuyao

From: Yang Wang 
Date: Monday, March 29, 2021 at 20:40
To: Fuyao Li 
Cc: user 
Subject: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for trying the native Kubernetes integration.

As you know, the Flink rest service can be exposed in the following three 
types, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means you could only access the Flink rest endpoint inside 
the K8s cluster. Simply, users could start a Flink client in the
K8s cluster via the following yaml file. And use "kubectl exec" to tunnel in 
the pod to create a Flink session/application cluster. Also the
"flink list/cancel" could work well.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

* NodePort
Currently, we have a limitation that only the Kubernetes master nodes can be 
used to build the exposed Flink rest endpoint. So if your 
APIServer node does not have kube-proxy, the URL printed in the Flink 
client logs cannot be used. We already have a ticket[1] to 
support using one of the worker nodes for accessing the rest endpoint, but I 
have not managed to get it done yet.

* LoadBalancer
Is the resolved rest endpoint "http://144.25.13.78:8081/" accessible from your 
Flink client side? If yes, then I think the Flink client 
should be able to contact the JobManager rest server to list/cancel the jobs. I 
have verified this in Alibaba container service, and it works well.


[1]. 
https://issu

Need help with executing Flink CLI for native Kubernetes deployment

2021-03-26 Thread Fuyao Li
Hi Community, Yang,

I am new to Flink on native Kubernetes and I am trying to do a POC for native 
Kubernetes application mode on Oracle Cloud Infrastructure. I was following the 
documentation here step by step: [1]

I am using Flink 1.12.1, Scala 2.11, Java 11.
I was able to create a native Kubernetes Deployment, but I am not able to use
any further commands like list / cancel etc. I always run into a timeout error.
I think the issue could be that the JobManager Web Interface IP address printed
after job deployment is not accessible. This prevents me from
shutting down the deployment with a savepoint. It could be a Kubernetes configuration
issue. I have exposed all related port traffic and validated the security
list, but still couldn’t make it work. Any help is appreciated.


The relevant Flink source code is the CliFrontend.java class [2].
The ./bin/flink list and cancel commands try to send traffic to the Flink
dashboard UI IP address and time out. I tried both the LoadBalancer and
NodePort options for the -Dkubernetes.rest-service.exposed.type configuration. Neither
of them works.

# List running jobs on the cluster (I can’t execute this command successfully
# due to timeout, logs shared below)
$ ./bin/flink list --target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job (I can’t execute this command successfully)
$ ./bin/flink cancel --target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster

I think those commands need to communicate with the endpoint that is shown after
the job submission command.


  1.  Use case 1 (deploy with NodePort)

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
-Dkubernetes.container.image.pull-policy=IfNotPresent \
-Dkubernetes.container.image.pull-secrets=ocirsecret \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.service-account=flink-service-account \
local:///opt/flink/usrlib/quickstart-0.1.jar


When the expose type is NodePort, the printed message says the Flink
JobManager Web Interface is at http://192.29.104.156:30996. 192.29.104.156 is
my Kubernetes apiserver address, and 30996 is the port that exposes the service.
However, the Flink dashboard is not reachable at this address.
I can only access the dashboard UI on each node IP address (there are three
nodes in my K8S cluster):
100.104.154.73:30996
100.104.154.74:30996
100.104.154.75:30996
I got the following errors when trying to run the list command against this
native Kubernetes deployment; see [4]. According to the documentation
[3], this shouldn’t happen, since the Kubernetes apiserver address should also serve
the Flink Web UI… Did I miss any configuration in Kubernetes to make the Web UI
available at the Kubernetes apiserver address?
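
Since the printed endpoint and the node addresses behave differently, it may help to check plain TCP reachability of each candidate address from the client machine before involving the Flink CLI. The sketch below is only an illustration: the class name RestEndpointProbe and its methods are hypothetical, and the host list and port are the ones quoted in this email. It uses nothing but java.net from the JDK.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

/** Hypothetical helper: checks which host:port pairs accept a TCP connection. */
final class RestEndpointProbe {

    /** Builds the URL the Flink client would print for a given host and port. */
    static String endpointUrl(String host, int port) {
        return "http://" + host + ":" + port;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from the email: the apiserver address plus the three node IPs.
        List<String> hosts = List.of(
                "192.29.104.156", "100.104.154.73", "100.104.154.74", "100.104.154.75");
        int port = 30996; // NodePort printed by the Flink client in this run

        for (String host : hosts) {
            String status = isReachable(host, port, 3000) ? "reachable" : "NOT reachable";
            System.out.println(endpointUrl(host, port) + " -> " + status);
        }
    }
}
```

If only the node IPs report as reachable, the CLI timeout is consistent with the client trying the apiserver address rather than a node address.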



  2.  Use case 2 (deploy with LoadBalancer)
# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
-Dkubernetes.container.image.pull-policy=IfNotPresent \
-Dkubernetes.container.image.pull-secrets=ocirsecret \
-Dkubernetes.rest-service.exposed.type=LoadBalancer \
-Dkubernetes.service-account=flink-service-account \
local:///opt/flink/usrlib/quickstart-0.1.jar


After a while, once the external IP is resolved, it said the Flink JobManager web
interface is at the external IP (load balancer address):
http://144.25.13.78:8081
When I execute the list command, I still get an error after waiting a long time
for it to time out. See the errors here: [5]

I can still get access to NodeIP:. In that case, I tend to
believe it is a network issue. But I am still quite confused, since I have already
opened all the traffic.




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[2] 
https://github.com/apache/flink/blob/f3155e6c0213de7bf4b58a89fb1e1331dee7701a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#accessing-flinks-web-ui
[4] https://pastebin.ubuntu.com/p/WcJMwds52r/
[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/


Thanks for your help in advance.

Best regards,
Fuyao




Re: [External] : How to visualize the results of Flink processing or aggregation?

2021-03-26 Thread Fuyao Li
Hello Xiong,

You can expose monitoring data through Flink's metric system:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
Metrics can be exposed by a metric reporter:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/metric_reporters.html
That includes Prometheus.

For the DataStream API, you can build customized metrics.
For the Flink SQL/Table API, you can only use the predefined metrics listed in
Flink. There could be workarounds, but there is no direct way to supply customized
metrics in Flink SQL.

Best regards,
Fuyao

From: Xiong Qiang 
Date: Friday, March 26, 2021 at 09:15
To: user@flink.apache.org 
Subject: [External] : How to visualize the results of Flink processing or 
aggregation?
Hi All,

I am new to Flink, so forgive me if it is a naive question.

The context is:
We have a data stream coming in, and we will use Flink applications to do
the processing or aggregations. After the processing or aggregation, we need
some approach to visualize the results, to either build a dashboard or set up
alerts, for example, using Prometheus and Grafana.
However, after reading the documents
(https://flink.apache.org/flink-architecture.html and more links) and examples
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html)
(https://github.com/ververica/flink-training/blob/master/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java),
I am still not able to close the gap between Flink and a
monitoring/dashboard tool, e.g. Prometheus/Grafana.

The question is:
How are processing results connected/sinked from Flink to Prometheus/Grafana?
For example, in the fraud detection example, how is account id = 3 sent to
Prometheus and Grafana, so that I have a dashboard showing there is one
suspected account? In the taxi long rides example, how do I send the count of
long rides from Flink to Prometheus/Grafana?

I understand there are sinks 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/).
 However, I didn't see sinks for Prometheus.

Hope I made my question clear.

Thanks


Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-09 Thread Fuyao Li
Hi Flink Community,

After configuring the JDBC timeout, I still could not get rid of the issue.
I created a JIRA task to describe the problem:
https://issues.apache.org/jira/browse/FLINK-21674
Any suggestion is appreciated.

Best regards,
Fuyao

From: Fuyao Li 
Date: Wednesday, March 3, 2021 at 15:14
To: XU Qinghui 
Cc: user , Timo Walther 
Subject: Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after 
some idle time
Hi Qinghui,

I agree. I am searching internal resources and the internet for a fix to the
issue.  Idle Time 
Limits<https://docs.oracle.com/en/cloud/paas/autonomous-database/adbsa/manage-priorities.html#GUID-241F4C85-24E5-4F8A-B9EE-E3FCEF566D36>
 might be a reason. But even after configuring those parameters and updating
sqlnet.ora to
WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="… ")))
SSL_SERVER_DN_MATCH=yes
NAMES.DIRECTORY_PATH=(ezconnect,tnsnames)
SQLNET.USE_HTTPS_PROXY=on
DISABLE_OOB=on
SQLNET.RECV_TIMEOUT = 7200
BEQUEATH_DETACH = YES
SQLNET.EXPIRE_TIME = 1
SQLNET.SEND_TIMEOUT = 7200
SQLNET.INBOUND_CONNECT_TIMEOUT = 7200

SQLNET.EXPIRE_TIME acts as a kind of heartbeat to keep the connection alive.

It still doesn’t work after all of these configurations. Pretty weird…

I will post a follow-up if I find the answer… Thanks.

BR,
Fuyao


From: XU Qinghui 
Date: Tuesday, March 2, 2021 at 13:40
To: Fuyao Li 
Cc: user , Timo Walther 
Subject: [External] : Re: Need help with JDBC Broken Pipeline Issue after some 
idle time
It sounds like the jdbc driver's connection is closed somehow, and probably has 
nothing to do with flink itself.
Maybe you could check if there's some settings on the db that could close the 
connection after some inactivity, or otherwise it could be your network drops 
the inactive tcp connection after some time (you can try to use tcp keepalive 
in this case).
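
One way to rule the idle timeout in or out, independent of Flink, is to validate the connection before each use and reopen it when the check fails. The sketch below relies only on java.sql.Connection.isValid and isClosed from plain JDBC. The ConnectionGuard class and its Supplier-based factory are hypothetical names for illustration, not part of Flink's JDBC connector, and the factory is assumed to wrap something like DriverManager.getConnection.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.function.Supplier;

/** Hypothetical helper: hands out a JDBC connection, reopening it if it has gone stale. */
final class ConnectionGuard {
    // Assumption: the factory opens a fresh connection, e.g. by wrapping
    // DriverManager.getConnection(url, user, password) and rethrowing unchecked.
    private final Supplier<Connection> factory;
    private Connection current;

    ConnectionGuard(Supplier<Connection> factory) {
        this.factory = factory;
    }

    /** Returns the cached connection if it still validates; otherwise closes it and opens a new one. */
    synchronized Connection get(int validationTimeoutSeconds) {
        if (current == null || !isUsable(current, validationTimeoutSeconds)) {
            closeQuietly(current);
            current = factory.get();
        }
        return current;
    }

    private static boolean isUsable(Connection connection, int timeoutSeconds) {
        try {
            // isValid asks the driver to check the connection within the given timeout.
            return !connection.isClosed() && connection.isValid(timeoutSeconds);
        } catch (SQLException e) {
            return false;
        }
    }

    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException ignored) {
            // The connection is being discarded anyway.
        }
    }
}
```

Calling get(...) after a 5-minute pause and logging whether a new connection had to be opened would show whether the database or the network is dropping the idle connection.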

BR,


On Tue, Mar 2, 2021 at 19:38, Fuyao Li <fuyao...@oracle.com> wrote:
Sorry for the incomplete email.

Here is the error log of the broken pipeline. The failed SQL is executed again after
the automatic recovery from checkpoint. Please share some ideas on this issue. I really
appreciate it. Thanks!

09:20:02,868 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
    at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
    at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
    at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
    at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,869 WARN  org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchin

Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-03 Thread Fuyao Li
Hi Qinghui,

I agree. I am searching internal resources and the internet for a fix to the
issue.  Idle Time 
Limits<https://docs.oracle.com/en/cloud/paas/autonomous-database/adbsa/manage-priorities.html#GUID-241F4C85-24E5-4F8A-B9EE-E3FCEF566D36>
 might be a reason. But even after configuring those parameters and updating
sqlnet.ora to
WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="… ")))
SSL_SERVER_DN_MATCH=yes
NAMES.DIRECTORY_PATH=(ezconnect,tnsnames)
SQLNET.USE_HTTPS_PROXY=on
DISABLE_OOB=on
SQLNET.RECV_TIMEOUT = 7200
BEQUEATH_DETACH = YES
SQLNET.EXPIRE_TIME = 1
SQLNET.SEND_TIMEOUT = 7200
SQLNET.INBOUND_CONNECT_TIMEOUT = 7200

SQLNET.EXPIRE_TIME acts as a kind of heartbeat to keep the connection alive.

It still doesn’t work after all of these configurations. Pretty weird…

I will post a follow-up if I find the answer… Thanks.

BR,
Fuyao


From: XU Qinghui 
Date: Tuesday, March 2, 2021 at 13:40
To: Fuyao Li 
Cc: user , Timo Walther 
Subject: [External] : Re: Need help with JDBC Broken Pipeline Issue after some 
idle time
It sounds like the jdbc driver's connection is closed somehow, and probably has 
nothing to do with flink itself.
Maybe you could check if there's some settings on the db that could close the 
connection after some inactivity, or otherwise it could be your network drops 
the inactive tcp connection after some time (you can try to use tcp keepalive 
in this case).

BR,


On Tue, Mar 2, 2021 at 19:38, Fuyao Li <fuyao...@oracle.com> wrote:
Sorry for the incomplete email.

Here is the error log of the broken pipeline. The failed SQL is executed again after
the automatic recovery from checkpoint. Please share some ideas on this issue. I really
appreciate it. Thanks!

09:20:02,868 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
    at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
    at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
    at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
    at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,869 WARN  org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperato

Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-02 Thread Fuyao Li
nceWatermark(InternalTimerServiceImpl.java:302)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Reestablish JDBC connection failed
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
    ... 29 more
Caused by: java.sql.SQLRecoverableException: Closed Connection
    at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
    at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
    at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
    at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
    at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
    ... 30 more


Thanks,

Best regards,
Fuyao

From: Fuyao Li 
Date: Tuesday, March 2, 2021 at 10:33
To: user , Timo Walther 
Subject: Need help with JDBC Broken Pipeline Issue after some idle time
Hi Flink Community,

I need some help with the JDBC sink in the DataStream API. I can produce some records
and sink them to the database correctly. However, if I wait for 5 minutes between
insertions, I run into a broken pipeline issue. After that, the Flink
application restarts, recovers from the checkpoint, and executes the failed SQL
query. I tried hard to search for resources to understand why such a broken pipeline
happens, but I still can’t understand it.

The interesting thing is that, if the idle time is around 3 minutes, everything
seems to be fine.

It seems to be a timeout-related issue, but I just don’t know what I should do
to fix it. I have shared the sink code. Could anyone share some ideas?
Thank you so much!
My environment settings:
Flink version: 1.12.1
Scala version: 2.11
Java version: 11
Flink System parallelism: 1
JDBC Driver: Oracle ojdbc10
Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can
regard this as a cloud-based Oracle Database)

The code for the sink:
boDataStream
    .addSink(
        JdbcSink.sink(
            "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
            (preparedStatement, testInvoiceBo) -> {
                try {
                    Gson gson = new GsonBuilder()
                        .excludeFieldsWithoutExposeAnnotation()
                        .create();
                    String invoiceId = testInvoiceBo.getINVOICE_ID();
                    String json = gson.toJson(testInvoiceBo);
                    log.info("insertion information: {}", json);
                    preparedStatement.setString(1, invoiceId);
                    preparedStatement.setString(2, json);
 

Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-02 Thread Fuyao Li
Hi Flink Community,

I need some help with the JDBC sink in the DataStream API. I can produce some records
and sink them to the database correctly. However, if I wait for 5 minutes between
insertions, I run into a broken pipeline issue. After that, the Flink
application restarts, recovers from the checkpoint, and executes the failed SQL
query. I tried hard to search for resources to understand why such a broken pipeline
happens, but I still can’t understand it.

The interesting thing is that, if the idle time is around 3 minutes, everything
seems to be fine.

It seems to be a timeout-related issue, but I just don’t know what I should do
to fix it. I have shared the sink code. Could anyone share some ideas?
Thank you so much!
My environment settings:
Flink version: 1.12.1
Scala version: 2.11
Java version: 11
Flink System parallelism: 1
JDBC Driver: Oracle ojdbc10
Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can
regard this as a cloud-based Oracle Database)

The code for the sink:
boDataStream
    .addSink(
        JdbcSink.sink(
            "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
            (preparedStatement, testInvoiceBo) -> {
                try {
                    Gson gson = new GsonBuilder()
                        .excludeFieldsWithoutExposeAnnotation()
                        .create();
                    String invoiceId = testInvoiceBo.getINVOICE_ID();
                    String json = gson.toJson(testInvoiceBo);
                    log.info("insertion information: {}", json);
                    preparedStatement.setString(1, invoiceId);
                    preparedStatement.setString(2, json);
                } catch (JsonIOException e) {
                    log.error("Failed to parse JSON", e);
                }
            },
            new JdbcExecutionOptions.Builder()
                .withBatchIntervalMs(0)
                .withBatchSize(1)
                .withMaxRetries(3)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(DB_URL)
                .withDriverName("oracle.jdbc.driver.OracleDriver")
                .withUsername("admin")
                .withPassword("password")
                .build()))
    .name("adwSink")
    .uid("adwSink")
    .setParallelism(1);

The JDBC broken pipeline log:




Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-25 Thread Fuyao Li
> > (2). creates an event timer to process that record once the watermark
> > hits that point.
> >
> > I kind of understand the idea here. Buffer all the data(maybe delete
> > some of the old track if processed) in a track ordered by timestamp and
> > trigger the event timer sequentially with this buffered track.
> >
> > Based on my understanding, this buffered design is only suitable for
> > *offline* data processing, right? (It is a waste of resource to buffer
> > this in real time. )
> >
> > Also, from the article, I think they are using periodic watermark
> > strategy[1]. how can they process the last piece of data records with
> > periodic watermark strategy since there is no more incoming data to
> > advance the watermark? So the last piece of data will never be processed
> > here? Is there a way to gracefully handle this? My use case doesn't
> > allow me to lose any information.
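
The buffering idea discussed above can be sketched in plain Java, outside Flink: records are kept ordered by timestamp and released once a watermark passes them. The class EventTimeBuffer and its methods are hypothetical names for illustration only. The last call relies on the assumption that a final watermark of Long.MAX_VALUE arrives when the input ends, which is what Flink emits when a bounded source finishes; on an unbounded stream that simply goes quiet, the last records would stay buffered until some later watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch: buffers records by event time and releases them as the watermark advances. */
final class EventTimeBuffer {
    // Records grouped by timestamp, kept in ascending timestamp order.
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();

    void add(long timestamp, String record) {
        buffered.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(record);
    }

    /** Removes and returns, in timestamp order, every record with timestamp <= watermark. */
    List<String> onWatermark(long watermark) {
        List<String> ready = new ArrayList<>();
        NavigableMap<Long, List<String>> due = buffered.headMap(watermark, true);
        for (List<String> records : due.values()) {
            ready.addAll(records);
        }
        due.clear(); // headMap is a view, so this removes the released entries from the buffer
        return ready;
    }

    /** Number of records still waiting for a watermark. */
    int pending() {
        int count = 0;
        for (List<String> records : buffered.values()) {
            count += records.size();
        }
        return count;
    }

    public static void main(String[] args) {
        EventTimeBuffer buffer = new EventTimeBuffer();
        buffer.add(100L, "a");
        buffer.add(300L, "c");
        buffer.add(200L, "b");

        System.out.println(buffer.onWatermark(250L));           // records with timestamp <= 250
        System.out.println(buffer.pending());                   // the record at 300 is still buffered
        System.out.println(buffer.onWatermark(Long.MAX_VALUE)); // final watermark flushes the rest
    }
}
```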
> >
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
> >
> > Best,
> >
> > Fuyao
> >
> >
> > On 11/20/20 08:55, Timo Walther wrote:
> >> Hi Fuyao,
> >>
> >> sorry for not replying earlier.
> >>
> >> You posted a lot of questions. I scanned the thread quickly, let me
> >> try to answer some of them and feel free to ask further questions
> >> afterwards.
> >>
> >> "is it possible to configure the parallelism for Table operation at
> >> operator level"
> >>
> >> No this is not possible at the moment. The reason is 1) we don't know
> >> how to expose such a functionality in a nice way. Maybe we will use
> >> SQL hints in the future [1]. 2) Sometimes the planner sets the
> >> parallelism of operators explicitly to 1. All other operators will use
> >> the globally defined parallelism for the pipeline (also to not mess up
> >> retraction messages internally). You will be able to set the
> >> parallelism of the sink operation in Flink 1.12.
> >>
> >> "BoundedOutOfOrderness Watermark Generator is NOT making the event
> >> time to advance"
> >>
> >> Have you checked if you can use an interval join instead of a full
> >> join with state retention? Table/SQL pipelines that don't preserve a
> >> time attribute in the end might also erase the underlying watermarks.
> >> Thus, event time triggers will not work after your join.
> >>
> >> "Why can't I update the watermarks for all 8 parallelisms?"
> >>
> >> You could play around with idleness for your source [2]. Or you set
> >> the source parallelism to 1 (while keeping the rest of the pipeline
> >> globally set to 8), would that be an option?
> >>
> >> "Some type cast behavior of retracted streams I can't explain."
> >>
> >> toAppendStream/toRetractStream still need an update to the new type
> >> system. This is explained in FLIP-136 which will be part of Flink 1.13
> >> [3].
> >>
> >> I hope I could help a bit.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> >> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
> >> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++AImprove+interoperability+between+DataStream+and+Table+API
> >>
> >> On 13.11.20 21:39, Fuyao Li wrote:
> >>> Hi Matthias,
> >>>
> >>> Just to provide more context on this problem. I only have 1 partition
> >>> per each Kafka Topic at the beginning before the join operation.
> >>> After reading the doc:
> >>>
> >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
ctStream still need an update to the new type 
system. This is explained in FLIP-136 which will be part of Flink 
1.13 [3].


I hope I could help a bit.

Regards,
Timo


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++AImprove+interoperability+between+DataStream+and+Table+API


On 13.11.20 21:39, Fuyao Li wrote:

Hi Matthias,

Just to provide more context on this problem. I only have 1 
partition per each Kafka Topic at the beginning before the join 
operation. After reading the doc: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission


Maybe that is the root cause of my problem here, with less than 8 
partitions (only 1 partition in my case), using the default 
parallelism of 8 will cause this wrong behavior. This is my guess, 
it takes a while to test it out... What's your opinion on this? Thanks!


Best,

Fuyao


On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:


    Hi Matthias,

    One more question regarding Flink table parallelism, is it possible
    to configure the parallelism for Table operation at operator level,
    it seems we don't have such API available, right? Thanks!

    Best,
    Fuyao

    On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:

    Hi Matthias,

    Thanks for your information. I have managed to figure out the
    first issue you mentioned. Regarding the second issue. I have
    got some progress on it.

    I have sent another email with the title 'BoundedOutOfOrderness
    Watermark Generator is NOT making the event time to advance'
    using another email of mine, fuyao...@oracle.com. That email contains some more
    context on my issue. Please take a look. I have made some
    progress after sending that new email.

    Previously, I had managed to make the timelag watermark strategy
    work in my code, but my bounded out of orderness strategy or
    punctuated watermark strategy doesn't work well. It produces 8
    watermarks each time. Two cycles are shown below.

    I managed to figure out that the root cause is that the Flink stream
    execution environment has a default parallelism of 8. *I didn't
    notice this in the doc; could the Community add this explicitly to
    the official doc to avoid some confusion? Thanks.*

    From my understanding, the watermark advances based on the
    lowest watermark among the 8, so I cannot advance the bounded out
    of orderness watermark since I am only advancing 1 of the 8
    parallelisms. If I set the entire stream execution environment
    to be of parallelism 1, it will reflect the watermark in the
    context correctly. One more thing is that this behavior is not
    reflected in the Flink Cluster web UI. I can see the
    watermark advancing there, but in reality it is not. *That's
    causing the inconsistency problem I mentioned in the other email
    above. Will this be considered a bug in the UI?*
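
The "lowest watermark among the 8" behavior described above can be illustrated with a small simulation in plain Java, outside Flink. This is only a sketch under stated assumptions: the class CombinedWatermark and its combine method are hypothetical names, each array slot stands for one parallel input channel, and channels flagged as idle are left out of the minimum, which is the effect the idleness setting for sources is meant to have.

```java
import java.util.Arrays;

/** Hypothetical sketch: an operator's watermark as the minimum over its non-idle input channels. */
final class CombinedWatermark {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Returns the minimum watermark over all channels that are not marked idle. */
    static long combine(long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle channels do not hold the combined watermark back
            }
            anyActive = true;
            min = Math.min(min, channelWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        long[] watermarks = new long[8];          // parallelism 8, as in the email
        Arrays.fill(watermarks, NO_WATERMARK);    // no channel has seen a watermark yet
        boolean[] idle = new boolean[8];

        watermarks[0] = 1_605_000_000_000L;       // only one channel receives data
        System.out.println(combine(watermarks, idle)); // still Long.MIN_VALUE: 7 channels never advanced

        for (int i = 1; i < 8; i++) {
            idle[i] = true;                       // mark the empty channels as idle
        }
        System.out.println(combine(watermarks, idle)); // now follows channel 0
    }
}
```

With a single Kafka partition and parallelism 8, seven of the eight slots never advance, which matches the symptom of timers not firing until the parallelism is set to 1.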

    My current question is, since I have a full outer join operation
    before the KeyedProcessFunction here, how can I let the bounded out of
    orderness watermark / punctuated watermark strategy work if the
    parallelism > 1? It can only update one of the 8 parallelisms
    for the watermark for this onTimer operator. Is this related to
    my Table full outer join operation before this step? According
    to the doc,
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
ts*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$ 
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++AImprove+interoperability+between+DataStream+and+Table+API


On 13.11.20 21:39, Fuyao Li wrote:

Hi Matthias,

Just to provide more context on this problem. I only have 1 partition 
per each Kafka Topic at the beginning before the join operation. 
After reading the doc: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission


Maybe that is the root cause of my problem here, with less than 8 
partitions (only 1 partition in my case), using the default 
parallelism of 8 will cause this wrong behavior. This is my guess, it 
takes a while to test it out... What's your opinion on this? Thanks!


Best,

Fuyao


On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:


    Hi Matthias,

    One more question regarding Flink table parallelism, is it possible
    to configure the parallelism for Table operation at operator level,
    it seems we don't have such API available, right? Thanks!

    Best,
    Fuyao

    On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:

    Hi Matthias,

    Thanks for your information. I have managed to figure out the
    first issue you mentioned. Regarding the second issue. I have
    got some progress on it.

    I have sent another email with the title 'BoundedOutOfOrderness
    Watermark Generator is NOT making the event time to advance'
    using another email of mine, fuyao...@oracle.com. That email
    contains some more context on my issue. Please take a look. I
    have made some progress after sending that new email.

    Previously, I had managed to make timelag watermark strategy
    working in my code, but my bound out of orderness strategy or
    punctuated watermark strategy doesn't work well. It produces 8
    watermarks each time. Two cycles are shown below.

    I managed to figure out the root cause is that Flink stream
    execution environment has a default parallelism as 8.*I didn't
    notice in the doc, could the Community add this explicitly into
    the official doc to avoid some confusion? Thanks.*

    From my understanding, the watermark advances based on the
    lowest watermark among the 8, so I can not advance the bound out
    of orderness watermark since I am only advancing 1 of the 8
    parallelisms. If I set the entire stream execution environment
    to be of parallelism 1, it will reflect the watermark in the
    context correctly. One more thing is that this behavior is not
    reflected in the Flink Cluster web UI interface. I can see the
    watermark is advancing, but it is not in reality. *That's
    causing the inconsistency problem I mentioned in the other email
    I mentioned above. Will this be considered as a bug in the UI?*

    My current question is, since I have full outer join operation
    before the KeyedProcessFunction here. How can I let the bound of
    orderness watermark / punctuated watermark strategy work if the
    parallelism > 1? It can only update one of the 8 parallelisms
    for the watermark for this onTimer operator. Is this related to
    my Table full outer join operation before this step? According
    to the doc,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism


    Default parallelism should be the same like the stream
    environment. Why can't I update the watermarks for all 8
    parallelisms? What should I do to enable 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias,

Just to provide more context on this problem. I only have 1 partition per
each Kafka Topic at the beginning before the join operation. After reading
the doc:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Maybe that is the root cause of my problem here, with less than 8
partitions (only 1 partition in my case), using the default parallelism of
8 will cause this wrong behavior. This is my guess, it takes a while to
test it out... What's your opinion on this? Thanks!
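
To make the guess above concrete, here is a plain-Java sketch (no Flink dependency) of the rule discussed in this thread: an operator's event-time clock follows the lowest watermark among its inputs, so subtasks that never receive data hold it back. The names `CombinedWatermark` and `combine` are hypothetical, and the idle flag only mimics the idea behind the idle-source handling described in the Flink docs; this is not Flink's implementation.

```java
import java.util.Arrays;

final class CombinedWatermark {
    // Operator watermark = minimum over all input channels that are not marked idle.
    // Returns Long.MIN_VALUE if every channel is idle (a simplifying assumption).
    static long combine(long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle channels do not hold the clock back
            }
            anyActive = true;
            min = Math.min(min, channelWatermarks[i]);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] wm = new long[8];
        Arrays.fill(wm, Long.MIN_VALUE); // 8 parallel subtasks, none has seen data yet
        wm[1] = 1605047172881L;          // only the subtask reading the single partition advances
        boolean[] idle = new boolean[8];

        // The 7 silent subtasks keep the combined watermark at Long.MIN_VALUE.
        System.out.println(combine(wm, idle));

        // Pretend the 7 silent subtasks were declared idle: the clock can now advance.
        Arrays.fill(idle, true);
        idle[1] = false;
        System.out.println(combine(wm, idle));
    }
}
```

If this reading is right, either lowering the parallelism to match the partition count or marking the empty inputs as idle should let the watermark move; which of the two fits the job is a separate question.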

Best,

Fuyao

On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li  wrote:

> Hi Matthias,
>
> One more question regarding Flink table parallelism, is it possible to
> configure the parallelism for Table operation at operator level, it seems
> we don't have such API available, right? Thanks!
>
> Best,
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li  wrote:
>
>> Hi Matthias,
>>
>> Thanks for your information. I have managed to figure out the first issue
>> you mentioned. Regarding the second issue. I have got some progress on it.
>>
>> I have sent another email with the title 'BoundedOutOfOrderness Watermark
>> Generator is NOT making the event time to advance' using another email of
>> mine, fuyao...@oracle.com. That email contains some more context on my
>> issue. Please take a look. I have made some progress after sending that new
>> email.
>>
>> Previously, I had managed to make timelag watermark strategy working in
>> my code, but my bound out of orderness strategy or punctuated watermark
>> strategy doesn't work well. It produces 8 watermarks each time. Two cycles
>> are shown below.
>>
>> I managed to figure out the root cause is that Flink stream execution
>> environment has a default parallelism as 8.* I didn't notice in the doc,
>> could the Community add this explicitly into the official doc to avoid some
>> confusion? Thanks.*
>>
>> From my understanding, the watermark advances based on the lowest
>> watermark among the 8, so I can not advance the bound out of orderness
>> watermark since I am only advancing 1 of the 8 parallelisms. If I set the
>> entire stream execution environment to be of parallelism 1, it will reflect
>> the watermark in the context correctly. One more thing is that this
>> behavior is not reflected in the Flink Cluster web UI interface. I can see
>> the watermark is advancing, but it is not in reality. *That's causing
>> the inconsistency problem I mentioned in the other email I mentioned above.
>> Will this be considered as a bug in the UI?*
>>
>> My current question is, since I have full outer join operation before the
>> KeyedProcessFunction here. How can I let the bound of orderness watermark /
>> punctuated watermark strategy work if the parallelism > 1? It can only
>> update one of the 8 parallelisms for the watermark for this onTimer
>> operator. Is this related to my Table full outer join operation before this
>> step? According to the doc,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>>
>> Default parallelism should be the same like the stream environment. Why
>> can't I update the watermarks for all 8 parallelisms? What should I do to
>> enable this function with Parallelism larger than 1? Thanks.
>>
>> First round: (Note the first column of each log row is the timelag
>> strategy, it is getting updated correctly for all 8 parallelism, but the
>> other two strategies I mentioned above can't do that..)
>>
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266199,
>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>> 1605047187881 (only one of the 8 parallelism for bound out of orderness is
>> getting my new watermark)
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266199,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias,

One more question regarding Flink table parallelism, is it possible to
configure the parallelism for Table operation at operator level, it seems
we don't have such API available, right? Thanks!

Best,
Fuyao

On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li  wrote:

> Hi Matthias,
>
> Thanks for your information. I have managed to figure out the first issue
> you mentioned. Regarding the second issue. I have got some progress on it.
>
> I have sent another email with the title 'BoundedOutOfOrderness Watermark
> Generator is NOT making the event time to advance' using another email of
> mine, fuyao...@oracle.com. That email contains some more context on my
> issue. Please take a look. I have made some progress after sending that new
> email.
>
> Previously, I had managed to make timelag watermark strategy working in my
> code, but my bound out of orderness strategy or punctuated watermark
> strategy doesn't work well. It produces 8 watermarks each time. Two cycles
> are shown below.
>
> I managed to figure out the root cause is that Flink stream execution
> environment has a default parallelism as 8.* I didn't notice in the doc,
> could the Community add this explicitly into the official doc to avoid some
> confusion? Thanks.*
>
> From my understanding, the watermark advances based on the lowest
> watermark among the 8, so I can not advance the bound out of orderness
> watermark since I am only advancing 1 of the 8 parallelisms. If I set the
> entire stream execution environment to be of parallelism 1, it will reflect
> the watermark in the context correctly. One more thing is that this
> behavior is not reflected in the Flink Cluster web UI interface. I can see
> the watermark is advancing, but it is not in reality. *That's causing the
> inconsistency problem I mentioned in the other email I mentioned above.
> Will this be considered as a bug in the UI?*
>
> My current question is, since I have full outer join operation before the
> KeyedProcessFunction here. How can I let the bound of orderness watermark /
> punctuated watermark strategy work if the parallelism > 1? It can only
> update one of the 8 parallelisms for the watermark for this onTimer
> operator. Is this related to my Table full outer join operation before this
> step? According to the doc,
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>
> Default parallelism should be the same like the stream environment. Why
> can't I update the watermarks for all 8 parallelisms? What should I do to
> enable this function with Parallelism larger than 1? Thanks.
>
> First round: (Note the first column of each log row is the timelag
> strategy, it is getting updated correctly for all 8 parallelism, but the
> other two strategies I mentioned above can't do that..)
>
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266199,
> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
> 1605047187881 (only one of the 8 parallelism for bound out of orderness is
> getting my new watermark)
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266199,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>
> Second round: (I set the autoWatermark interval 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
0 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000


Best regards,

Fuyao

On Fri, Nov 13, 2020 at 9:03 AM Matthias Pohl 
wrote:

> Hi Fuyao,
> for your first question about the different behavior depending on whether
> you chain the methods or not: Keep in mind that you have to save the return
> value of the assignTimestampsAndWatermarks method call if you don't chain
> the methods together as it is also shown in [1].
> At least the following example from your first message is indicating it:
> ```
> retractStream.assignTimestampsAndWatermarks(
>     new BoRetractStreamTimestampAssigner()); // (This is a deprecated method)
> // instead of:
> // retractStream = retractStream.assignTimestampsAndWatermarks(
> //     new BoRetractStreamTimestampAssigner());
> retractStream
> .keyBy()
> .process(new TableOutputProcessFunction())
> .name("ProcessTableOutput")
> .uid("ProcessTableOutput")
> .addSink(businessObjectSink)
> .name("businessObjectSink")
> .uid("businessObjectSink")
> .setParallelism(1);
> ```
>
> For your second question about setting the EventTime I'm going to pull in
> Timo from the SDK team as I don't see an issue with your code right away.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>
> On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li  wrote:
>
>> Hi Flink Users and Community,
>>
>> For the first part of the question, the 12 hour time difference is caused
>> by a time extraction bug of my own. I can get the time translated correctly
>> now. The type cast problem does have some workarounds.
>>
>> My major blocker right now is the onTimer part is not properly triggered.
>> I guess it is caused by failing to configure the correct watermarks &
>> timestamp assigners. Please give me some insights.
>>
>> 1. If I don't chain the assignTimestampsAndWatermarks() method together
>> with the keyBy() and process() methods, the context.timestamp() in my
>> processElement() function will be null. Is this expected behavior? The
>> Flink examples didn't chain them together. (see example here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
>> )
>> 2. If I use registerEventTimeTimer() in processElement(). The onTimer
>> method will not be triggered. However, I can trigger the onTimer method if
>> I simply change it to registerProcessingTimeTimer(). I am using the
>> settings below in the stream env.
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000L);
>>
>> My code for the process chain:
>> retractStream
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>>             .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
>>                 Row rowData = booleanRowTuple2.f1;
>>                 LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
>>                 LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);
>>
>>                 LocalDateTime latestDBUpdateTime = null;
>>                 if (headerTime != null && linesTime != null) {
>>                     latestDBUpdateTime =
>>                         headerTime.isAfter(linesTime) ? headerTime : linesTime;
>>                 }
>>                 else {
>>                     latestDBUpdateTime

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-12 Thread fuyao . li
The test workflow attachment was not added to the previous email, sorry 
for the confusion. Please refer to the workflow described in text. Thanks.



On 11/12/20 16:17, fuyao...@oracle.com wrote:


Hi All,

Just to add a little more context to the problem. I have a full outer 
join operation before this stage. The source data stream for full 
outer join is a Kafka Source. I also added timestamp and watermarks to 
the FlinkKafkaConsumer. After that, it makes no difference to the 
result, still can not make the watermark to advance.


overall workflow:

two kafka topics -> two data streams in Flink -> join them together 
and convert to retract stream -> do KeyedProcessFunction and schedule 
event time timer and onTimer logic in it -> push to downstream sink.


I think there are no issues with my syntax. But I still could NOT make 
the watermark advance for event time using the bounded-out-of-orderness 
strategy. (In the Flink cluster, the behavior is different: the watermark 
is advancing, but onTimer is still not triggered correctly.)


I guess the reason is that I receive 8 records for each round of 
onPeriodicEmit(), only one of the eight is updated for 
BoundedOutOfOrderness Strategy. For timelag strategy (refer to the 
first email in the thread), they are all updated so that it will make 
the watermark to advance. I just don't know why I got 8 records every 
time even if I have parallelism as 1. (logs can be found in the first 
email in the thread.)


I also tried to debug inside Flink web interface based on the link: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html


The logs produced by the Flink local cluster are different from those 
produced by directly starting my application. *Why is the behavior 
inconsistent...?* The context timestamp sticks to Long.MIN_VALUE during 
IDE debug; however, it is updated correctly in the Flink cluster, except 
for the first record, which has the default value. But I am still not 
getting the scheduled logic triggered correctly inside the onTimer method. 
My test workflow can be seen in the attachment. I have read through 
previous archives about the watermark not updating (stuck at 
Long.MIN_VALUE), but it doesn't help much in my case. Thanks.



Best,

Fuyao



On 11/11/20 11:33, fuyao...@oracle.com wrote:


Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, fuyao...@oracle.com wrote:


Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, fuyao...@oracle.com wrote:


Hi Kavin,

Thanks for your example. I think I have already done something very 
similar before. I didn't post the full WatermarkStrategy 
interface in my previous email, but I do have that part already. I 
think the example you gave me is a punctuated watermark strategy, not 
a bounded-out-of-orderness one. My major concern now is why my 
emitted watermark is not available in processElement() and why I 
have 8 records each time the code reaches the onPeriodicEmit 
part. I will post my code following your example below.


The symptom is that I will get the context watermark as 
Long.MIN_VALUE if I use the watermark strategy below.


16:35:12,969 INFO 
org.myorg.quickstart.processor.TableOutputProcessFunction - context 
current key: 69215, context current watermark: -9223372036854775808



DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
            String invoice_id_key = (String) value.f1.getField(0);
            if (invoice_id_key == null) {
                invoice_id_key = (String) value.f1.getField(4);
            }
            return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

watermark strategy:

public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp;
        };
    }
}

watermark generator code:

public class PunctuatedTableOutputWatermarkGenerator
        implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
            WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-12 Thread fuyao . li

Hi All,

Just to add a little more context to the problem. I have a full outer 
join operation before this stage. The source data stream for full outer 
join is a Kafka Source. I also added timestamp and watermarks to the 
FlinkKafkaConsumer. After that, it makes no difference to the result, 
still can not make the watermark to advance.


overall workflow:

two kafka topics -> two data streams in Flink -> join them together and 
convert to retract stream -> do KeyedProcessFunction and schedule event 
time timer and onTimer logic in it -> push to downstream sink.


I think there are no issues with my syntax. But I still could NOT make 
the watermark advance for event time using the bounded-out-of-orderness 
strategy. (In the Flink cluster, the behavior is different: the watermark 
is advancing, but onTimer is still not triggered correctly.)


I guess the reason is that I receive 8 records for each round of 
onPeriodicEmit(), only one of the eight is updated for 
BoundedOutOfOrderness Strategy. For timelag strategy (refer to the first 
email in the thread), they are all updated so that it will make the 
watermark to advance. I just don't know why I got 8 records every time 
even if I have parallelism as 1. (logs can be found in the first email 
in the thread.)
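
One possible reading of the "8 records" symptom, consistent with the later messages in this thread about the default parallelism of 8: there is one watermark generator instance per parallel subtask, each logs on every periodic emit, and only the subtask that actually receives events moves. The plain-Java sketch below imitates that situation without any Flink dependency. The class names, the 15000 ms bound, and the initial value are assumptions chosen to mirror the numbers in the posted logs, not the code of the real job.

```java
import java.util.ArrayList;
import java.util.List;

final class PeriodicGeneratorSim {
    // Hypothetical stand-in for a bounded-out-of-orderness generator; not Flink's class.
    static final class Generator {
        private final long boundMillis;
        private long currentMaxTimestamp;

        Generator(long boundMillis) {
            this.boundMillis = boundMillis;
            // Start so that the first emitted watermark is 0, as in the posted logs.
            this.currentMaxTimestamp = boundMillis;
        }

        void onEvent(long eventTimestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        long onPeriodicEmit() {
            return currentMaxTimestamp - boundMillis;
        }
    }

    // One generator per parallel subtask; the event is routed to a single subtask only.
    static List<Long> emitRound(int parallelism, int activeSubtask, long eventTimestamp) {
        List<Generator> generators = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            generators.add(new Generator(15_000L));
        }
        generators.get(activeSubtask).onEvent(eventTimestamp);

        List<Long> emitted = new ArrayList<>();
        for (Generator g : generators) {
            emitted.add(g.onPeriodicEmit());
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Eight values per round; only index 1 carries a real watermark.
        System.out.println(emitRound(8, 1, 1605047187881L));
    }
}
```

Under this reading, the `.setParallelism(1)` at the end of the chain would only apply to the sink it is called on, which would explain why the generator still runs with 8 instances.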


I also tried to debug inside Flink web interface based on the link: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/debugging_event_time.html


The logs produced by the Flink local cluster are different from those 
produced by directly starting my application. *Why is the behavior 
inconsistent...?* The context timestamp sticks to Long.MIN_VALUE during 
IDE debug; however, it is updated correctly in the Flink cluster, except 
for the first record, which has the default value. But I am still not 
getting the scheduled logic triggered correctly inside the onTimer method. 
My test workflow can be seen in the attachment. I have read through 
previous archives about the watermark not updating (stuck at 
Long.MIN_VALUE), but it doesn't help much in my case. Thanks.
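
For context on why a stuck watermark also means onTimer never runs: an event-time timer fires only once the operator's watermark reaches the timer's timestamp, whereas a processing-time timer follows the wall clock. The following plain-Java sketch shows that relationship in isolation. `EventTimeTimerSim` and its methods are hypothetical names for illustration and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class EventTimeTimerSim {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advance the watermark and return the timers that fire as a result.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards; nothing new can fire
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSim op = new EventTimeTimerSim();
        op.registerEventTimeTimer(1_000L);
        op.registerEventTimeTimer(5_000L);
        System.out.println(op.advanceWatermark(Long.MIN_VALUE)); // [] (stuck watermark, nothing fires)
        System.out.println(op.advanceWatermark(1_500L));         // [1000]
        System.out.println(op.advanceWatermark(6_000L));         // [5000]
    }
}
```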



Best,

Fuyao



On 11/11/20 11:33, fuyao...@oracle.com wrote:


Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, fuyao...@oracle.com wrote:


Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, fuyao...@oracle.com wrote:


Hi Kavin,

Thanks for your example. I think I have already done something very 
similar before. I didn't post the full WatermarkStrategy 
interface in my previous email, but I do have that part already. I 
think the example you gave me is a punctuated watermark strategy, not 
a bounded-out-of-orderness one. My major concern now is why my emitted 
watermark is not available in processElement() and why I have 8 
records each time the code reaches the onPeriodicEmit part. I 
will post my code following your example below.


The symptom is that I will get the context watermark as 
Long.MIN_VALUE if I use the watermark strategy below.


16:35:12,969 INFO 
org.myorg.quickstart.processor.TableOutputProcessFunction - context 
current key: 69215, context current watermark: -9223372036854775808



DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
            String invoice_id_key = (String) value.f1.getField(0);
            if (invoice_id_key == null) {
                invoice_id_key = (String) value.f1.getField(4);
            }
            return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

watermark strategy:

public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp;
        };
    }
}

watermark generator code:

public class PunctuatedTableOutputWatermarkGenerator
        implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
            WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO 
org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator 
- Emit Punctuated watermark: 1605054900905


From the log, I can see, it extract the 

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-11 Thread fuyao . li

Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, fuyao...@oracle.com wrote:


Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, fuyao...@oracle.com wrote:


Hi Kavin,

Thanks for your example. I think I have already done something very 
similar before. I didn't post the full WatermarkStrategy 
interface in my previous email, but I do have that part already. I 
think the example you gave me is a punctuated watermark strategy, not 
a bounded-out-of-orderness one. My major concern now is why my emitted 
watermark is not available in processElement() and why I have 8 
records each time the code reaches the onPeriodicEmit part. I 
will post my code following your example below.


The symptom is that I will get the context watermark as 
Long.MIN_VALUE if I use the watermark strategy below.


16:35:12,969 INFO 
org.myorg.quickstart.processor.TableOutputProcessFunction - context 
current key: 69215, context current watermark: -9223372036854775808



DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
            String invoice_id_key = (String) value.f1.getField(0);
            if (invoice_id_key == null) {
                invoice_id_key = (String) value.f1.getField(4);
            }
            return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

watermark strategy:

public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return my timestamp;
        };
    }
}

watermark generator code:

public class PunctuatedTableOutputWatermarkGenerator
        implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
            WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO 
org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator 
- Emit Punctuated watermark: 1605054900905


From the log, I can see that it extracts the eventTimestamp and emits the 
watermark. Why can't I access this piece of information in the 
processElement() function?
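
One possible explanation, offered as an assumption rather than a confirmed diagnosis: a watermark emitted from onEvent travels downstream behind the record that triggered it, so the downstream function still sees the previous watermark while it processes that record. The plain-Java sketch below models only that ordering. `WatermarkOrderingSim` is a hypothetical name and the code does not use any Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkOrderingSim {
    // For each event timestamp, returns the watermark the downstream function would
    // observe while processing that event, assuming the record is forwarded first and
    // the punctuated watermark (equal to the event timestamp) follows it.
    static List<Long> observedWatermarks(long[] eventTimestamps) {
        List<Long> observed = new ArrayList<>();
        long downstreamWatermark = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            observed.add(downstreamWatermark);                       // what processElement() sees
            downstreamWatermark = Math.max(downstreamWatermark, ts); // watermark arrives afterwards
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observedWatermarks(new long[] {1605054900905L, 1605054901000L}));
        // prints [-9223372036854775808, 1605054900905]
    }
}
```

This would match the first record showing -9223372036854775808 in the log above. It would not by itself explain a watermark that stays at that value for every record, which points back at the parallelism question.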


Any suggestions? Thank you so much!


Best regards,

Fuyao



On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy 
class and register that to 
window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy)

Make sure you use KafkaConsumer's assignTimestampsAndWatermarks if 
you're using Kafka consumers

an example code for a booking event that has its internal timestamp 
would be


public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

    @Override
    public WatermarkGenerator<Booking> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Booking>() {
            private final long OUT_OF_ORDERNESS_MILLIS = 30;
            private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

            @Override
            public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
                Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
                output.emitWatermark(watermark);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Do nothing since a watermark is emitted on every event
            }
        };
    }

    @Override
    public TimestampAssigner<Booking> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return (booking, recordTimestamp) -> booking.getTimestamp();
    }
}
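
The arithmetic inside the generator above can be tried without a Flink project. The following is a minimal sketch in plain Java; the class BoundedOutOfOrdernessSketch and its methods are hypothetical names that only mirror the onEvent() logic, not a Flink API. It also illustrates why the maximum starts at Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1: subtracting the bound from that start value cannot underflow.

```java
// Hypothetical stand-alone sketch; this is not a Flink class.
final class BoundedOutOfOrdernessSketch {
    private static final long OUT_OF_ORDERNESS_MILLIS = 30;

    // Start high enough that (max - bound - 1) cannot underflow below Long.MIN_VALUE.
    private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

    /** Mirrors onEvent(): track the largest timestamp seen and return the watermark to emit. */
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return currentWatermark();
    }

    /** Watermark implied by the largest timestamp seen so far. */
    long currentWatermark() {
        return currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch();
        System.out.println(generator.currentWatermark()); // before any event
        System.out.println(generator.onEvent(1_000L));
        System.out.println(generator.onEvent(900L));      // a late event does not move the watermark back
        System.out.println(generator.onEvent(1_500L));
    }
}
```

In this sketch a late event leaves the watermark unchanged, because only the running maximum feeds the subtraction.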

On Wed, Nov 11, 2020 at 12:28 AM > wrote:


Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I am having trouble making
the watermark available to my operator. I am seeing some strange
behavior.

I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to
it. The timestamp is just a field in the stream.

BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-10 Thread fuyao . li

Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I am having trouble making
the watermark available to my operator. I am seeing some strange behavior.


I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to it. The
timestamp is just a field in the stream. The problems below concern the
watermark generator.


Problems:

1. I can make the time-lag watermark generator work. But with the
BoundedOutOfOrdernessGenerator,
context.timerService().currentWatermark() in processElement() always
stays at its initial value and never updates.


2. I set the auto-watermark interval to 5 seconds for debugging, and I
attach this watermark generator in only one place, with parallelism 1.
However, I am getting 8 log records at a time. The time-lag policy advances
all 8 records; the out-of-orderness policy advances only 1 record. Maybe
this mismatch causes processElement() to see the wrong
default watermark?
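
One possible reading of problem 2, offered as an assumption rather than a confirmed diagnosis: the generator may be running as 8 parallel instances, only one of which receives events. A downstream operator takes the minimum of the watermarks arriving on its inputs, so instances that never see an event would hold the combined watermark back. The sketch below is plain Java with no Flink dependency; MinWatermarkSketch and all of its methods are hypothetical names, and the numbers are taken from the log quoted in this message.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch; these names are not part of Flink.
final class MinWatermarkSketch {
    static final long MAX_TIME_LAG = 15_000L;

    /** Policy 1 (time lag): wall clock minus the lag, or the largest event timestamp if that is later. */
    static long timeLagWatermark(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    /** Policy 2 (out-of-orderness): largest event timestamp seen so far minus the lag. */
    static long eventBasedWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    /** A downstream operator advances its event time to the minimum of its input watermarks. */
    static long downstreamWatermark(long[] upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Assumed state: 8 generator instances, only one of which has seen an event. */
    static long[] assumedSubtaskMaxTimestamps() {
        long[] currentMax = new long[8];
        Arrays.fill(currentMax, 15_000L);   // initial value of currentMaxTimestamp
        currentMax[1] = 1_605_047_187_881L; // the one instance that received events
        return currentMax;
    }

    public static void main(String[] args) {
        long now = 1_605_047_266_199L;
        long[] currentMax = assumedSubtaskMaxTimestamps();
        long[] policy1 = Arrays.stream(currentMax).map(max -> timeLagWatermark(now, max)).toArray();
        long[] policy2 = Arrays.stream(currentMax).map(MinWatermarkSketch::eventBasedWatermark).toArray();
        System.out.println("policy 1 downstream watermark: " + downstreamWatermark(policy1));
        System.out.println("policy 2 downstream watermark: " + downstreamWatermark(policy2));
    }
}
```

Under this assumption, policy 2 leaves the combined watermark at 0 because seven instances never see an event, while policy 1 advances on every instance because it follows the wall clock. Whether this is what happens in the job is not confirmed in the thread.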


https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my code for watermark generator:

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {

    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);

        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy, works and advances the timestamp
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);

        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on event
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, "
                + "periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}


This is the log printed by the slf4j call above. Each time, it gives
me 8 records. Why 8? I think it should be 1 in theory, and I
am very confused. Also, policy 1 advances all 8 records, while policy
2 advances only 1 of the 8 records, and that advance is not reflected in
processElement().


14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266199, 
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266199, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000


14:28:06,200 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047271200, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-04 Thread Fuyao Li
Hi Flink Users and Community,

For the first part of the question, the 12-hour time difference was caused
by a time extraction bug of my own. I can now get the time translated
correctly. The type cast problem does have some workarounds.

My major blocker right now is that onTimer is not triggered properly. I
guess this is caused by failing to configure the correct watermarks and
timestamp assigners. Please give me some insights.

1. If I don't chain the assignTimestampsAndWatermarks() call together
with the keyBy() and process() calls, context.timestamp() in my
processElement() function is null. Is this expected behavior? The
Flink examples don't chain it together. (see example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
)
2. If I use registerEventTimeTimer() in processElement(), the onTimer
method is not triggered. However, I can trigger the onTimer method if
I simply change it to registerProcessingTimeTimer(). I am using the
settings below in the stream env.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
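
As background for point 2: an event-time timer fires only once the operator's watermark reaches the timer's timestamp, whereas a processing-time timer follows the wall clock. The following is a minimal sketch of that rule in plain Java; EventTimeTimerSketch is a hypothetical, simplified model and not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-alone sketch; a simplified model, not Flink's timer service.
final class EventTimeTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Registers a timer for the given event-time timestamp. */
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = watermark;
        // Fire every timer whose timestamp the watermark has reached.
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.registerEventTimeTimer(10_000L);
        System.out.println(service.advanceWatermark(9_999L));  // watermark still behind the timer
        System.out.println(service.advanceWatermark(10_000L)); // watermark reaches the timer
    }
}
```

In this model, a watermark that never advances means a registered event-time timer never fires, which is consistent with the symptom described in point 2.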

My code for the processing chain:
retractStream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
                Row rowData = booleanRowTuple2.f1;
                LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
                LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);

                LocalDateTime latestDBUpdateTime = null;
                if (headerTime != null && linesTime != null) {
                    latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
                } else {
                    latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
                }
                if (latestDBUpdateTime != null) {
                    return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
                }
                // In the worst case, we use system time instead, which should never be reached.
                return System.currentTimeMillis();
            }))
    // .assignTimestampsAndWatermarks(new MyWaterStrategy()) // second way to create watermark, doesn't work
    .keyBy(value -> {
        // There could be null fields for header invoice_id field
        String invoice_id_key = (String) value.f1.getField(0);
        if (invoice_id_key == null) {
            invoice_id_key = (String) value.f1.getField(4);
        }
        return invoice_id_key;
    })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);
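
The timestamp extraction inside the lambda above can be checked on its own, outside the job. The following is a minimal sketch in plain Java; the class LatestUpdateTimeSketch and the method latestEpochMillis are hypothetical names, and the fallback is passed in as a parameter (instead of calling System.currentTimeMillis()) only to make the result repeatable.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical stand-alone sketch of the timestamp assigner's logic; not part of the job.
final class LatestUpdateTimeSketch {
    private static final ZoneId ZONE = ZoneId.of("America/Los_Angeles");

    /** Returns the epoch millis of the later non-null time, or fallbackMillis when both are null. */
    static long latestEpochMillis(LocalDateTime headerTime, LocalDateTime linesTime, long fallbackMillis) {
        LocalDateTime latest;
        if (headerTime != null && linesTime != null) {
            latest = headerTime.isAfter(linesTime) ? headerTime : linesTime;
        } else {
            latest = (headerTime != null) ? headerTime : linesTime;
        }
        // Interpret the zone-less database time as Los Angeles local time.
        return latest != null ? latest.atZone(ZONE).toInstant().toEpochMilli() : fallbackMillis;
    }

    public static void main(String[] args) {
        LocalDateTime header = LocalDateTime.of(2020, 11, 10, 16, 35, 0);
        LocalDateTime lines = LocalDateTime.of(2020, 11, 10, 16, 34, 0);
        System.out.println(latestEpochMillis(header, lines, -1L));
        System.out.println(latestEpochMillis(null, null, -1L));
    }
}
```

Keeping the extraction in a small static method like this makes it possible to unit test the time zone handling separately from the watermark configuration.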

Best regards,
Fuyao

On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li  wrote:

> Hi Flink Community,
>
> I am doing some research work on the Flink DataStream and Table APIs, and I
> have run into two major problems. I am using Flink 1.11.2, Scala 2.11, and
> Java 8. My use case looks like this: I plan to write a data processing
> pipeline with two stages. My goal is to construct a business object containing
> information from several Kafka streams that share a primary key, and to emit
> the complete business object if that primary key doesn't appear in the
> pipeline for 10 seconds.
>
> In the first stage, I consume three Kafka streams and transform them
> into Flink DataStreams using a deserialization schema that performs some type
> and date format transformations. I then register these data streams as tables
> and do a full outer join, one by one, using the Table API. I also add a query
> configuration to avoid excessive state. The primary key is also
> the join key.
>
> In the second stage, I transform the joined table into a retract stream
> and feed it into a KeyedProcessFunction that generates the business object if
> the business object's primary key is inactive for 10 seconds.
>
> Is this way of handling the data the suggested approach? (I understand I
> can consume Kafka data directly in the Table API. I haven't tried that yet;
> maybe that's better?) Any suggestions are welcome. While implementing this,
> I meet two major problems and several smaller question

Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-02 Thread Fuyao Li
ehavior? In the null case,
the strange thing is that I can collect the business object
immediately, without the designed 10-second waiting time. This shouldn't
happen, right? The processing-time timer also seems to work: the code does
enter the onTimer method.

 retractStream.assignTimestampsAndWatermarks(new BoRetractStreamTimestampAssigner()); // this is a deprecated method

 retractStream
.keyBy()
.process(new TableOutputProcessFunction())
.name("ProcessTableOutput")
.uid("ProcessTableOutput")
.addSink(businessObjectSink)
.name("businessObjectSink")
.uid("businessObjectSink")
.setParallelism(1);

(2) For the watermark configuration: I use a field in the retract stream as
the event time. This time is usually 15-20 seconds behind the current time.

In my environment, I have configured the streaming env based on the
information here (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator).
My events don't arrive continuously, so I think I need to set the auto-watermark
interval so that the event-time onTimer callback works correctly. I have added
the code below.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);

1> Which kind of watermark strategy should I use? The general
BoundedOutOfOrderness strategy or a custom watermark generator?

I tried to write a watermark generator, but I don't know how to apply it to
the stream correctly. The documentation doesn't explain this very clearly. My
code looks like the following, and it doesn't work.

assign part:

.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier<Tuple2<Boolean, Row>>) context -> new TableBoundOutofOrdernessGenerator()))

watermark generator:

I just assign the event time attribute following the example in the doc. (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
)

2> I also tried the static method in WatermarkStrategy. The syntax is
correct, but I hit the same problem as in 2.(1).

.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(15))
    .withTimestampAssigner((booleanRowTuple2, timestamp) -> {

    }))


(3) For the retract DataStream, do I need to explicitly attach it to the
stream environment? I think that is done by default, right? I just want to
confirm. I do have env.execute() at the end of the code.

I understand this is a lot of questions. Thank you for your patience in
reading through my email! If anything is unclear, please reach out to
me. Thanks!


Best regards,

Fuyao Li