Can Connected Components run on a streaming dataset using delta iteration?

2020-02-17 Thread kant kodali
Hi All,

I am wondering if connected components

can run on streaming data? Or, say, an incremental batch?

I see that with delta iteration not all vertices need to participate in
every iteration, which is great, but in my case the graph is evolving over
time; in other words, new edges are getting added over time. If so, does the
algorithm need to run on the entire graph, or can it simply run on the new
batch of edges?

Finally, what is the best way to compute connected components on graphs
evolving over time? Should I use streaming, batch, or some custom
incremental approach? Also, the documentation takes maxIterations as an
input. How can I come up with a good number for max iterations? And once I
come up with a good number for max iterations, is the algorithm guaranteed
to converge?
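
For reference, the batch (Gelly) version I'm referring to looks roughly like
this; the edge list, the vertex value initialisation and the maxIterations
value are just placeholders:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.ConnectedComponents;

public class StaticConnectedComponents {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder edge list; in practice this would be the (growing) edge set.
        DataSet<Edge<Long, Double>> edges = env.fromElements(
                new Edge<>(1L, 2L, 1.0),
                new Edge<>(2L, 3L, 1.0),
                new Edge<>(4L, 5L, 1.0));

        // Initialise each vertex value with its own id, the usual CC convention.
        Graph<Long, Long, Double> graph = Graph.fromDataSet(
                edges,
                new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long id) { return id; }
                },
                env);

        int maxIterations = 10; // placeholder -- this is the value I don't know how to choose
        DataSet<Vertex<Long, Long>> components =
                graph.run(new ConnectedComponents<Long, Long, Double>(maxIterations));

        components.print();
    }
}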


Thanks,
Kant


Re: Flink 'Job Cluster' mode UI Access

2020-02-17 Thread Jatin Banger
Hi,

Recently I upgraded the Flink version to 1.8.3.
For the session cluster it shows the version correctly,
but for the job cluster I get this in the logs:

*Starting StandaloneJobClusterEntryPoint (Version: , Rev:6322618,
Date:04.09.2019 @ 22:07:41 CST)*

And my Classpath has these jars:

*Classpath:
/opt/flink/lib/flink-metrics-prometheus-1.8.3.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.6.5-7.0.jar:/opt/flink/lib/job.jar:/opt/flink/lib/log4j-over-slf4j-1.7.28.jar:/opt/flink/lib/logback-classic-1.2.3.jar:/opt/flink/lib/logback-core-1.2.3.jar:/opt/flink/lib/flink-dist_2.11-1.8.3.jar::/opt/hdfs:*

Do you have any idea what could have caused this?

Best Regards,
Jatin

On Fri, Dec 13, 2019 at 6:18 PM Chesnay Schepler  wrote:

> Thank you for the logs.
>
> Flink can indeed find the WebUI files in the distribution, which is a bit
> odd.
> Since there are no static files served in this case, the
> StaticFileServerHandler is never set up in the first place (hence why we
> didn't find any log statements).
>
> What I also found in the logs (and, looking back, in one of your earlier
> replies) was this: Version: , Rev:ceba8af, Date:11.02.2019 @
> 22:17:09 CST
>
> This tells us a few things.
> a) You are not using 1.8.1, but 1.7.2 (based on the revision)
> b) You are not using an official release, since the build-date differs
> from the official releases
>
> I tried one of the official 1.7.2 releases, and the WebUI is shown both
> when using:
> (after copying the wordcount example into /lib)
> ./bin/standalone-job.sh start-foreground -j
> org.apache.flink.examples.java.wordcount.WordCount
> ./bin/standalone-job.sh start -j
> org.apache.flink.examples.java.wordcount.WordCount
>
> Right now I don't know what else to look for; there are some discrepancies
> as to what your environment is vs what you described, and as such I can
> only recommend to carefully evaluate what you have actually running and
> possibly try again with an official release.
>
> Regards,
> Chesnay
>
> On 13/12/2019 09:58, Jatin Banger wrote:
>
> Sure, here it is.
> Job Manager Logs with logging level as DEBUG
>
> On Wed, Dec 11, 2019 at 3:14 PM Chesnay Schepler 
> wrote:
>
>> Would it be possible for you to provide us with full debug log file?
>>
>> On 10/12/2019 18:07, Jatin Banger wrote:
>>
>> Yes, I did.
>>
>> On Tue, Dec 10, 2019 at 3:47 PM Arvid Heise  wrote:
>>
>>> Hi Jatin,
>>>
>>> just to be sure. Did you increase the log level to debug [1] before
>>> checking for *StaticFileServerHandler*?
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-log4j
>>>
>>> On Mon, Dec 9, 2019 at 7:54 AM Jatin Banger 
>>> wrote:
>>>
 Hi,

 I have checked the logs for the keyword *StaticFileServerHandler*, but
 there were no such log lines for the Flink job cluster.
 Then I checked the Flink session cluster, and there I was able to find the logs
 with the *StaticFileServerHandler* keyword.

 Can I raise this as a bug?

 Best Regards,
 Jatin


 On Thu, Dec 5, 2019 at 8:59 PM Chesnay Schepler 
 wrote:

> Ok, it's good to know that the WebUI files are there.
>
> Please enable DEBUG logging and try again, searching for messages from
> the StaticFileServerHandler.
>
> This handler logs every file that is requested (which effectively
> happens when the WebUI is being served); let's see what is actually being
> requested.
>
> On 05/12/2019 05:57, Jatin Banger wrote:
>
> I have tried that already using
> "$FLINK_HOME/bin/jobmanager.sh" start-foreground.
> The UI comes up fine with this one,
> which means web/index.html is present.
>
>
> On Wed, Dec 4, 2019 at 9:01 PM Chesnay Schepler 
> wrote:
>
>> hmm...this is quite odd.
>>
>> Let's try to narrow things down a bit.
>>
>> Could you try starting a local cluster (using the same distribution)
>> and checking whether the UI is accessible?
>>
>> Could you also check whether the flink-dist.jar in /lib contains
>> web/index.html?
>> On 04/12/2019 06:02, Jatin Banger wrote:
>>
>> Hi,
>>
>> I am using the Flink binary directly.
>>
>> I am using this command to deploy the script.
>>
>> "$FLINK_HOME/bin/standalone-job.sh"
>> start-foreground --job-classname ${ARGS_FOR_JOB}
>> where ARGS_FOR_JOB contain job class name and all other necessary
>> details needed by the job.
>>
>> Best regards,
>> Jatin
>>
>>
>> On Fri, Nov 29, 2019 at 4:18 PM Chesnay Schepler 
>> wrote:
>>
>>> To clarify, you ran "mvn package -pl flink-dist -am" to build Flink?
>>>
>>> If so, could you run that again and provide us with the maven output?
>>>
>>> On 29/11/2019 11:23, Jatin Banger wrote:
>>>
>>> Hi,
>>>
>>> @vino yang   I am using flink 1.8.1
>>>
>>> I am using the followi

Re: [Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-17 Thread Yang Wang
Hi Maxim,

Both Yarn per-job and session clusters should work, since before the
JobManager and TaskManager are launched, the Yarn NodeManager guarantees
that all the local resources have been localized and are accessible.
If you do not want to use getResource to read the file and instead use the
File interface, you should use "lib/job.properties" as your file path,
because the working directory of the JobManager and TaskManager processes is
the Yarn container workdir.
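
A minimal sketch of the two access paths described above, using the
lib/job.properties example from this thread:

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.Properties;

public class JobPropertiesLoader {

    public static Properties load() throws Exception {
        Properties props = new Properties();

        // Option 1: classpath lookup. The parent directory of the shipped files is
        // on the classpath (see FLINK-13127), so the plain file name resolves.
        URL url = JobPropertiesLoader.class.getClassLoader().getResource("job.properties");
        if (url != null) {
            try (InputStream in = url.openStream()) {
                props.load(in);
                return props;
            }
        }

        // Option 2: plain File access. The working directory of the JobManager /
        // TaskManager process is the Yarn container workdir, so the shipped file
        // is reachable under the relative path "lib/job.properties".
        File file = new File("lib/job.properties");
        if (file.exists()) {
            try (InputStream in = new FileInputStream(file)) {
                props.load(in);
            }
        }
        return props;
    }
}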
If it still does not work, would you mind sharing your code and the TaskManager
logs?


Best,
Yang

Maxim Parkachov  wrote on Tue, Feb 18, 2020 at 12:31 AM:

> Hi Yang,
>
> I've just tried your suggestions, but, unfortunately, in yarn per job mode
> it doesn't work, both commands return null.
> I double checked that file is shipped to yarn container, but I feel that
> it happens later in process.
> At the moment I'm reading file with File interface, instead of getting it
> as resource, which I do in local mode.
>
> Regards,
> Maxim.
>
>
> On Mon, Feb 17, 2020 at 3:03 PM Yang Wang  wrote:
>
>> Hi Maxim,
>>
>> I have verified that the following two ways could both work.
>>
>> getClass().getClassLoader().getResource("lib/job.properties")
>> getClass().getClassLoader().getResource("job.properties")
>>
>>
>> Best,
>> Yang
>>
>> Maxim Parkachov  wrote on Mon, Feb 17, 2020 at 6:47 PM:
>>
>>> Hi Yang,
>>>
>>> thanks, this explains why classpath behavior changed, but now I struggle
>>> to
>>> understand how I could overwrite resource, which is already shipped in
>>> job jar.
>>>
>>> Before I had job.properties files in JAR in under
>>> resources/lib/job.properties
>>> for local development and deploying on cluster it was overwritten
>>> with environment specific settings in  lib/job.properties of flink
>>> distribution.
>>> Now this doesn't seem to work.  I'm using:
>>>
>>> getClass.getClassLoader.getResource("lib/job.properties")
>>>
>>> to get file. Could it be the problem ?
>>>
>>> Thanks,
>>> Maxim.
>>>
>>> On Mon, Feb 17, 2020 at 4:12 AM Yang Wang  wrote:
>>>
 Hi Maxim Parkachov,

 The users files also have been shipped to JobManager and TaskManager.
 However, it
 is not directly added to the classpath. Instead, the parent directory
 is added to the
 classpath. This changes are to make resource classloading work. You
 could check more
 information here[1].


 [1]. https://issues.apache.org/jira/browse/FLINK-13127


 Best,
 Yang

 Maxim Parkachov  wrote on Sat, Feb 15, 2020 at 12:58 AM:

> Hi everyone,
>
> I'm trying to run my job with flink 1.10 with YARN cluster per-job
> mode. In the previous versions all files in lib/ folder were automatically
> included in classpath. Now, with 1.10 I see only *.jar files are included
> in classpath. but not "other" files. Is this deliberate change or bug ?
>
> Generally, what is recommended way to include custom files in
> classpath and ship it during start to all containers ?
>
> Thank
>



Re: job history server

2020-02-17 Thread Richard Moorhead
I did not know that.

I have since wiped the directory. I will post when I see this error again.

On Mon, Feb 17, 2020 at 8:03 PM Benchao Li  wrote:

> `df -H` only gives the sizes, not inodes information. Could you also show
> us the result of `df -iH`?
>
> Richard Moorhead  wrote on Tue, Feb 18, 2020 at 9:40 AM:
>
>> Yes, I did. I mentioned it last but I should have been clearer:
>>
>> 22526:~/ $ df -H
>>
>>
>>  [18:15:20]
>> FilesystemSize  Used Avail Use% Mounted on
>> /dev/mapper/vg00-rootlv00
>>   2.1G  777M  1.2G  41% /
>> tmpfs 2.1G  753M  1.4G  37% /dev/shm
>>
>> On Mon, Feb 17, 2020 at 7:13 PM Benchao Li  wrote:
>>
>>> Hi Richard,
>>>
>>> Have you checked that inodes of the disk partition were full or not?
>>>
>>> Richard Moorhead  wrote on Tue, Feb 18, 2020 at 8:16 AM:
>>>
 I see the following exception often:

 2020-02-17 18:13:26,796 ERROR
 org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
 Failure while fetching/processing job archive for job
 eaf0639027aca1624adaa100bdf1332e.
 java.nio.file.FileSystemException:
 /dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/6abf3ed37d1a5e48f2786b832033f074/subtasks/86/attempts:
 No space left on device
 at
 sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
 at
 sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
 at
 sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
 at
 sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
 at java.nio.file.Files.createDirectory(Files.java:674)
 at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
 at java.nio.file.Files.createDirectories(Files.java:767)
 at
 org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:186)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at
 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)


 Unfortunately the partition listed does not appear to be full or
 anywhere near full?

 Is there a workaround to this?


>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: job history server

2020-02-17 Thread Benchao Li
`df -H` only gives the sizes, not the inode information. Could you also show
us the result of `df -iH`?

Richard Moorhead  wrote on Tue, Feb 18, 2020 at 9:40 AM:

> Yes, I did. I mentioned it last but I should have been clearer:
>
> 22526:~/ $ df -H
>
>
>[18:15:20]
> FilesystemSize  Used Avail Use% Mounted on
> /dev/mapper/vg00-rootlv00
>   2.1G  777M  1.2G  41% /
> tmpfs 2.1G  753M  1.4G  37% /dev/shm
>
> On Mon, Feb 17, 2020 at 7:13 PM Benchao Li  wrote:
>
>> Hi Richard,
>>
>> Have you checked that inodes of the disk partition were full or not?
>>
>> Richard Moorhead  wrote on Tue, Feb 18, 2020 at 8:16 AM:
>>
>>> I see the following exception often:
>>>
>>> 2020-02-17 18:13:26,796 ERROR
>>> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
>>> Failure while fetching/processing job archive for job
>>> eaf0639027aca1624adaa100bdf1332e.
>>> java.nio.file.FileSystemException:
>>> /dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/6abf3ed37d1a5e48f2786b832033f074/subtasks/86/attempts:
>>> No space left on device
>>> at
>>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
>>> at
>>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>> at
>>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>> at
>>> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>>> at java.nio.file.Files.createDirectory(Files.java:674)
>>> at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
>>> at java.nio.file.Files.createDirectories(Files.java:767)
>>> at
>>> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:186)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at
>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> Unfortunately the partition listed does not appear to be full or
>>> anywhere near full?
>>>
>>> Is there a workaround to this?
>>>
>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: job history server

2020-02-17 Thread Richard Moorhead
Yes, I did. I mentioned it last but I should have been clearer:

22526:~/ $ df -H                                                    [18:15:20]
Filesystem                 Size  Used Avail Use% Mounted on
/dev/mapper/vg00-rootlv00
                           2.1G  777M  1.2G  41% /
tmpfs                      2.1G  753M  1.4G  37% /dev/shm

On Mon, Feb 17, 2020 at 7:13 PM Benchao Li  wrote:

> Hi Richard,
>
> Have you checked that inodes of the disk partition were full or not?
>
> Richard Moorhead  wrote on Tue, Feb 18, 2020 at 8:16 AM:
>
>> I see the following exception often:
>>
>> 2020-02-17 18:13:26,796 ERROR
>> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
>> Failure while fetching/processing job archive for job
>> eaf0639027aca1624adaa100bdf1332e.
>> java.nio.file.FileSystemException:
>> /dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/6abf3ed37d1a5e48f2786b832033f074/subtasks/86/attempts:
>> No space left on device
>> at
>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
>> at
>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> at
>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>> at
>> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>> at java.nio.file.Files.createDirectory(Files.java:674)
>> at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
>> at java.nio.file.Files.createDirectories(Files.java:767)
>> at
>> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:186)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at
>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> Unfortunately the partition listed does not appear to be full or anywhere
>> near full?
>>
>> Is there a workaround to this?
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: job history server

2020-02-17 Thread Benchao Li
Hi Richard,

Have you checked whether the inodes of the disk partition are exhausted or not?

Richard Moorhead  wrote on Tue, Feb 18, 2020 at 8:16 AM:

> I see the following exception often:
>
> 2020-02-17 18:13:26,796 ERROR
> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
> Failure while fetching/processing job archive for job
> eaf0639027aca1624adaa100bdf1332e.
> java.nio.file.FileSystemException:
> /dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/6abf3ed37d1a5e48f2786b832033f074/subtasks/86/attempts:
> No space left on device
> at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
> at java.nio.file.Files.createDirectory(Files.java:674)
> at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
> at java.nio.file.Files.createDirectories(Files.java:767)
> at
> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:186)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Unfortunately the partition listed does not appear to be full or anywhere
> near full?
>
> Is there a workaround to this?
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


job history server

2020-02-17 Thread Richard Moorhead
I see the following exception often:

2020-02-17 18:13:26,796 ERROR
org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
Failure while fetching/processing job archive for job
eaf0639027aca1624adaa100bdf1332e.
java.nio.file.FileSystemException:
/dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/6abf3ed37d1a5e48f2786b832033f074/subtasks/86/attempts:
No space left on device
at
sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
at
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at
sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
at java.nio.file.Files.createDirectory(Files.java:674)
at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
at java.nio.file.Files.createDirectories(Files.java:767)
at
org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:186)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Unfortunately, the partition listed does not appear to be full or anywhere
near full.

Is there a workaround for this?


AW: Process stream multiple time with different KeyBy

2020-02-17 Thread theo.diefent...@scoop-software.de
Hi Sebastian,
I'd also highly recommend a recent Flink blog post to you, where exactly this
question was answered in quite some detail:
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
Best regards,
Theo
 Original message 
From: Eduardo Winpenny Tejedor 
Date: Mon, Feb 17, 2020, 21:07
To: Lehuede sebastien 
Cc: user 
Subject: Re: Process stream multiple time with different KeyBy


Hi Sebastien,

Without being entirely sure of what's your use case/end goal I'll tell
you (some of) the options Flink provides you for defining a flow.

If your use case is to apply the same rule to each of your "swimlanes"
of data (one with category=foo AND subcategory=bar, another with
category=foo and another with category=bar) you can do this by
implementing your own org.apache.flink.api.java.functions.KeySelector
function for the keyBy function. You'll just need to return a
different key for each of your rules and the data will separate to the
appropriate "swimlane".

If your use case is to apply different rules to each swimlane then you
can write a ProcessFunction that redirects elements to different *side
outputs*. You can then apply different operations to each side output.

Your application could get tricky to evolve IF the number of swimlanes
or the operators are meant to change over time, you'd have to be
careful how the existing state fits into your new flows.

Regards,
Eduardo

On Mon, Feb 17, 2020 at 7:06 PM Lehuede sebastien  wrote:
>
> Hi all,
>
> I'm currently working on a Flink Application where I match events against a 
> set of rules. At the beginning I wanted to dynamically create streams 
> following the category of events (Event are JSON formatted and I've a field 
> like "category":"foo" in each event) but I'm stuck by the impossibility to 
> create streams at runtime.
>
> So, one of the solution for me is to create a single Kafka topic and then use 
> the "KeyBy" operator to match events with "category":"foo" against rules also 
> containing "category":"foo" in rule specification.
>
> Now I have some cases where events and rules have one category and one 
> subcategory. At this point I'm not sure about the "KeyBy" operator behavior.
>
> Example :
>
> Events have : "category":"foo" AND "subcategory":"bar"
> Rule1 specification has : "category":"foo" AND "subcategory":"bar"
> Rule2 specification has : "category':"foo"
> Rule3 specification has : "category":"bar"
>
> In this case, my events need to be match against Rule1, Rule2 and Rule3.
>
> If I'm right, if I apply a multiple key "KeyBy()" with "category" and 
> "subcategory" fields and then apply two single key "KeyBy()" with "category" 
> field, my events will be consumed by the first "KeyBy()" operator and no 
> events will be streamed in the operators after ?
>
> Is there any way to process the same stream one time for multi key KeyBy() 
> and another time for single key KeyBy() ?
>
> Thanks !
> Sébastien.


Re: Process stream multiple time with different KeyBy

2020-02-17 Thread Eduardo Winpenny Tejedor
Hi Sebastien,

Without being entirely sure of what your use case/end goal is, I'll tell
you (some of) the options Flink provides for defining a flow.

If your use case is to apply the same rule to each of your "swimlanes"
of data (one with category=foo AND subcategory=bar, another with
category=foo and another with category=bar) you can do this by
implementing your own org.apache.flink.api.java.functions.KeySelector
function for the keyBy function. You'll just need to return a
different key for each of your rules and the data will separate to the
appropriate "swimlane".

If your use case is to apply different rules to each swimlane then you
can write a ProcessFunction that redirects elements to different *side
outputs*. You can then apply different operations to each side output.
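
And a rough sketch of the side-output option; the tags and the string matching
are only illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SwimlaneSplitter {

    // Anonymous subclasses so the element type is captured in the tag.
    static final OutputTag<String> FOO_BAR = new OutputTag<String>("foo-bar") {};
    static final OutputTag<String> FOO = new OutputTag<String>("foo") {};

    public static void split(DataStream<String> events) {
        SingleOutputStreamOperator<String> main = events
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String event, Context ctx, Collector<String> out) {
                        // Toy routing on the raw JSON string; real code would inspect parsed fields.
                        if (event.contains("\"category\":\"foo\"") && event.contains("\"subcategory\":\"bar\"")) {
                            ctx.output(FOO_BAR, event);
                        }
                        if (event.contains("\"category\":\"foo\"")) {
                            ctx.output(FOO, event);
                        }
                        out.collect(event); // everything also stays on the main output
                    }
                });

        DataStream<String> fooBarLane = main.getSideOutput(FOO_BAR);
        DataStream<String> fooLane = main.getSideOutput(FOO);
        // ...apply a different rule/operator to each lane here...
    }
}

Note that with the two independent if-checks above, an event with category=foo
and subcategory=bar goes to both side outputs, which is the "match several
rules at once" behaviour from the original question.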

Your application could get tricky to evolve IF the number of swimlanes
or the operators are meant to change over time; you'd have to be
careful about how the existing state fits into your new flows.

Regards,
Eduardo

On Mon, Feb 17, 2020 at 7:06 PM Lehuede sebastien  wrote:
>
> Hi all,
>
> I'm currently working on a Flink Application where I match events against a 
> set of rules. At the beginning I wanted to dynamically create streams 
> following the category of events (Event are JSON formatted and I've a field 
> like "category":"foo" in each event) but I'm stuck by the impossibility to 
> create streams at runtime.
>
> So, one of the solution for me is to create a single Kafka topic and then use 
> the "KeyBy" operator to match events with "category":"foo" against rules also 
> containing "category":"foo" in rule specification.
>
> Now I have some cases where events and rules have one category and one 
> subcategory. At this point I'm not sure about the "KeyBy" operator behavior.
>
> Example :
>
> Events have : "category":"foo" AND "subcategory":"bar"
> Rule1 specification has : "category":"foo" AND "subcategory":"bar"
> Rule2 specification has : "category':"foo"
> Rule3 specification has : "category":"bar"
>
> In this case, my events need to be match against Rule1, Rule2 and Rule3.
>
> If I'm right, if I apply a multiple key "KeyBy()" with "category" and 
> "subcategory" fields and then apply two single key "KeyBy()" with "category" 
> field, my events will be consumed by the first "KeyBy()" operator and no 
> events will be streamed in the operators after ?
>
> Is there any way to process the same stream one time for multi key KeyBy() 
> and another time for single key KeyBy() ?
>
> Thanks !
> Sébastien.


Process stream multiple time with different KeyBy

2020-02-17 Thread Lehuede sebastien
Hi all,

I'm currently working on a Flink application where I match events against a
set of rules. At the beginning I wanted to dynamically create streams
based on the category of the events (events are JSON formatted and I have a field
like "category":"foo" in each event), but I'm stuck by the impossibility of
creating streams at runtime.

So, one of the solutions for me is to create a single Kafka topic and then
use the "KeyBy" operator to match events with "category":"foo" against
rules also containing "category":"foo" in the rule specification.

Now I have some cases where events and rules have one category and one
subcategory. At this point I'm not sure about the "KeyBy" operator behavior.

Example :

   - Events have : "category":"foo" AND "subcategory":"bar"
   - Rule1 specification has : "category":"foo" AND "subcategory":"bar"
   - Rule2 specification has : "category":"foo"
   - Rule3 specification has : "category":"bar"

In this case, my events need to be matched against Rule1, Rule2 and Rule3.

If I'm right, if I apply a multiple-key "KeyBy()" on the "category" and
"subcategory" fields and then apply two single-key "KeyBy()" operators on the
"category" field, my events will be consumed by the first "KeyBy()"
operator and no events will be streamed into the operators after it?

Is there any way to process the same stream once with a multi-key KeyBy()
and another time with a single-key KeyBy()?

Thanks !
Sébastien.


Re: [Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-17 Thread Maxim Parkachov
Hi Yang,

I've just tried your suggestions but, unfortunately, in Yarn per-job mode
it doesn't work; both commands return null.
I double-checked that the file is shipped to the Yarn container, but I feel that
this happens later in the process.
At the moment I'm reading the file via the File interface, instead of getting it
as a resource, which is what I do in local mode.

Regards,
Maxim.


On Mon, Feb 17, 2020 at 3:03 PM Yang Wang  wrote:

> Hi Maxim,
>
> I have verified that the following two ways could both work.
>
> getClass().getClassLoader().getResource("lib/job.properties")
> getClass().getClassLoader().getResource("job.properties")
>
>
> Best,
> Yang
>
> Maxim Parkachov  wrote on Mon, Feb 17, 2020 at 6:47 PM:
>
>> Hi Yang,
>>
>> thanks, this explains why classpath behavior changed, but now I struggle
>> to
>> understand how I could overwrite resource, which is already shipped in
>> job jar.
>>
>> Before I had job.properties files in JAR in under
>> resources/lib/job.properties
>> for local development and deploying on cluster it was overwritten
>> with environment specific settings in  lib/job.properties of flink
>> distribution.
>> Now this doesn't seem to work.  I'm using:
>>
>> getClass.getClassLoader.getResource("lib/job.properties")
>>
>> to get file. Could it be the problem ?
>>
>> Thanks,
>> Maxim.
>>
>> On Mon, Feb 17, 2020 at 4:12 AM Yang Wang  wrote:
>>
>>> Hi Maxim Parkachov,
>>>
>>> The users files also have been shipped to JobManager and TaskManager.
>>> However, it
>>> is not directly added to the classpath. Instead, the parent directory is
>>> added to the
>>> classpath. This changes are to make resource classloading work. You
>>> could check more
>>> information here[1].
>>>
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-13127
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Maxim Parkachov  wrote on Sat, Feb 15, 2020 at 12:58 AM:
>>>
 Hi everyone,

 I'm trying to run my job with flink 1.10 with YARN cluster per-job
 mode. In the previous versions all files in lib/ folder were automatically
 included in classpath. Now, with 1.10 I see only *.jar files are included
 in classpath. but not "other" files. Is this deliberate change or bug ?

 Generally, what is recommended way to include custom files in classpath
 and ship it during start to all containers ?

 Thank

>>>


Re: Batch reading from Cassandra. How to?

2020-02-17 Thread Till Rohrmann
Hi Lasse,

as far as I know, the best way to read from Cassandra is to use the
CassandraInputFormat [1]. Unfortunately, there is no such optimized way to
read a large amount of data as Spark offers it at the moment. But if you
want to contribute this feature to Flink, then the community would highly
appreciate it.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
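
A rough usage sketch of that input format (the contact point, query and tuple
types are placeholders, not tied to your schema):

import com.datastax.driver.core.Cluster;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraBatchRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ClusterBuilder clusterBuilder = new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build();
            }
        };

        // Placeholder query; this reads through CQL, so there is no SSTable-level
        // parallel scan like the Spark connector offers.
        CassandraInputFormat<Tuple2<String, Long>> inputFormat = new CassandraInputFormat<>(
                "SELECT id, value FROM my_keyspace.my_table;", clusterBuilder);

        DataSet<Tuple2<String, Long>> rows = env.createInput(
                inputFormat,
                new TupleTypeInfo<Tuple2<String, Long>>(
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));

        rows.first(10).print();
    }
}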

Cheers,
Till

On Fri, Feb 14, 2020 at 11:04 AM Lasse Nedergaard 
wrote:

> Any good suggestions?
>
> Lasse
>
> On Tue, Feb 11, 2020 at 08:48, Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>>
>> We would like to do some batch analytics on our data set stored in
>> Cassandra and are looking for an efficient way to load data from a single
>> table. Not by key, but a random 15%, 50% or 100% of it.
>> Databricks has created an efficient way to load Cassandra data into
>> Apache Spark, and they do it by reading from the underlying SSTables
>> to load in parallel.
>> Do we have something similar in Flink, or what is the most efficient way
>> to load all, or a large random sample of, the data from a single Cassandra table into Flink?
>>
>> Any suggestions and/or recommendations are highly appreciated.
>>
>> Thanks in advance
>>
>> Lasse Nedergaard
>>
>


Re: Test sink behaviour

2020-02-17 Thread Till Rohrmann
Hi David,

if you want to test the behavior together with S3, then you could check
that S3 contains a file after the job has completed.

If you want to test the failure and retry behaviour, then I would suggest
to introduce an own abstraction for the S3 access which you can control.
That way you can provide a testing implementation which imitates the
described behaviour (first being not available and later being reachable).
That way you should be able to test the behaviour pretty well w/o having to
access metrics.
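
As a rough illustration of that idea (the interface and class names below are
made up, not an existing Flink API):

import java.io.IOException;
import java.io.Serializable;

/** Hypothetical abstraction the sink writes through instead of calling S3 directly. */
interface ObjectStoreClient extends Serializable {
    void put(String key, byte[] payload) throws IOException;
}

/** Test double that fails the first N calls and then succeeds, imitating an S3 outage. */
class FlakyObjectStoreClient implements ObjectStoreClient {
    // In a real MiniCluster test these counters would need to be static or otherwise
    // shared, because the sink (and the client inside it) gets serialized to the task.
    private final int failuresBeforeSuccess;
    private int attempts = 0;
    private int successfulWrites = 0;

    FlakyObjectStoreClient(int failuresBeforeSuccess) {
        this.failuresBeforeSuccess = failuresBeforeSuccess;
    }

    @Override
    public void put(String key, byte[] payload) throws IOException {
        attempts++;
        if (attempts <= failuresBeforeSuccess) {
            throw new IOException("simulated S3 outage, attempt " + attempts);
        }
        successfulWrites++;
    }

    int getAttempts() { return attempts; }
    int getSuccessfulWrites() { return successfulWrites; }
}

The test then builds the sink with the flaky client, runs the job, and asserts
on the recorded attempts and successful writes instead of scraping metrics over
the REST API.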

Cheers,
Till

On Thu, Feb 13, 2020 at 7:41 PM David Magalhães 
wrote:

> Hi, I've created a CustomSink that writes parquet file to S3. Inside the
> `invoke` method I have a loop to check if S3 is down, and if it is it will
> wait exponentially until it is online again.
>
> Now I want to write a test for this, and I can execute everything and see
> that the Sink is doing what it is supposed to do, but I don't have a way to
> validate that it is doing that programmatically (in an integration test).
>
> One of the possibilities I was considering was to check the LazyLogger errors,
> to verify that something was printed, but I can't mock the Logger, since it is
> final. Since I expose the number of errors as a counter, I was trying to
> find a way to access it directly with Scala, but the only way I could find
> was via the REST API, and that is kind of a hack.
>
> Example:
>
> - Get the Rest API port
> with 
> flinkCluster.getClusterClient.getFlinkConfiguration.getInteger("rest.port",
> 0)
> - Get the jobId via http://localhost:61869/jobs/
> - Get the verticeId via
> http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873
> - Get the metric via
> http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873/vertices/0a448493b4782967b150582570326227/metrics/?get=0.Sink__Unnamed.errors_sink
>
> Is there a better way to get the metric or to test this?
>
> Thanks
>


Re: [Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-17 Thread Yang Wang
Hi Maxim,

I have verified that the following two ways could both work.

getClass().getClassLoader().getResource("lib/job.properties")
getClass().getClassLoader().getResource("job.properties")


Best,
Yang

Maxim Parkachov  wrote on Mon, Feb 17, 2020 at 6:47 PM:

> Hi Yang,
>
> thanks, this explains why classpath behavior changed, but now I struggle to
> understand how I could overwrite resource, which is already shipped in job
> jar.
>
> Before I had job.properties files in JAR in under
> resources/lib/job.properties
> for local development and deploying on cluster it was overwritten
> with environment specific settings in  lib/job.properties of flink
> distribution.
> Now this doesn't seem to work.  I'm using:
>
> getClass.getClassLoader.getResource("lib/job.properties")
>
> to get file. Could it be the problem ?
>
> Thanks,
> Maxim.
>
> On Mon, Feb 17, 2020 at 4:12 AM Yang Wang  wrote:
>
>> Hi Maxim Parkachov,
>>
>> The users files also have been shipped to JobManager and TaskManager.
>> However, it
>> is not directly added to the classpath. Instead, the parent directory is
>> added to the
>> classpath. This changes are to make resource classloading work. You could
>> check more
>> information here[1].
>>
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-13127
>>
>>
>> Best,
>> Yang
>>
>> Maxim Parkachov  wrote on Sat, Feb 15, 2020 at 12:58 AM:
>>
>>> Hi everyone,
>>>
>>> I'm trying to run my job with flink 1.10 with YARN cluster per-job mode.
>>> In the previous versions all files in lib/ folder were automatically
>>> included in classpath. Now, with 1.10 I see only *.jar files are included
>>> in classpath. but not "other" files. Is this deliberate change or bug ?
>>>
>>> Generally, what is recommended way to include custom files in classpath
>>> and ship it during start to all containers ?
>>>
>>> Thank
>>>
>>


Parallelize Kafka Deserialization of a single partition?

2020-02-17 Thread Theo Diefenthal
Hi, 

As for most pipelines, our Flink pipeline starts with parsing source Kafka
events into POJOs. We perform this step within a KafkaDeserializationSchema so
that we properly extract the event-time timestamp for the downstream
timestamp assigner.

Now it turns out that parsing is currently the most CPU-intensive task in our
pipeline and thus CPU-bounds the number of elements we can ingest per second.
Further splitting up the partitions would be hard, as we need to maintain the
exact order of events per partition, and it would also require quite some
architectural changes for the producers and the Flink job.

Now I had the idea to put the parsing task into ordered async I/O. But async I/O
can only be plugged into an existing stream, not into the deserialization
schema, as far as I can see. So the best idea I currently have is to keep parsing
in the DeserializationSchema as minimal as possible, only extracting the event
timestamp, and to do the full parsing downstream in async I/O. This, however, seems
to be a bit tedious, especially as we have to deal with multiple input formats
and would need to develop two parsers for the heavy-load ones: a timestamp-only
parser and a full parser.
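
In code, the downstream part of that idea would look roughly like this
(RawEvent, ParsedEvent and the parser are placeholders for our actual formats):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncParsing {

    // Placeholder types: RawEvent carries the payload plus the already-extracted
    // timestamp, ParsedEvent is the fully parsed POJO.
    public static class RawEvent { public byte[] payload; public long timestamp; }
    public static class ParsedEvent { public long timestamp; }

    public static DataStream<ParsedEvent> parse(DataStream<RawEvent> rawEvents) {
        return AsyncDataStream.orderedWait(
                rawEvents,
                new RichAsyncFunction<RawEvent, ParsedEvent>() {
                    @Override
                    public void asyncInvoke(RawEvent raw, ResultFuture<ParsedEvent> resultFuture) {
                        // Run the expensive parsing off the task thread.
                        CompletableFuture
                                .supplyAsync(() -> fullParse(raw))
                                .thenAccept(parsed ->
                                        resultFuture.complete(Collections.singleton(parsed)));
                    }
                },
                30, TimeUnit.SECONDS, // timeout per element
                100);                 // max in-flight elements; orderedWait preserves order
    }

    private static ParsedEvent fullParse(RawEvent raw) {
        ParsedEvent parsed = new ParsedEvent();
        parsed.timestamp = raw.timestamp;
        // ...the heavy JSON/format parsing would go here...
        return parsed;
    }
}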

Do you know if it is somehow possible to parallelize or use async I/O for the parsing
within the KafkaDeserializationSchema? I don't have state access in there, and I
don't have a "collector" object in there, so one input element needs to
produce exactly one output element.

Another question: my parsing produces Java POJO objects via "new", which are
sent downstream (with the object-reuse setting enabled) and will finally be
garbage collected once they reach the sink. Is there some mechanism in Flink by
which I could reuse "old", already-sunk POJOs in the source? All tasks are
chained, so theoretically that could be possible?

Best regards 
Theo 


Re: [Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-17 Thread Maxim Parkachov
Hi Yang,

thanks, this explains why the classpath behavior changed, but now I struggle to
understand how I can overwrite a resource which is already shipped in the job
jar.

Before, I had the job.properties file in the JAR under
resources/lib/job.properties
for local development, and when deploying on the cluster it was overwritten
with environment-specific settings in lib/job.properties of the Flink
distribution.
Now this doesn't seem to work. I'm using:

getClass.getClassLoader.getResource("lib/job.properties")

to get the file. Could that be the problem?

Thanks,
Maxim.

On Mon, Feb 17, 2020 at 4:12 AM Yang Wang  wrote:

> Hi Maxim Parkachov,
>
> The users files also have been shipped to JobManager and TaskManager.
> However, it
> is not directly added to the classpath. Instead, the parent directory is
> added to the
> classpath. This changes are to make resource classloading work. You could
> check more
> information here[1].
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-13127
>
>
> Best,
> Yang
>
> Maxim Parkachov  wrote on Sat, Feb 15, 2020 at 12:58 AM:
>
>> Hi everyone,
>>
>> I'm trying to run my job with flink 1.10 with YARN cluster per-job mode.
>> In the previous versions all files in lib/ folder were automatically
>> included in classpath. Now, with 1.10 I see only *.jar files are included
>> in classpath. but not "other" files. Is this deliberate change or bug ?
>>
>> Generally, what is recommended way to include custom files in classpath
>> and ship it during start to all containers ?
>>
>> Thank
>>
>


Flink's Either type information

2020-02-17 Thread jacopo.gobbi
Hi all,

How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on 
Either type as it does not contain information about the 'left' type." when 
doing: out.collect(Either.Right(myObject));
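
For reference, a boiled-down version of the function (class names simplified;
the real types are our own POJOs):

import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

class MyObject {}
class MyEvent {}
class MyRule {}

public class EitherEmittingFunction
        extends KeyedBroadcastProcessFunction<String, MyEvent, MyRule, Either<String, MyObject>> {

    @Override
    public void processElement(MyEvent value, ReadOnlyContext ctx,
                               Collector<Either<String, MyObject>> out) {
        // Only the 'right' side is ever produced here, which seems to be why the
        // type extractor complains that it has no information about the 'left' type.
        out.collect(Either.Right(new MyObject()));
    }

    @Override
    public void processBroadcastElement(MyRule rule, Context ctx,
                                        Collector<Either<String, MyObject>> out) {
        // no-op in this minimal example
    }
}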

Thanks,

Jacopo Gobbi


Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-02-17 Thread Piotr Nowojski
Hey, sorry but I know very little about the KafkaConsumer. I hope that someone 
else might know more.

However, did you try to google this issue? It doesn't sound like a Flink-specific
problem, but like a general Kafka issue. Also, a solution might be just as
simple as bumping the limit of open files on the Unix system (the ulimit command,
if I remember correctly?).

Piotrek

> On 14 Feb 2020, at 23:35, John Smith  wrote:
> 
> Hi Piotr, any thoughts on this?
> 
> On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas,  > wrote:
> Hi John,
> 
> As you suggested, I would also lean towards increasing the number of
> allowed open handles, but
> for recommendation on best practices, I am cc'ing Piotr who may be
> more familiar with the Kafka consumer.
> 
> Cheers,
> Kostas
> 
> On Tue, Feb 11, 2020 at 9:43 PM John Smith  > wrote:
> >
> > Just wondering is this on the client side in the flink Job? I rebooted the 
> > task and the job deployed correctly on another node.
> >
> > Is there a specific ulimit that we should set for flink tasks nodes?
> >
> > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> > at 
> > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:799)
> > at 
> > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:650)
> > at 
> > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630)
> > at 
> > org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> > at 
> > org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> > at 
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
> > at 
> > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > at 
> > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too 
> > many open files
> > at org.apache.kafka.common.network.Selector.(Selector.java:154)
> > at org.apache.kafka.common.network.Selector.(Selector.java:188)
> > at org.apache.kafka.common.network.Selector.(Selector.java:192)
> > at 
> > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:722)
> > ... 11 more
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.(EPollSelectorImpl.java:65)
> > at sun.nio.ch 
> > .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at org.apache.kafka.common.network.Selector.(Selector.java:152)
> > ... 14 more



Re: FlinkCEP questions - architecture

2020-02-17 Thread Kostas Kloudas
Hi Juergen,

I will reply to your questions inline. As a general comment I would
suggest to also have a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY changes from the day. Most tables have
modify-cols. Even though they are files but because they contain
changes only, I belief the file records shall be considered events in
Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronological order,
and we cannot change the export. Our use case is a "complex event
processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
A, then B, then C within 30 days, then do something". Does that work
with FlinkCEP even though the events/records are not in chronological order
within the file? The files are 100MB to 20GB in size. Do I need to
sort the files first before CEP processing?

-> Flink CEP also works in event time and the re-ordering can be done by Flink
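
As a small illustration, a rule like "if first A, then B, then C within 30
days" could be expressed roughly like this (the Event type and its fields are
placeholders):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AbcPattern {

    // Placeholder event type with the fields used below.
    public static class Event {
        public String key;
        public String type;
    }

    public static PatternStream<Event> define(DataStream<Event> events) {
        Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return "A".equals(e.type); }
                })
                .followedBy("B")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return "B".equals(e.type); }
                })
                .followedBy("C")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) { return "C".equals(e.type); }
                })
                .within(Time.days(30)); // the rule's 30-day window

        return CEP.pattern(
                events.keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event e) { return e.key; }
                }),
                pattern);
    }
}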

3) Occassionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for that respective day (e.g. last weeks Tuesday).
Consequently we receive a correction file. Same filename but "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we did already). Regular checkpoints and
re-process all files since then?  What happens to the CEP state? Will
it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.

4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.

5) We also have CEP rules that must fire if after a start sequence
matched, the remaining sequence did NOT within a configured window.
E.g. If A, then B, but C did not occur within 30 days since A. Is that
supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formula
considering number of rules, number of records per file or day, record
size, window, number of records matched per sequence, number of keyBy
grouping keys, ...

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with its
own state. This can become heavy, but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes has to keep also in state more records than will be
presented at the end.

Have you considered going with a solution based on a ProcessFunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime and it will allow you to
do any optimizations specific to your workload ;) For a discussion on
this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator thus potentially minimizing the amount
of copies of the state you keep.
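
A very rough skeleton of that approach (Event, Rule and Alert are placeholders
and the actual matching logic is omitted):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class RuleEngine {

    // Placeholder types.
    public static class Event { public String key; }
    public static class Rule { public String id; }
    public static class Alert { }

    public static DataStream<Alert> apply(DataStream<Event> events, DataStream<Rule> rules) {
        final MapStateDescriptor<String, Rule> ruleDescriptor = new MapStateDescriptor<>(
                "rules", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Rule.class));

        BroadcastStream<Rule> broadcastRules = rules.broadcast(ruleDescriptor);

        return events
                .keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event e) { return e.key; }
                })
                .connect(broadcastRules)
                .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Alert>() {
                    @Override
                    public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out)
                            throws Exception {
                        // Evaluate every currently known rule against this key's state.
                        for (Map.Entry<String, Rule> entry :
                                ctx.getBroadcastState(ruleDescriptor).immutableEntries()) {
                            // ...matching logic and out.collect(new Alert()) would go here...
                        }
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)
                            throws Exception {
                        // Every parallel instance receives the rule and stores/updates it.
                        ctx.getBroadcastState(ruleDescriptor).put(rule.id, rule);
                    }
                });
    }
}

Every parallel instance sees the broadcast rules, so new or updated rules can
take effect without restarting the job.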

7) I can imagine that for debugging reasons it'd be good if we were
able to query the temporary CEP state. What is the (CEP) schema used
to persist the CEP state and how can we query it? And does such query
work on the whole cluster or only per node (e.g. because of shuffle
and nodes responsible only for a portion of the events).

-> Unfortunately the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove a nodes. Will the state still be found, despite it being
stored in another node? I read that I need to be equally careful when
changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a
savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a savepo