Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-20 Thread Klemens Muthmann
Hi,

I guess this is more of a Java problem than a Flink problem. If you want it 
quick and dirty, you could implement a class such as:

public class Value {
    private boolean isLongSet = false;
    private long longValue = 0L;
    private boolean isIntegerSet = false;
    private int intValue = 0;

    public Value(final long value) {
        setLong(value);
    }

    public void setLong(final long value) {
        longValue = value;
        isLongSet = true;
    }

    public long getLong() {
        if (isLongSet) {
            return longValue;
        }
        throw new IllegalStateException("No long value has been set!");
    }

    // Add the same constructor and methods for int.
    // To satisfy the POJO requirements you will also need to add a no-argument
    // constructor as well as getters and setters for the boolean flags.
}
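
A minimal sketch of how this POJO could then back the keyed state (all names 
are illustrative, and it assumes the class above has been completed with the 
no-argument constructor and the flag accessors mentioned in the comments):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class NumberStoringFunction
        extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Value> state;

    @Override
    public void open(final Configuration parameters) {
        // Since Value is a proper POJO, Flink serializes it with the POJO
        // serializer instead of falling back to a GenericType.
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("number-state", Value.class));
    }

    @Override
    public void processElement(
            final Long input,
            final Context ctx,
            final Collector<Long> out) throws Exception {
        Value current = state.value();
        if (current == null) {
            current = new Value(input);
        } else {
            current.setLong(input);
        }
        state.update(current);
        out.collect(current.getLong());
    }
}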

I guess a cleaner solution would be possible using a custom Kryo serializer as 
explained here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
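
For illustration, such a serializer could look roughly like the following. This 
is only a sketch under the assumption that the state only ever holds Integer or 
Double values; the class name and the tagging scheme are made up here, not 
taken from the linked documentation:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class NumberKryoSerializer extends Serializer<Number> {

    @Override
    public void write(final Kryo kryo, final Output output, final Number value) {
        // Write a one-byte type tag followed by the raw value.
        if (value instanceof Integer) {
            output.writeByte(0);
            output.writeInt(value.intValue());
        } else {
            output.writeByte(1);
            output.writeDouble(value.doubleValue());
        }
    }

    @Override
    public Number read(final Kryo kryo, final Input input, final Class<Number> type) {
        final byte tag = input.readByte();
        if (tag == 0) {
            return input.readInt();
        }
        return input.readDouble();
    }
}

It would then be registered on the ExecutionConfig, for example via 
env.getConfig().addDefaultKryoSerializer(Number.class, NumberKryoSerializer.class); 
the registration details are covered by the documentation linked above.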

Regards
  Klemens



> On 20.04.2021, at 10:34, Miguel Araújo wrote:
> 
> Hi everyone,
> 
> I have a ProcessFunction which needs to store different number types for 
> different keys, e.g., some keys need to store an integer while others need to 
> store a double.
> 
> I tried to use java.lang.Number as the type for the ValueState, but I got the 
> expected "No fields were detected for class java.lang.Number so it cannot be 
> used as a POJO type and must be processed as GenericType." 
> 
> I have the feeling that this is not the right approach, but the exact type to 
> be stored is only known at runtime which makes things a bit trickier. Is 
> there a way to register these classes correctly, or is it preferable to use 
> different ValueStates for different types?
> 
> Thanks,
> Miguel



Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-15 Thread Klemens Muthmann
Hi,

Since kindergarten time is shortened due to the pandemic, I only get four hours 
of work into each day and I am supposed to do eight. So unfortunately I will 
not be able to develop a fix at the moment. -.- I am happy to provide any debug 
log you need, or to test adaptations and provide fixes as pull requests. But I 
will sadly have no time to do any research into the problem. :( So for now I 
guess I will be using one of our Linux servers to test the Flink pipelines 
until Apple Silicon is supported.

Nevertheless, thanks for your answer. If there is anything I can provide you to 
narrow down the problem, I am happy to help.

Regards
 Klemens

> On 15.04.2021, at 20:59, Robert Metzger wrote:
> 
> Hey Klemens,
> 
> I'm sorry that you are running into this. Looks like you are the first (of 
> probably many people) who uses Flink on an M1 chip.
> 
> If you are up for it, we would really appreciate a fix for this issue, as a 
> contribution to Flink.
> Maybe you can distill the problem into an integration test, so that you can 
> look into fixes right from your IDE. (It seems that the RestClient is causing 
> the problems. The client is used by the command line interface to upload the 
> job to the cluster (that's not happening when executing the job from the IDE))
> My first guess is that a newer netty version might be required? Or maybe 
> there's some DEBUG log output that's helpful in understanding the issue?
> 
> 
> 
> 
> On Tue, Apr 13, 2021 at 5:34 PM Klemens Muthmann <klemens.muthm...@cyface.de> wrote:
> Hi,
> 
> I've just tried to run the basic example for Apache Flink 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html>
>  on an Apple Mac Pro with the new M1 Processor. I only need this for 
> development purposes. The actual thing is going to run on a Linux server 
> later on, so I would not mind if it only runs using the Rosetta compatibility 
> layer. Unfortunately, it failed with the following stack trace:
> 
> flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by 
> org.apache.flink.api.java.ClosureCleaner 
> (file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar)
>  to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> 
> 
>  The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute job 'Streaming WordCount'.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'Streaming WordCount'.
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at 
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> 

Re: JSON source for pyflink stream

2021-04-14 Thread Klemens Muthmann
Hi,

We are loading our JSON from a Mongo database, but we also found no readily 
available way to stream JSON data into a Flink pipeline. I guess this would be 
hard to provide generically, since you have to know details about the JSON 
structure. So I guess your best bet would be to implement your own input 
source, which can stream in your file and create results based on the JSON 
structure. We are not using PyFlink, so I cannot give any details about this, 
but it should not matter which language you use.

Just implement a Source that reads your input and employ any JSON parser you 
like, creating, for example, domain objects with the same attributes as your 
JSON structure, and forward those into your Flink pipeline for further 
processing.
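
A minimal sketch of such a source (assumptions, not from this thread: Jackson 
is used for parsing, the file holds one JSON document per line, and the 
hypothetical Tweet class below mirrors only a small part of the JSON from the 
original question):

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JsonFileSource implements SourceFunction<JsonFileSource.Tweet> {

    /** Hypothetical domain object mirroring a small part of the tweet JSON. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Tweet {
        public Data data;

        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class Data {
            public String text;
            public String lang;
        }
    }

    private final String path;
    private volatile boolean running = true;

    public JsonFileSource(final String path) {
        this.path = path;
    }

    @Override
    public void run(final SourceContext<Tweet> ctx) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                // Parse one JSON document per line and forward the domain object.
                ctx.collect(mapper.readValue(line, Tweet.class));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}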

Regards
Klemens

> On 14.04.2021, at 04:40, Yik San Chan wrote:
> 
> Hi Giacomo,
> 
> I think you can try using a Flink SQL connector. For JSON input such as 
> {"a": 1, "b": {"c": 2, "d": 3}}, you can do:
> 
> CREATE TABLE data (
>   a INT,
>   b ROW<c INT, d INT>
> ) WITH (...)
> 
> Let me know if that helps.
> 
> Best,
> Yik San
> 
> On Wed, Apr 14, 2021 at 2:00 AM wrote:
> Hi,
> I'm new to Flink and I am trying to create a stream from locally downloaded 
> tweets. The tweets are in json format, like in this example:
>  
> {"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC 
> ","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
> "author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter
>  for Android","lang":"in"},
> "includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan
>  Pareda","created_at":"2021-03-05T14:07:56.000Z",
> "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
> "username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
>  
> I would like to do it in Python using Pyflink, but could also use Java if 
> there is no reasonable way to do it in Python. I've been looking at different 
> options for loading these objects into a stream, but am not sure what to do. 
> Here's my situation so far:
>  
> 1. There doesn't seem to be a fitting connector. The filesystem connector 
> doesn't seem to support the JSON format.
> 2. I've seen in the archive of this mailing list that some recommend using 
> the Table API. But I am not sure if this is a viable option given how nested 
> the JSON objects are.
> 3. I could of course try to implement a custom DataSource, but that seems to 
> be quite difficult, so I'd only consider this if there's no other way.
> 
> I'll be very grateful for any kind of input.
> Cheers,
> Giacomo
>  



Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-13 Thread Klemens Muthmann
Did anyone get this to run? Is there any chance of executing 
Flink jobs on Apple Silicon?

I’ve posted the same question on StackOverflow 
<https://stackoverflow.com/questions/67071254/running-apache-flink-1-12-jobs-on-apple-m1-silicon>.
So if you’d like some points there, you are free to head over and post a reply. ;)

Thanks and Regards
 Klemens Muthmann





Re: Question: How to avoid local execution being terminated before session window closes

2020-12-03 Thread Klemens Muthmann

Hi,

Thanks for the hint. The infinite loop was the solution and my pipeline 
works now.
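
For reference, a minimal sketch of what such a source boils down to (the 
String record type and the sample values are illustrative, not from the 
actual pipeline): it emits a finite data set and then idles instead of 
returning, so the job keeps running.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BlockingFiniteSource implements SourceFunction<String> {

    private final List<String> records = Arrays.asList("a", "b", "c");
    private volatile boolean running = true;

    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        // Emit the finite input first.
        for (final String record : records) {
            ctx.collect(record);
        }
        // Then idle instead of returning, so the job does not terminate
        // before the processing-time session window can close and fire.
        while (running) {
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}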


Regards

    Klemens

On 24.11.20 at 16:59, Timo Walther wrote:
For debugging you can also implement a simple non-parallel source 
using 
`org.apache.flink.streaming.api.functions.source.SourceFunction`. You 
would need to implement the run() method with an endless loop after 
emitting all your records.


Regards,
Timo

On 24.11.20 16:07, Klemens Muthmann wrote:

Hi,

Thanks for your reply. I am using processing time instead of event 
time, since we do get the events in batches and some might arrive 
days later.


But for my current dev setup I just use a CSV dump of finite size as 
input. I will hand over the pipeline to some other guys, who will 
need to integrate it with an Apache Kafka service. Output is written 
to a Postgres database system.


I'll have a look at your proposal and let you know if it worked, 
after having finished a few prerequisite parts.


Regards

 Klemens

On 24.11.20 at 12:59, Timo Walther wrote:

Hi Klemens,

what you are observing are reasons why event-time should be 
preferred over processing-time. Event-time uses the timestamp of 
your data while processing-time is too basic for many use cases. Especially 
when you want to reprocess historic data, you want to do that at 
full speed instead of waiting 1 hour for 1-hour-windows.


If you want to use processing-time nevertheless, you need to use a 
source that produces unbounded streams instead of bounded streams 
such that the pipeline execution theoretically is infinite. Some 
documentation can be found here [1] where you need to use the 
`FileProcessingMode.PROCESS_CONTINUOUSLY`. But what kind of 
connector are you currently using?


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources 
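
For reference, a minimal sketch of that continuous file monitoring (the input 
path, the plain-text format and the 10-second polling interval are 
assumptions, not from this thread):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousCsvRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Re-scan the input path every 10 seconds; the resulting stream is
        // unbounded, so the job keeps running and windows can still fire.
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path("/path/to/input.csv")),
                "/path/to/input.csv",
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L);

        lines.print();
        env.execute("Continuous CSV read");
    }
}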



On 24.11.20 09:59, Klemens Muthmann wrote:

Hi,

I have written an Apache Flink Pipeline containing the following 
piece of code (Java):


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new 
CustomAggregator()).print();


If I run the pipeline using local execution I see the following 
behavior. The "CustomAggregator" calls the `createAccumulator` and 
`add` methods correctly with the correct data. However it never 
calls `getResult` and my pipeline simply finishes.


So I did a little research and found out that it works if I change 
the code to:


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new 
CustomAggregator()).print();


Notice the reduced gap time for the processing time session window. 
So it seems that execution only continues if the window has been 
closed and if that takes too long, the execution simply aborts. I 
guess another factor playing a part in the problem is that my 
initial data is read in much faster than 50 seconds. This results 
in the pipeline being in a state where it only waits for the window 
to be closed and, having nothing else to do, decides that there is 
no work left and simply shuts down.


My question now is if it is possible to tell the local execution 
environment to wait for that window to be closed, instead of just 
shutting down.


Thanks and Regards

 Klemens Muthmann






--
Kind regards
  Dr.-Ing. Klemens Muthmann

---
Cyface GmbH
Hertha-Lindner-Straße 10
01067 Dresden
web: www.cyface.de
email: klemens.muthm...@cyface.de



Re: Question: How to avoid local execution being terminated before session window closes

2020-11-24 Thread Klemens Muthmann

Hi,

Thanks for your reply. I am using processing time instead of event time, 
since we do get the events in batches and some might arrive days later.


But for my current dev setup I just use a CSV dump of finite size as 
input. I will hand over the pipeline to some other guys, who will need 
to integrate it with an Apache Kafka service. Output is written to a 
Postgres database system.


I'll have a look at your proposal and let you know if it worked, after 
having finished a few prerequisite parts.


Regards

    Klemens

On 24.11.20 at 12:59, Timo Walther wrote:

Hi Klemens,

what you are observing are reasons why event-time should be preferred 
over processing-time. Event-time uses the timestamp of your data while 
processing-time is too basic for many use cases. Especially when you want to 
reprocess historic data, you want to do that at full speed instead of 
waiting 1 hour for 1-hour-windows.


If you want to use processing-time nevertheless, you need to use a 
source that produces unbounded streams instead of bounded streams such 
that the pipeline execution theoretically is infinite. Some 
documentation can be found here [1] where you need to use the 
`FileProcessingMode.PROCESS_CONTINUOUSLY`. But what kind of connector 
are you currently using?


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources


On 24.11.20 09:59, Klemens Muthmann wrote:

Hi,

I have written an Apache Flink Pipeline containing the following 
piece of code (Java):


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new 
CustomAggregator()).print();


If I run the pipeline using local execution I see the following 
behavior. The "CustomAggregator" calls the `createAccumulator` and 
`add` methods correctly with the correct data. However it never calls 
`getResult` and my pipeline simply finishes.


So I did a little research and found out that it works if I change 
the code to:


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new 
CustomAggregator()).print();


Notice the reduced gap time for the processing time session window. 
So it seems that execution only continues if the window has been 
closed and if that takes too long, the execution simply aborts. I 
guess another factor playing a part in the problem is that my 
initial data is read in much faster than 50 seconds. This results in 
the pipeline being in a state where it only waits for the window to 
be closed and, having nothing else to do, decides that there is no 
work left and simply shuts down.


My question now is if it is possible to tell the local execution 
environment to wait for that window to be closed, instead of just 
shutting down.


Thanks and Regards

 Klemens Muthmann




--
Kind regards
  Dr.-Ing. Klemens Muthmann

---
Cyface GmbH
Hertha-Lindner-Straße 10
01067 Dresden
web: www.cyface.de
email: klemens.muthm...@cyface.de



Question: How to avoid local execution being terminated before session window closes

2020-11-24 Thread Klemens Muthmann

Hi,

I have written an Apache Flink Pipeline containing the following piece 
of code (Java):


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new 
CustomAggregator()).print();


If I run the pipeline using local execution I see the following 
behavior. The "CustomAggregator" calls the `createAccumulator` and `add` 
methods correctly with the correct data. However it never calls 
`getResult` and my pipeline simply finishes.


So I did a little research and found out that it works if I change the 
code to:


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new 
CustomAggregator()).print();


Notice the reduced gap time for the processing time session window. So 
it seems that execution only continues if the window has been closed and 
if that takes too long, the execution simply aborts. I guess another 
factor playing a part in the problem is that my initial data is read in 
much faster than 50 seconds. This results in the pipeline being in a 
state where it only waits for the window to be closed and, having nothing 
else to do, decides that there is no work left and simply shuts down.


My question now is if it is possible to tell the local execution 
environment to wait for that window to be closed, instead of just 
shutting down.


Thanks and Regards

    Klemens Muthmann