Re: SQL Engine Type inference when extending AsyncTableFunction class twice.

2022-09-16 Thread Jonathan Weaver
I think I've narrowed it down to this function in ExtractionUtils

public static Optional<Class<?>> extractSimpleGeneric(
        Class<?> baseClass, Class<?> clazz, int pos) {
    try {
        if (clazz.getSuperclass() != baseClass) {
            return Optional.empty();
        }
        final Type t =
                ((ParameterizedType) clazz.getGenericSuperclass())
                        .getActualTypeArguments()[pos];
        return Optional.ofNullable(toClass(t));
    } catch (Exception unused) {
        return Optional.empty();
    }
}

clazz.getSuperclass() is BaseClass in my example, while baseClass in the
function is expecting AsyncTableFunction. Because those don't compare
equal, it returns an empty result, even though the type inference is
otherwise correct.

Is there a way we could allow multiple extends in the future, instead of
just allowing a direct single subclass?
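For what it's worth, the multi-level walk can be sketched with plain JDK reflection. The classes below are stand-ins mirroring the AsyncTableFunction/BaseClass/FinalClass shape from this thread, and Flink's toClass() is replaced by a simple instanceof check; this is an illustration, not Flink's actual implementation:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;

public class GenericWalk {
    // Stand-ins for the hierarchy described in this thread.
    static class AsyncTableFunction<T> {}
    static class BaseClass extends AsyncTableFunction<String> {}
    static class FinalClass extends BaseClass {}

    // Walks up the hierarchy until we reach the direct subclass of baseClass,
    // then reads that subclass's type argument at position pos.
    static Optional<Class<?>> extractGenericAcrossHierarchy(
            Class<?> baseClass, Class<?> clazz, int pos) {
        Class<?> current = clazz;
        while (current != null && current.getSuperclass() != baseClass) {
            current = current.getSuperclass();
        }
        if (current == null) {
            return Optional.empty(); // baseClass is not an ancestor
        }
        Type t = ((ParameterizedType) current.getGenericSuperclass())
                .getActualTypeArguments()[pos];
        // Flink's toClass() handles more cases; a bare instanceof suffices here.
        return (t instanceof Class) ? Optional.of((Class<?>) t) : Optional.empty();
    }

    public static void main(String[] args) {
        // Resolves String even though FinalClass is two levels down.
        System.out.println(extractGenericAcrossHierarchy(
                AsyncTableFunction.class, FinalClass.class, 0)); // Optional[class java.lang.String]
    }
}
```

The caveat is visible in the sketch: it only works when the parameterization happens at the direct subclass of baseClass. If FinalClass re-parameterized a generic BaseClass&lt;T&gt;, the walk would have to substitute type variables, which is presumably what makes the real fix non-trivial.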



On Thu, Sep 15, 2022 at 4:42 PM Jonathan Weaver 
wrote:

> I am having an issue with the automatic type inference with SQL engine in
> an AsyncTableFunction class.
>
> I am extending AsyncTableFunction in a BaseClass (common code).
>
> Then extending again for some specific implementations.
>
> FinalClass extends BaseClass
>
> If I use BaseClass it correctly infers the output of the RowData from the
> catalog.
> If I use FinalClass it errors with
>
> Cannot extract a data type from an internal
> 'org.apache.flink.table.data.RowData' class without further information.
> Please use annotations to define the full logical type.
>
> So something with the typeInference is not looking at the right class in
> the hierarchy.
>
> I have tried overriding typeInformation at various points but it doesn't
> seem to help.
>
> Does anyone have an idea of how to have a common base class that gets
> extended with correct automatic typeinference?
>
> I can provide more details if needed.
>
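For reference, the annotation-based escape hatch that the error message points at looks roughly like the following. This is only a sketch: the ROW columns are hypothetical placeholders and must match the actual catalog schema, and BaseClass is the common-code class described in this thread:

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;

// Declaring the output type explicitly on the subclass sidesteps the
// reflective extraction that fails across two levels of inheritance.
// The ROW<...> columns below are hypothetical placeholders.
@FunctionHint(output = @DataTypeHint("ROW<id BIGINT, name STRING>"))
public class FinalClass extends BaseClass {
    // BaseClass extends AsyncTableFunction<RowData> with the common code.
}
```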


Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-16 Thread Yaroslav Tkachenko
Application mode. I've done a bit more research and created
https://issues.apache.org/jira/browse/FLINK-29288, planning to work on a PR
today.

TLDR: currently the Flink operator always creates the /opt/flink/usrlib
folder and forces you to specify the jarURI parameter, which is passed as
the pipeline.jars / pipeline.classpaths configuration options. This leads
to the jar being loaded twice by different classloaders (the system one
and the user one).
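The "same jar, two classloaders" failure mode can be reproduced with plain JDK classloaders. A minimal, self-contained sketch (not operator code): the same class bytes defined by two loaders yield two distinct runtime classes, which is why instanceof checks and driver registrations stop lining up.

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class DualLoadDemo {
    // A trivial class we will load twice.
    public static class Payload {}

    // A classloader that defines Payload itself instead of delegating to its
    // parent, simulating a "user" classloader that duplicates a class already
    // present on the system classpath.
    static class ChildFirstLoader extends ClassLoader {
        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(Payload.class.getName())) {
                try (InputStream in = getClass().getResourceAsStream(
                        "/" + name.replace('.', '/') + ".class")) {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
                    byte[] bytes = out.toByteArray();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> systemCopy = Payload.class;
        Class<?> userCopy = new ChildFirstLoader().loadClass(Payload.class.getName());
        // Same bytes, same name, but two distinct runtime classes:
        System.out.println(systemCopy == userCopy); // false
    }
}
```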

On Fri, Sep 16, 2022 at 2:30 AM Matthias Pohl 
wrote:

> Are you deploying the job in session or application mode? Could you
> provide the stacktrace. I'm wondering whether that would be helpful to pin
> a code location for further investigation.
> So far, I couldn't come up with a definite answer about placing the jar in
> the lib directory. Initially, I would have thought that it's fine
> considering that all dependencies are included and the job jar itself ends
> up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
> to that one.
>
> On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko 
> wrote:
>
>> Hey everyone,
>>
>> I’m migrating a Flink Kubernetes standalone job to the Flink operator
>> (with Kubernetes native mode).
>>
>> I have a lot of classloading issues when trying to run with the operator
>> in native mode. For example, I have a Postgres driver as a dependency (I
>> can confirm the files are included in the uber jar), but I still get
>> "java.sql.SQLException: No suitable driver found for jdbc:postgresql:..."
>> exception.
>>
>> In the Kubernetes standalone setup my uber jar is placed in the
>> /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
>> config. Is this supported? Should I only be using /opt/flink/usrlib?
>>
>> Thanks for any suggestions.
>>
>


Re: A question about restoring state with an additional variable with kryo

2022-09-16 Thread Vishal Santoshi
Thank you for the clarification. I thought so too.

Unfortunately my state classes are generics-based, and those are
definitely not treated as POJOs, even though they have all the required
constructs (no-arg constructor, getters/setters, etc.). I will likely
take an at-least-once hit by changing the uid of that specific operator
and restarting with allowNonRestoredState. This will ignore the state
that cannot be restored (for the previous uid), construct fresh state
for the new uid, and not affect the other operators (including the Kafka
consumer operators). I can live with that, I think.
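For background on why Kryo hits EOF here instead of defaulting the new field, the behavior can be illustrated with a hand-rolled, versioned deserializer in plain Java. This is purely illustrative (not Flink or Kryo API): a v2 reader that explicitly falls back to a default when the old bytes end before the newly added field, which is the fallback Kryo's FieldSerializer does not perform on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedState {
    // Old (v1) layout: only the String field was written.
    static byte[] writeV1(String a) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(a);
        }
        return bos.toByteArray();
    }

    // New (v2) reader: consumes the v1 field, then defaults the new long
    // field when the stream ends early instead of throwing EOFException.
    static Object[] readV2(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String a = in.readUTF();
            long b = in.available() >= 8 ? in.readLong() : 0L; // default for old data
            return new Object[] {a, b};
        }
    }

    public static void main(String[] args) throws IOException {
        Object[] state = readV2(writeV1("hello"));
        System.out.println(state[0] + " " + state[1]); // hello 0
    }
}
```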

On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Vishal,
>
>
>
> Good news and bad news 😊:
>
>
>
>    - Bad: the Kryo serializer cannot be used for schema evolution, see [1]
>    - Good: not all is lost here:
>      - If you happen to have state that you cannot afford to lose, you can
>        transcode it by means of the savepoint API [2]
>      - However, this takes quite some effort
>    - In general, if you ever plan to migrate/extend your schemas, choose a
>      data type that supports schema migration [1]
>    - In your case, POJO types would be the closest to your original
>      implementation
>    - By the way, you can disable Kryo in the configuration to avoid this
>      situation in the future
>    - The Kryo serializer is quite slow compared to the other options, and I
>      believe it is only there as an (emergency) fallback solution: [3]
>
>
>
> Feel free to ask for clarification 😊
>
>
>
> Thias
>
>
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> [3]
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>
>
>
>
>
>
>
> *From:* Vishal Santoshi 
> *Sent:* Friday, September 16, 2022 1:17 AM
> *To:* user 
> *Subject:* Re: A question about restoring state with an additional
> variable with kryo
>
>
>
>
>
>
> The exception thrown is as follows. I realize that it is trying to read
> the long value. How do I signal to Kryo that this is OK and that the
> object can have a default value?
>
>
>
> Caused by: java.io.EOFException: No more bytes left.
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80)
>     at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>     at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>     at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>
>
>
> On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi 
> wrote:
>
> << How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
>
>
> >> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `long b`.
>
>
>
> Sorry a typo
>
>
>
>
>
> On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
> wrote:
>
> I have state in rocksDB that represents say
>
>
>
> class A {
>
>   String a
>
> }
>
>
>
> I now change my class and add another variable
>
>
>
>
> Class A {
>
>   String a;
>
>   long b = 0;
>
> }
>
>
>
> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
>
>
> Unfortunately the state is not using POJO serializer.
>
>
>
> Thanks and Regards.
>
>
>
> Vishal
>
>
>
>
>
>
>
>
>

Re: ExecutionMode in ExecutionConfig

2022-09-16 Thread Yun Tang
Hi Hailu,

If you take a look at the history of ExecutionMode [1], apart from the
refactoring commit, this class was introduced before 2016, when the DataSet
API had not yet been deprecated.

From my point of view, you should currently set the runtime mode [2] instead
of the execution mode if you are using Flink as a computation engine.


[1] 
https://github.com/apache/flink/commits/master/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
[2] 
https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java#L98

Best
Yun Tang


From: zhanghao.c...@outlook.com 
Sent: Thursday, September 15, 2022 0:03
To: Hailu, Andreas ; user@flink.apache.org 

Subject: Re: ExecutionMode in ExecutionConfig

It's added in Flink 1.14:
https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.14/#expose-a-consistent-globaldataexchangemode.
Not sure if there's a way to change this in 1.13.

Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 23:38
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


I can give this a try. Do you know in which Flink version this feature
becomes available?



ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 11:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'?
Looks like the ExecutionMode in ExecutionConfig does not work for DataStream
APIs.



The default shuffling behavior for the DataStream API in batch mode is
'ALL_EXCHANGES_BLOCKING', where upstream and downstream tasks run
sequentially. The pipelined mode, on the other hand, has upstream and
downstream tasks run simultaneously.
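Put together, the suggestion above corresponds to configuration along these lines (a sketch; the batch shuffle mode option is documented from Flink 1.14, and its availability in 1.13 is uncertain as noted elsewhere in this thread):

```yaml
# flink-conf.yaml (sketch)
execution.runtime-mode: BATCH
execution.batch-shuffle-mode: ALL_EXCHANGES_PIPELINED
```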





Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com <zhanghao.c...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: RE: ExecutionMode in ExecutionConfig
Subject: RE: ExecutionMode in ExecutionConfig



Hi Zhanghao,



That seems different from what I'm referencing, and is one of my points of
confusion – the documents refer to the execution mode as BATCH and STREAMING,
which is different from what the code calls the runtime mode, e.g.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);



I’m referring to the ExecutionMode in the ExecutionConfig e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH)/ 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I’m not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com <zhanghao.c...@outlook.com>
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
gives a comprehensive description of it.






Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org <user@flink.apache.org>
Subject: ExecutionMode in ExecutionConfig
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am

Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-16 Thread Matthias Pohl via user
Are you deploying the job in session or application mode? Could you provide
the stacktrace. I'm wondering whether that would be helpful to pin a code
location for further investigation.
So far, I couldn't come up with a definite answer about placing the jar in
the lib directory. Initially, I would have thought that it's fine
considering that all dependencies are included and the job jar itself ends
up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
to that one.

On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko 
wrote:

> Hey everyone,
>
> I’m migrating a Flink Kubernetes standalone job to the Flink operator
> (with Kubernetes native mode).
>
> I have a lot of classloading issues when trying to run with the operator
> in native mode. For example, I have a Postgres driver as a dependency (I
> can confirm the files are included in the uber jar), but I still get
> "java.sql.SQLException: No suitable driver found for jdbc:postgresql:..."
> exception.
>
> In the Kubernetes standalone setup my uber jar is placed in the
> /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
> config. Is this supported? Should I only be using /opt/flink/usrlib?
>
> Thanks for any suggestions.
>