Re: 404 Jar File Not Found w/ Web Submit Disabled

2023-08-14 Thread Shammon FY
Hi,

Currently you can upload a job jar to a Flink session cluster, or submit a
job graph to the session cluster through the REST API. For example, you can
submit SQL jobs with the JDBC driver to the sql-gateway; the gateway then
builds the job graph and submits it to the session cluster via the REST
endpoint.

If you configure `web.submit.enable=false` on your cluster, you cannot
upload a job jar, but you can still submit jobs via the REST endpoint. You
can create your own `RestClusterClient` to do that, or use the existing
jdbc-driver and sql-gateway.
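Roughly like this, as a sketch only (class names from Flink 1.16; the host,
port and the pipeline below are placeholders and the code is untested):

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestSubmitExample {
    public static void main(String[] args) throws Exception {
        // Build a job graph locally; any DataStream/Table program works here.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        // Point the client at the session cluster's REST endpoint.
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "jobmanager-host"); // placeholder
        conf.setInteger(RestOptions.PORT, 8081);

        try (RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
            JobID jobId = client.submitJob(jobGraph).get();
            System.out.println("Submitted job " + jobId);
        }
    }
}
```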


Best,
Shammon FY

On Tue, Aug 15, 2023 at 12:14 AM patricia lee  wrote:

> Hi,
>
> Just to add: when I set the web submit property back to "true", that is
> when the REST endpoint /jars/upload worked again. But in the documentation
> reference:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
>
>
> Disabling the UI doesn't disable the endpoint. Is this the expected
> behavior?
>
> Regards,
> Patricia
>
> On Mon, Aug 14, 2023, 5:07 PM patricia lee  wrote:
>
>> Hi,
>>
>> I disabled web submission (web.submit.enable=false); after that, uploading
>> jar files via the REST endpoint throws 404. The documentation says:
>>
>> "Even it is disabled sessions clusters still accept jobs through REST
>> requests (Http calls). This flag only guards the feature to upload jobs in
>> the UI"
>>
>> I also set the io.tmp.dirs to my specified directory.
>>
>>
>> But I can no longer upload a jar via the REST endpoint.
>>
>>
>> Regards,
>> Patricia
>>
>


Re: Flink SQL query with window-TVF fails

2023-08-14 Thread liu ron
Hi, Pouria

Flink SQL uses Calcite to parse SQL, and this is a Calcite limitation: the
minimum interval unit it supports here is SECOND [1].

[1]
https://github.com/apache/calcite/blob/main/core/src/main/codegen/templates/Parser.jj#L5067
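
For reference, the same aggregation parses fine once the interval is at
SECOND granularity or coarser. A rough, untested sketch via the Table API
(assuming a StreamTableEnvironment named tableEnv and the pageviews table
from the original query):

```java
// Sub-second TUMBLE sizes are rejected by the parser; SECOND and coarser work.
tableEnv.executeSql(
        "SELECT window_start, window_end, userid, COUNT(pageid) AS cnt "
            + "FROM TABLE(TUMBLE(TABLE pageviews, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) "
            + "GROUP BY window_start, window_end, userid")
    .print();
```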

Best,
Ron

Pouria Pirzadeh  wrote on Tue, Aug 15, 2023 at 08:09:

> I am trying to run a window aggregation SQL query (on Flink 1.16) with a
> windowing TVF for a TUMBLE window with a size of 5 milliseconds. It seems
> Flink does not let a window size use a time unit smaller than seconds. Is
> that correct?
> (The documentation is not clear about that.)
>
> It is unexpected given that the DataStream API lets you define a tumbling
> window with a size in milliseconds using:
> window(TumblingProcessingTimeWindows.of(Time.milliseconds(5)))
>
> Here is the query:
> SELECT window_start, window_end, userid,  count(pageid) AS cnt
> FROM TABLE(TUMBLE(TABLE pageviews, DESCRIPTOR(rowtime), INTERVAL '5'
> MILLISECONDS))
> GROUP BY window_start, window_end, userid;
>
> error:
> SQL parse failed. Encountered "MILLISECONDS" at line 1, column 142.
> Was expecting one of:
> "DAY" ...
> "DAYS" ...
> "HOUR" ...
> "HOURS" ...
> "MINUTE" ...
> "MINUTES" ...
> "MONTH" ...
> "MONTHS" ...
> "SECOND" ...
> "SECONDS" ...
> "YEAR" ...
> "YEARS" ...
>


Re: Recommendations on using multithreading in flink map functions in java

2023-08-14 Thread liu ron
Hi, Vignesh

Flink is a distributed parallel computing framework, and each parallel
instance of a MapFunction already runs as a separate task (thread). If you
want more threads to process the data, you can increase the parallelism of
the MapFunction instead of using multiple threads inside a single
MapFunction, which would go against Flink's original design intent.
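
As a minimal sketch (the stream, element types and MyMapFunction below are
placeholders, not tested), scaling the map stage out looks like:

```java
// "events" is assumed to be a DataStream<Event>. Each of the 16 parallel
// MapFunction instances processes its share of the elements in its own
// task thread, so throughput scales without multithreading inside the function.
DataStream<Result> results = events
        .map(new MyMapFunction())
        .setParallelism(16)
        .name("per-element-operations");
```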

Best,
Ron

Vignesh Kumar Kathiresan via user  wrote on Tue, Aug 15, 2023 at 03:59:

> Hello All,
>
> *Problem statement *
> For a given element, I have to perform multiple (let's say N) operations on
> it. All N operations are independent of each other, and to achieve the
> lowest latency I want to do them concurrently. I want to understand what the
> best way to do this in Flink is.
>
> I understand Flink achieves huge parallelism across elements. But is it an
> anti-pattern to do parallel processing in a map function at the
> single-element level? I do not see anything on the internet about using
> multithreading inside a map function.
>
> I can always fan out with multiple copies of the same element and send
> them to different operators. But it incurs at least a
> serialize/deserialize cost and may also incur a network shuffle. Trying to
> see if a multithreaded approach is better.
>
> Thanks,
> Vignesh
>


Flink SQL query with window-TVF fails

2023-08-14 Thread Pouria Pirzadeh
I am trying to run a window aggregation SQL query (on Flink 1.16) with a
windowing TVF for a TUMBLE window with a size of 5 milliseconds. It seems
Flink does not let a window size use a time unit smaller than seconds. Is
that correct?
(The documentation is not clear about that.)

It is unexpected given that the DataStream API lets you define a tumbling
window with a size in milliseconds using:
window(TumblingProcessingTimeWindows.of(Time.milliseconds(5)))

Here is the query:
SELECT window_start, window_end, userid,  count(pageid) AS cnt
FROM TABLE(TUMBLE(TABLE pageviews, DESCRIPTOR(rowtime), INTERVAL '5'
MILLISECONDS))
GROUP BY window_start, window_end, userid;

error:
SQL parse failed. Encountered "MILLISECONDS" at line 1, column 142.
Was expecting one of:
"DAY" ...
"DAYS" ...
"HOUR" ...
"HOURS" ...
"MINUTE" ...
"MINUTES" ...
"MONTH" ...
"MONTHS" ...
"SECOND" ...
"SECONDS" ...
"YEAR" ...
"YEARS" ...


Recommendations on using multithreading in flink map functions in java

2023-08-14 Thread Vignesh Kumar Kathiresan via user
Hello All,

*Problem statement *
For a given element, I have to perform multiple (let's say N) operations on
it. All N operations are independent of each other, and to achieve the
lowest latency I want to do them concurrently. I want to understand what the
best way to do this in Flink is.

I understand Flink achieves huge parallelism across elements. But is it an
anti-pattern to do parallel processing in a map function at the
single-element level? I do not see anything on the internet about using
multithreading inside a map function.

I can always fan out with multiple copies of the same element and send them
to different operators. But it incurs at least a serialize/deserialize
cost and may also incur a network shuffle. Trying to see if a multithreaded
approach is better.

Thanks,
Vignesh


Re: [DISCUSS] Status of Statefun Project

2023-08-14 Thread Galen Warren via user
I created a pull request for this: [FLINK-31619] Upgrade Stateful Functions
to Flink 1.16.1 by galenwarren · Pull Request #331 · apache/flink-statefun
(github.com).

JIRA is here: [FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1 -
ASF JIRA (apache.org).

Statefun references 1.16.2, despite the title -- that version has come out
since the issue was created.

I figured out how to run all the playground tests locally, but it took a
bit of reworking of the playground setup with respect to Docker;
specifically, the Docker contexts used to build the example functions
needed to be broadened (i.e. moved up the tree) so that, if needed, local
artifacts/code can be accessed from within the containers at build time.
Then I made the Docker compose.yml configurable through environment
variables to allow them to run in either the original manner -- i.e.
pulling artifacts from public repos -- or in a "local" mode, where
artifacts are pulled from local builds.

This process is cleaner if the playground is a subfolder of the
flink-statefun project rather than its own repository
(flink-statefun-playground), because then all the relative paths between
the playground files and the build artifacts are fixed. So, I'd like to
propose moving the playground files, modified as described above, to
flink-statefun/playground and retiring flink-statefun-playground. I can
submit separate PRs for those changes if everyone is on board.

Also, should I plan to do the same upgrade to handle Flink 1.17.x? It
should be easy to do, especially while the 1.16.x upgrade is fresh on my
mind.

Thanks.


On Fri, Aug 11, 2023 at 6:40 PM Galen Warren 
wrote:

> I'm done with the code to make Statefun compatible with Flink 1.16, and
> all the tests (including e2e) succeed. The required changes were pretty
> minimal.
>
> I'm running into a bit of a chicken/egg problem executing the tests in
> flink-statefun-playground, though. That
> project seems to assume that all the various Statefun artifacts are built
> and deployed to the various public repositories already. I've looked into
> trying to redirect references to local artifacts; however, that's also
> tricky since all the building occurs in Docker containers.
>
> Gordon, is there a trick to running the sample code in
> flink-statefun-playground against yet-unreleased code that I'm missing?
>
> Thanks.
>
> On Sat, Jun 24, 2023 at 12:40 PM Galen Warren 
> wrote:
>
>> Great -- thanks!
>>
>> I'm going to be out of town for about a week but I'll take a look at this
>> when I'm back.
>>
>> On Tue, Jun 20, 2023 at 8:46 AM Martijn Visser 
>> wrote:
>>
>>> Hi Galen,
>>>
>>> Yes, I'll be more than happy to help with Statefun releases.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Jun 20, 2023 at 2:21 PM Galen Warren 
>>> wrote:
>>>
 Thanks.

 Martijn, to answer your question, I'd need to do a small amount of work
 to get a PR ready, but not much. Happy to do it if we're deciding to
 restart Statefun releases -- are we?

 -- Galen

 On Sat, Jun 17, 2023 at 9:47 AM Tzu-Li (Gordon) Tai <
 tzuli...@apache.org> wrote:

> > Perhaps he could weigh in on whether the combination of automated
> tests plus those smoke tests should be sufficient for testing with new
> Flink versions
>
> What we usually did at the bare minimum for new StateFun releases was
> the following:
>
>1. Build tests (including the smoke tests in the e2e module, which
>covers important tests like exactly-once verification)
>2. Updating the flink-statefun-playground repo and manually
>running all language examples there.
>
> If upgrading Flink versions was the only change in the release, I'd
> probably say that this is sufficient.
>
> Best,
> Gordon
>
> On Thu, Jun 15, 2023 at 5:25 AM Martijn Visser <
> martijnvis...@apache.org> wrote:
>
>> Let me know if you have a PR for a Flink update :)
>>
>> On Thu, Jun 8, 2023 at 5:52 PM Galen Warren via user <
>> user@flink.apache.org> wrote:
>>
>>> Thanks Martijn.
>>>
>>> Personally, I'm already using a local fork of Statefun that is
>>> compatible with Flink 1.16.x, so I wouldn't have any need for a released
>>> version compatible with 1.15.x. I'd be happy to do the PRs to modify
>>> Statefun to work with new versions of Flink as they come along.
>>>
>>> As for testing, Statefun does have unit tests and Gordon also sent
>>> me instructions a while back for how to do some additional smoke tests
>>> which are pretty straightforward. Perhaps he could weigh in on whether the
>>> combination of automated tests plus those smoke tests should be sufficient
>>> for testing with new Flink versions (I beli

Re: 404 Jar File Not Found w/ Web Submit Disabled

2023-08-14 Thread patricia lee
Hi,

Just to add: when I set the web submit property back to "true", that is
when the REST endpoint /jars/upload worked again. But in the documentation
reference:

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/


Disabling the UI doesn't disable the endpoint. Is this the expected
behavior?

Regards,
Patricia

On Mon, Aug 14, 2023, 5:07 PM patricia lee  wrote:

> Hi,
>
> I disabled web submission (web.submit.enable=false); after that, uploading
> jar files via the REST endpoint throws 404. The documentation says:
>
> "Even it is disabled sessions clusters still accept jobs through REST
> requests (Http calls). This flag only guards the feature to upload jobs in
> the UI"
>
> I also set the io.tmp.dirs to my specified directory.
>
>
> But I can no longer upload a jar via the REST endpoint.
>
>
> Regards,
> Patricia
>


Re: Question about serialization of java.util classes

2023-08-14 Thread Alexis Sarda-Espinosa
Hello,

AFAIK you cannot avoid a TypeInfoFactory due to type erasure; there's nothing
Flink can do about that. Here's an example of helper classes I've been
using to support set serde in Flink POJOs, but note that it's hardcoded for
LinkedHashSet, so you would have to create different implementations if you
need to differentiate sorted sets:

https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
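
For a plain List field, the factory-plus-annotation approach looks roughly
like this (an untested sketch; the POJO, field and factory names are made up,
and field-level @TypeInfo is assumed to be available in your Flink version):

```java
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class MyPojo {
    // The factory tells Flink to use the built-in ListSerializer for this
    // field instead of falling back to Kryo.
    @TypeInfo(StringListFactory.class)
    public List<String> tags = new ArrayList<>();

    public MyPojo() {}

    public static class StringListFactory extends TypeInfoFactory<List<String>> {
        @Override
        public TypeInformation<List<String>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return Types.LIST(Types.STRING);
        }
    }
}
```

Sets have no such built-in TypeInformation, which is why the gist above ships
a custom TypeInformation/TypeSerializer pair for LinkedHashSet.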

Regards,
Alexis.


Am Mo., 14. Aug. 2023 um 12:14 Uhr schrieb :

> Hi,
>
> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
> ```
> package com.example;
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.TreeSet;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class App {
> public static class Pojo {
> public ArrayList list;
> public HashSet set;
> public TreeSet treeset;
> public Pojo() {
> this.list = new ArrayList<>();
> this.set = new HashSet<>();
> this.treeset = new TreeSet<>();
> }
> }
> public static void main(String[] args) throws Exception {
> var env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableGenericTypes();
> env.fromElements(new Pojo()).print();
> env.execute("Pipeline");
> }
> }
> ```
>
> The result of running:
> ```
> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - class java.util.ArrayList does not contain a setter for field size
> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Class class java.util.ArrayList cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types
> & Serialization" for details of the effect on performance and schema
> evolution.
> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#list will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - No fields were detected for class java.util.HashSet so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#set will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - No fields were detected for class java.util.TreeSet so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#sset will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and
> type java.util.ArrayList is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>
> at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInte

Re: Streaming join performance

2023-08-14 Thread Alexey Novakov via user
Hi Artem!

Are your tables backed by Kafka? If yes, what if you use the upsert-kafka
connector from the Table API: does it help to reduce the number of records
in each subsequent join operator?
I wrote a blog post some time ago (see the joins part) on why upsert-kafka
can be useful when joining event tables:
https://www.ververica.com/blog/streaming-modes-of-flink-kafka-connectors
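
As a rough, untested sketch of what one of the source tables could look like
(topic, broker and schema are made up; the relevant parts are the PRIMARY KEY
and the upsert-kafka connector, so each key only keeps its latest row):

```java
// Assumes a StreamTableEnvironment named tableEnv.
tableEnv.executeSql(
        "CREATE TABLE left_events ("
            + "  id STRING,"
            + "  payload STRING,"
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'upsert-kafka',"
            + "  'topic' = 'left-events',"
            + "  'properties.bootstrap.servers' = 'broker:9092',"
            + "  'key.format' = 'json',"
            + "  'value.format' = 'json'"
            + ")");
```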

Best regards,
Alexey

On Wed, Aug 9, 2023 at 5:05 AM liu ron  wrote:

> Hi, David
>
> Regarding the N-way join, this feature aims to address the issue of state
> simplification; it is on the roadmap. Technically there are no limitations,
> but we'll need some time to find a sensible solution.
>
> Best,
> Ron
>
> David Anderson  wrote on Wed, Aug 9, 2023 at 10:38:
>
>> This join optimization sounds promising, but I'm wondering why Flink
>> SQL isn't taking advantage of the N-Ary Stream Operator introduced in
>> FLIP-92 [1][2] to implement a n-way join in a single operator. Is
>> there something that makes this impossible/impractical?
>>
>> [1] https://cwiki.apache.org/confluence/x/o4uvC
>> [2] https://issues.apache.org/jira/browse/FLINK-15688
>>
>> On Sat, Aug 5, 2023 at 3:54 AM shuai xu  wrote:
>> >
>> > Hi, we are also paying attention to this issue and have completed the
>> validation of the minibatch join optimization including the intermediate
>> message folding you mentioned. We plan to officially release it in Flink
>> 1.19. This optimization significantly improves the performance of
>> join operations and we are looking forward to the arrival of Flink 1.19 to
>> help solve your problem.
>> >
>> > On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
>> > > Hello, Flink community!
>> > >
>> > > I have an important use case, which shows extremely bad
>> performance:
>> > >
>> > >   *   Streaming application
>> > >   *   sql table api
>> > >   *   10 normal joins (state should be kept forever)
>> > >
>> > > Join rules are simple: I have 10 tables which share the same primary
>> key. I want to build the result table by joining the 10 pieces.
>> > >
>> > > But Flink joins sequentially, so I have a chain with 10 joins.
>> > >
>> > > What happens if I generate an update message for the first table in the chain:
>> > >
>> > >
>> > >   *   first join operator will produce 2 records: delete+insert
>> > >   *   second operator will double incoming messages. result=2*2=4
>> messages
>> > >   *   ...
>> > >   *   last operator will produce 2**10=1024 messages.
>> > >
>> > > Performance becomes extremely slow and resources are wasted.
>> > >
>> > > I've made a simple compaction operator, which compacts records
>> after join:
>> > >
>> > >
>> > >   *   the join operator, after receiving a delete message, generates 2
>> messages
>> > >   *   after receiving an insert message, it generates 2 more messages
>> > >   *   but two of the four are compacted. So the operator receives 2
>> messages -> sends 2 messages
>> > >
>> > > I wonder if this approach is right? Why is it not implemented in
>> Flink yet?
>> > >
>> > > And I've got a problem with how I should deploy it on a cluster,
>> because I have changed some Flink sources, which are not pluggable.
>> > >
>> > > I have modified the StreamExecJoin class and added this code as a proof
>> of concept:
>> > >
>> > >
>> > > final OneInputTransformation compactTransform =
>> > > ExecNodeUtil.createOneInputTransformation(
>> > > transform,
>> > > "compact join results",
>> > > "description",
>> > > new ProcessOperator<>(new
>> CompactStreamOperator(equaliser)),
>> > > InternalTypeInfo.of(returnType),
>> > > leftTransform.getParallelism()
>> > > );
>> > >
>> > > return compactTransform;
>> > >
>> > > Transform operator:
>> > > @Override
>> > >
>> > > public void processElement(
>> > > RowData value,
>> > > ProcessFunction.Context ctx,
>> > > Collector collector) throws Exception {
>> > >
>> > > counter++;
>> > >
>> > > boolean compacted=false;
>> > > if (value.getRowKind()==RowKind.DELETE) {
>> > > value.setRowKind(RowKind.INSERT);
>> > > for (int i = buffer.size() - 1; i >= 0; i--) {
>> > > RowData x = buffer.get(i);
>> > > if (x.getRowKind() == RowKind.INSERT &&
>> recordEqualiser.equals(x, value)) {
>> > > buffer.remove(i);
>> > > compacted = true;
>> > > break;
>> > > }
>> > > }
>> > > value.setRowKind(RowKind.DELETE);
>> > > }
>> > >
>> > > if (!compacted) {
>> > > buffer.add(value);
>> > > }
>> > >
>> > > if (counter>=10)
>> > > {
>> > > buffer.forEach(collector::collect);
>> > > buffer.clear();
>> > > counter=0;
>> > > }
>> > > }
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > Regards,
>> > >

Re: Question about serialization of java.util classes

2023-08-14 Thread s
Hi,

Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
```
package com.example;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.TreeSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class App {
public static class Pojo {
public ArrayList list;
public HashSet set;
public TreeSet treeset;
public Pojo() {
this.list = new ArrayList<>();
this.set = new HashSet<>();
this.treeset = new TreeSet<>();
}
}
public static void main(String[] args) throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes();
env.fromElements(new Pojo()).print();
env.execute("Pipeline");
}
}
```

The result of running:
```
13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - class java.util.ArrayList does not contain a setter for field size
13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Class class java.util.ArrayList cannot be used as a POJO type because not 
all fields are valid POJO fields, and must be processed as GenericType. Please 
read the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#list will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - No fields were detected for class java.util.HashSet so it cannot be used 
as a POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#set will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - No fields were detected for class java.util.TreeSet so it cannot be used 
as a POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#sset will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
 to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" java.lang.UnsupportedOperationException: Generic 
types have been disabled in the ExecutionConfig and type java.util.ArrayList is 
treated as a generic type.
at 
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
at 
org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
at 
org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
at 
org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
at 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
at 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
at 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
at 
org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
at 
org.apache.flink.strea

Re: Question about serialization of java.util classes

2023-08-14 Thread Alexey Novakov via user
Hi Saleh,

If you could show us the minimal code example of the issue (event classes),
> I think someone could help you solve it.

Best regards,
Alexey

On Mon, Aug 14, 2023 at 9:23 AM  wrote:

> Hi,
>
> According to this blog post
> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
> The "Must be processed as GenericType" message means that the POJO
> serializer will not be used and, instead, Kryo will be used.
>
> I created a simple POJO to test it again with a java Collection but I got
> the same message. Disabling generic types throws an exception.
>
> I'm not sure how to use these types along with the POJO serializer or any
> other fast serializer.
>
> Best regards,
> Saleh.
>
>
>
> On 14 Aug 2023, at 4:59 AM, liu ron  wrote:
>
> Hi,
>
> According to the test in [1], I think Flink can recognize a POJO class which
> contains a java List, so I think you can refer to the related POJO class
> implementation.
>
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>
> Best,
> Ron
>
 wrote on Sun, Aug 13, 2023 at 22:50:
>
>> Greetings,
>>
>> I am working on a project that needs to process around 100k events per
>> second and I'm trying to improve performance.
>>
>> Most of the classes being used are POJOs but have a couple of fields
>> using a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet`
>> etc. This forces Flink to use Kryo and throw these warnings:
>>
>> ```
>> class java.util.ArrayList does not contain a setter for field size
>> Class class java.util.ArrayList cannot be used as a POJO type because not
>> all fields are valid POJO fields, and must be processed as GenericType.
>> Please read the Flink documentation on "Data Types & Serialization" for
>> details of the effect on performance and schema evolution.
>> ```
>>
>> ```
>> No fields were detected for class java.util.HashSet so it cannot be used
>> as a POJO type and must be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>> ```
>>
>> My question is what do I need to do to get Flink to recognize my classes
>> as POJOs and use the POJO serializer for better performance?
>> I read through the documentation and stackoverflow and the conclusion is
>> that I need to make a TypeInfoFactory and use it inside a TypeInfo
>> annotation over my POJO.
>> While this seems incredibly tedious and I keep thinking "there must be a
>> better way", I would be fine with this solution if I could figure out how
>> to do this for the Set types I'm using.
>>
>> Any help would be appreciated.
>
>
>


404 Jar File Not Found w/ Web Submit Disabled

2023-08-14 Thread patricia lee
Hi,

I disabled web submission (web.submit.enable=false); after that, uploading
jar files via the REST endpoint throws 404. The documentation says:

"Even it is disabled sessions clusters still accept jobs through REST
requests (Http calls). This flag only guards the feature to upload jobs in
the UI"

I also set the io.tmp.dirs to my specified directory.


But I can no longer upload a jar via the REST endpoint.


Regards,
Patricia


Re: Dependency injection framework for flink

2023-08-14 Thread Alexey Novakov via user
I would agree with Ron.

If you have a chance to use Scala, then it is much easier to compose Flink
process functions (or what have you) into a data stream. Simple Functional
Programming power.
Coming from a Java background into the Scala ecosystem some time ago, I was
just surprised that a proper language does not require any DI framework.
Sometimes (in 10% of cases) I used the Scala Macwire library to get DI
behaviour.

Best regards,
Alexey

On Sun, Aug 6, 2023 at 2:19 PM liu ron  wrote:

> Hi, Oscar
>
> IMO, Flink is a big data compute engine whose main goal is to provide
> general-purpose computing power; it is not a back-end service and does not
> solve a specific business problem, so it doesn't need a dependency injection
> framework itself. That's why you didn't find information about it in the Flink
> community. Of course, from the point of view of your needs, because you want
> to use the DataStream or Table API to solve a specific business problem, which
> may have a lot of external dependencies, a dependency injection
> framework will be very useful. Based on what you've heard and my
> experience, Spring should be a good choice.
>
> Best,
> Ron
>
> Oscar Perez via user  于2023年8月1日周二 23:39写道:
>
>> Hi,
>> we are currently migrating some of our jobs to a hexagonal architecture
>> and I have seen that we can use Spring as a dependency injection framework,
>> see:
>>
>>
>> https://getindata.com/blog/writing-flink-jobs-using-spring-dependency-injection-framework/
>>
>> Has anybody analyzed different JVM DI frameworks, e.g. Guice, Micronaut,
>> etc., and their feasibility and performance on Apache Flink?
>>
>> Using Google, I have found some issues with Dagger and Flink, while
>> Guice/Spring seem better suited, but I could not find a study of
>> performance recommendations from the Flink community.
>>
>> Thanks!
>> Oscar
>>
>


Re: Conversion expects insert-only records but DataStream API record contains: UPDATE_BEFORE

2023-08-14 Thread Hang Ruan
Hi,

Changelog mode is a concept of the Table API. You can turn a changelog
stream into a Table with StreamTableEnvironment#fromChangelogStream.
For the Kafka connector, the changelog mode depends on the format used.
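
A minimal, untested sketch (env, tableEnv and the row contents are placeholders):

```java
// Each Row carries its change semantics in the RowKind, so UPDATE_BEFORE /
// UPDATE_AFTER records are accepted here, unlike with fromDataStream.
DataStream<Row> changelog = env.fromElements(
        Row.ofKind(RowKind.INSERT, 1, "test", "123-456-789"),
        Row.ofKind(RowKind.UPDATE_BEFORE, 1, "test", "123-456-789"),
        Row.ofKind(RowKind.UPDATE_AFTER, 1, "test", "987-654-321"));
Table table = tableEnv.fromChangelogStream(changelog);
```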

Best,
Hang

liu ron  wrote on Sun, Aug 13, 2023 at 22:06:

> Hi,
>
> After a deep dive into the source code, I guess you use the
> StreamTableEnvironment#fromDataStream method; this method only supports
> insert-only messages. For your case, I think you should use
> StreamTableEnvironment#fromChangelogStream [1], which supports consuming
> update rows.
>
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317
>
> Best,
> Ron
>
> 完结篇 <2366123...@qq.com> wrote on Sat, Aug 12, 2023 at 02:29:
>
>> Flink:1.15.2
>>
>> I am now going to change the data stream from *DataStream* to
>> *DataStream*
>>
>> Already implemented (*insert only works fine*), but when
>> DataStream contains *update* information
>>
>> The error is:
>> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
>> input conversion. Conversion expects insert-only records but DataStream API
>> record contains: UPDATE_BEFORE*
>> at
>> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
>> at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
>> at
>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
>> at
>> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
>> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
>> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
>> at
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> *KafkaFlink.java, lines 179-180:*
>>
>> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE,
>> beforeObject, rowTypeInfo);
>> collector.collect(before);
>>
>> The before data output is -U[1, test, 123-456-789]
>>
>> I would like to know: how to convert the stream containing *update* data
>> from *DataStream* to *DataStream*
>>
>


Re: Question about serialization of java.util classes

2023-08-14 Thread s
Hi,

According to this blog post 
https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
 

The "Must be processed as GenericType" message means that the POJO serializer 
will not be used and, instead, Kryo will be used.

I created a simple POJO to test it again with a java Collection but I got the 
same message. Disabling generic types throws an exception.

I'm not sure how to use these types along with the POJO serializer or any other 
fast serializer.

Best regards,
Saleh.



> On 14 Aug 2023, at 4:59 AM, liu ron  wrote:
> 
> Hi,
> 
> According to the test in [1], I think Flink can recognize a POJO class which
> contains a java List, so I think you can refer to the related POJO class
> implementation.
> 
> [1] 
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>  
> 
> 
> Best,
> Ron
> 
> s...@sammar.sa wrote on Sun, Aug 13, 2023 at 22:50:
> Greetings,
> 
> I am working on a project that needs to process around 100k events per second 
> and I'm trying to improve performance.
> 
> Most of the classes being used are POJOs but have a couple of fields using a 
> `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This 
> forces Flink to use Kryo and throw these warnings:
> 
> ```
> class java.util.ArrayList does not contain a setter for field size
> Class class java.util.ArrayList cannot be used as a POJO type because not all 
> fields are valid POJO fields, and must be processed as GenericType. Please 
> read the Flink documentation on "Data Types & Serialization" for details of 
> the effect on performance and schema evolution.
> ```
> 
> ```
> No fields were detected for class java.util.HashSet so it cannot be used as a 
> POJO type and must be processed as GenericType. Please read the Flink 
> documentation on "Data Types & Serialization" for details of the effect on 
> performance and schema evolution.
> ```
> 
> My question is what do I need to do to get Flink to recognize my classes as 
> POJOs and use the POJO serializer for better performance?
> I read through the documentation and stackoverflow and the conclusion is that 
> I need to make a TypeInfoFactory and use it inside a TypeInfo annotation over 
> my POJO.
> While this seems incredibly tedious and I keep thinking "there must be a 
> better way", I would be fine with this solution if I could figure out how to 
> do this for the Set types I'm using.
> 
> Any help would be appreciated.