Re: Issue with HybridSource recovering from Savepoint

2022-05-09 Thread Arvid Heise
I'm not sure why recovery from a savepoint would be different from a
checkpoint, but if you're looking for a savepoint test case, PTAL at [1].

I rather think you found some edge case in your recovery setup. Changed
degree of parallelism certainly sounds like the most likely option. Or did
you upgrade versions while recovering from a savepoint?

[1]
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L393-L406

On Mon, May 9, 2022 at 1:42 AM Thomas Weise  wrote:

> One more question: Are you changing the parallelism when resuming from
> savepoint?
>
> On Sun, May 8, 2022 at 4:05 PM Thomas Weise  wrote:
> >
> > Hi Kevin,
> >
> > Unfortunately I did not find a way to test the savepoint scenario with
> > the MiniCluster. Savepoints are not supported in the embedded mode.
> > There is a way to hack around that, but then the state of the
> > enumerator won't be handled.
> >
> > As for your original issue, is it reproducible consistently? Can you
> > capture the debug log of the enumerator?
> >
> > Thanks,
> > Thomas
> >
> > On Wed, May 4, 2022 at 10:05 AM Martijn Visser 
> wrote:
> > >
> > > Hi Kevin,
> > >
> > > I'm hoping that @Thomas Weise could help with the issue regarding the
> recovery from the savepoint.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Wed, 4 May 2022 at 17:05, Kevin Lam  wrote:
> > >>
> > >> Following up on this, is there a good way to debug restoring from
> savepoints locally? We currently have a set-up where we use IntelliJ to run
> and test our pipelines locally, but would like an API to be able to specify
> the savepoint to restore from, without needing to spin up a full cluster.
> > >>
> > >> In intelliJ we just use the build and run functionality, and don't
> have access to the Flink CLI.
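
For what it's worth, a rough, untested sketch of one way this is sometimes attempted
from the IDE (an assumption, not a confirmed recipe, and note Thomas's caveat above
that the enumerator state of a HybridSource may not be handled this way): pass
`execution.savepoint.path` via the `Configuration` used to create a local environment.

```
// Sketch only: restore a job from a savepoint when running from the IDE by
// setting execution.savepoint.path on a local environment's Configuration.
// The savepoint path and the pipeline below are placeholders.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSavepointRestore {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", "file:///tmp/savepoints/savepoint-xxxxxx");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        env.fromElements(1, 2, 3).print(); // replace with the real pipeline
        env.execute("local-savepoint-restore");
    }
}
```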
> > >>
> > >> On Tue, May 3, 2022 at 2:48 PM Kevin Lam 
> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> We're encountering an error using a HybridSource that is composed of
> a FileSource + KafkaSource, only when recovering from a savepoint [0]. This
> HybridSource is used to read from a Kafka topic's archives hosted on GCS
> via a bounded FileSource, and then automatically switch over to the data
> stream from the Kafka associated topic.
> > >>>
> > >>> Has anyone seen this error before?
> > >>>
> > >>> [0]
> > >>> ```
> > >>> 2022-05-03 09:47:57
> > >>> org.apache.flink.util.FlinkException: Global failure triggered by
> OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator
> afb3208349a953c47059c1994f800aa2).
> > >>> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> > >>> at
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
> > >>> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > >>> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > >>> at java.base/java.lang.Thread.run(Unknown Source)
> > >>> Caused by: java.lang.NullPointerException: Source for index=0 not
> available
> > >>> at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> > >>> at
> org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
> > >>> at
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
> > >>> at
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
> > >>> ... 3 more
> > >>> ```
>


Re: Practical guidance with Scala and Flink >= 1.15

2022-05-09 Thread Robert Metzger
Hi Salva,
my somewhat wild guess (because I'm not very involved with the Scala
development on Flink): I would stick with option 1 for now. It should be
easier now for the Flink community to support Scala versions past 2.12
(because we don't need to worry about scala 2.12+ support for Flink's
internal dependencies such as akka).
An argument against supporting newer Scala versions is that I'm not aware
of anybody currently working on Flink with Scala in general.

On Fri, May 6, 2022 at 6:37 PM Salva Alcántara 
wrote:

> I've always used Scala in the context of Flink. Now that Flink 1.15 has
> become Scala-free, I wonder what is the best (most practical) route for me
> moving forward. These are my options:
>
> 1. Keep using Scala 2.12 for the years to come (and upgrade to newer
> versions when the community has come up with something). How long is Flink
> expected to support Scala 2.12?
>
> 2. Upgrade to Scala 2.13 or Scala 3 and use the Java API directly (without
> any Scala-specific wrapper/API). How problematic will that be, especially
> regarding type information & scala-specific serializers? I hate those
> "returns" (type hints) in the Java API...
>
> 3. Switch to Java, at least for the time being...
>
> To be clear, I have a strong preference for Scala over Java, but I'm
> trying to look at the "grand scheme of things" here, and be pragmatic. I
> guess I'm not alone here, and that many people are indeed evaluating the
> same pros & cons. Any feedback will be much appreciated.
>
> Thanks in advance!
>


Re: Flink serialization errors at a batch job

2022-05-09 Thread Robert Metzger
Hi,

I suspect that this error is not caused by Flink code (because our
serializer stack is fairly stable, there would be more users reporting such
issues if it was a bug in Flink).
In my experience, these issues are caused by broken serializer
implementations (e.g. a serializer being used by multiple threads causing
issues, or a serializer somehow not being deterministic).

Maybe there's a bug in the "com.spotify.scio.coders.*" code? Have you
checked if these errors are known there?

On Tue, May 3, 2022 at 11:31 PM Yunus Olgun  wrote:

> Hi,
>
> We're running a large Flink batch job and sometimes it throws
> serialization errors in the middle of the job. It is always the same
> operator but the error can be different. Then the following attempts work.
> Or sometimes the attempts get exhausted and we end up retrying the whole job.
>
> The job is basically reading a list of filenames, downloading them from
> GCS, doing a groupBy-reduce and then writing the result. The error happens at the
> reducing operator.
>
> We use Flink 1.13.6 and Beam 2.35.
>
> 1 - Do you know what may be going wrong here or how to debug it further?
> 2 - Attempts require reading all data again. Is there any way to speed up
> the recovery in cases like this?
>
> Thanks,
>
> >> Example stacktrace 1:
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at groupByKey@{xxx}' , caused an error: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.WrappingRuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>   ... 4 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error obtaining the sorted input: Thread 
> 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown 
> Source)
>   at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
>   ... 7 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown
>  Source)
>   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
>  Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
>   at 
> org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(Thr

Re:WaterMark that defined in DDL does not work

2022-05-09 Thread Xuyang
Hi, Huang. I tested the SQL with the 'datagen' connector, and the watermark shows up in
the web UI. You can change "WATERMARK FOR createtime AS createtime - INTERVAL
'5' SECOND" to "WATERMARK FOR createtime AS createtime" and ensure all subtasks
contain data for testing.
At 2022-05-07 16:41:36, "JianWen Huang"  wrote:
>Flink version : 1.15.0
>Flink sql :
>create table custom_kafka(
>name STRING,
>money BIGINT,
>status STRING,
>createtime TIMESTAMP(3),
>operation_ts TIMESTAMP_LTZ(3),
>WATERMARK FOR createtime AS createtime - INTERVAL '5' SECOND
>)WITH(
>'connector' = 'kafka',
>'topic' = 'flink.cdc_test',
>'scan.startup.mode' = 'earliest-offset',
>'properties.bootstrap.servers' = '*.70:9092,*.71:9092,*.72:9092',
>'format' = 'debezium-json'
>)
>
>create view  custom_day_sum  AS
>SELECT DATE_FORMAT(TUMBLE_START(createtime,INTERVAL '10'
>MINUTES),'yyyy-MM-dd') as
>date_str,SUBSTR(DATE_FORMAT(TUMBLE_END(createtime,INTERVAL '10'
>MINUTES),'HH:mm'),1,4) || '0' as time_str,sum(money) as total ,name
>FROM  custom_kafka
>where status='1'
>GROUP BY name,TUMBLE(createtime,INTERVAL '10' MINUTES)
>
>CREATE TABLE print_table (
>date_str STRING,
>time_str STRING,
>total BIGINT,
>name STRING
>)WITH(
>'connector'='print'
>)
>
>INSERT INTO print_table
>SELECT date_str,time_str,total,name
>FROM custom_day_sum
>
>Error:
>No watermark is shown in the web UI.


Re: Practical guidance with Scala and Flink >= 1.15

2022-05-09 Thread Martijn Visser
Hi Salva,

Like Robert said, I don't expect that we will be able to drop support for
Scala 2.12 anytime soon. I do think that we should have a discussion in the
Flink community about providing Scala APIs. My opinion is that we are
probably better off deprecating the current Scala APIs (keeping them
internal, as we still have a big piece of Scala internally) and only offer
Java APIs. The Flink community lacks real Scala maintainers. I think Seth's
blog is pretty spot-on on this too [1].

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://flink.apache.org/2022/02/22/scala-free.html

On Mon, 9 May 2022 at 12:24, Robert Metzger  wrote:

> Hi Salva,
> my somewhat wild guess (because I'm not very involved with the Scala
> development on Flink): I would stick with option 1 for now. It should be
> easier now for the Flink community to support Scala versions past 2.12
> (because we don't need to worry about scala 2.12+ support for Flink's
> internal dependencies such as akka).
> An argument against supporting newer Scala versions is that I'm not aware
> of anybody currently working on Flink with Scala in general.
>
> On Fri, May 6, 2022 at 6:37 PM Salva Alcántara 
> wrote:
>
>> I've always used Scala in the context of Flink. Now that Flink 1.15 has
>> become Scala-free, I wonder what is the best (most practical) route for me
>> moving forward. These are my options:
>>
>> 1. Keep using Scala 2.12 for the years to come (and upgrade to newer
>> versions when the community has come up with something). How long is Flink
>> expected to support Scala 2.12?
>>
>> 2. Upgrade to Scala 2.13 or Scala 3 and use the Java API directly
>> (without any Scala-specific wrapper/API). How problematic will that be,
>> especially regarding type information & scala-specific serializers? I hate
>> those "returns" (type hints) in the Java API...
>>
>> 3. Switch to Java, at least for the time being...
>>
>> To be clear, I have a strong preference for Scala over Java, but I'm
>> trying to look at the "grand scheme of things" here, and be pragmatic. I
>> guess I'm not alone here, and that many people are indeed evaluating the
>> same pros & cons. Any feedback will be much appreciated.
>>
>> Thanks in advance!
>>
>


Re: trigger once (batch job with streaming semantics)

2022-05-09 Thread Martijn Visser
Hi Georg,

No they wouldn't. There is no capability out of the box that lets you start
Flink in streaming mode, run everything that's available at that moment and
then stop when there's no data anymore. You would need to trigger the stop
yourself.
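
That said, for illustration, one pattern that is sometimes used to approximate this
(a rough, untested sketch; topic, brokers and group id are placeholders, and
windowing/late-data semantics still differ from a long-running streaming job) is to
make the Kafka source bounded, so the job defines its own stop point:

```
// Sketch only: consume a topic up to the offsets present at startup, then let
// the job finish on its own. All connection details are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RunOnceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("run-once-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // bounded at the latest offsets seen at job start -> the job terminates by itself
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
                .print(); // replace with the real pipeline
        env.execute("run-once");
    }
}
```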

Best regards,

Martijn

On Fri, 6 May 2022 at 13:37, Georg Heiler  wrote:

> Hi,
>
> I would disagree:
> In the case of spark, it is a streaming application that is offering full
> streaming semantics (but with less cost and bigger latency) as it triggers
> less often. In particular, windowing and stateful semantics as well as
> late-arriving data are handled automatically using the regular streaming
> features.
>
> Would these features be available in a Flink Batch job as well?
>
> Best,
> Georg
>
> Am Fr., 6. Mai 2022 um 13:26 Uhr schrieb Martijn Visser <
> martijnvis...@apache.org>:
>
>> Hi Georg,
>>
>> Flink batch applications run until all their input is processed. When
>> that's the case, the application finishes. You can read more about this in
>> the documentation for DataStream [1] or Table API [2]. I think this matches
>> the same as Spark is explaining in the documentation.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/
>>
>> On Mon, 2 May 2022 at 16:46, Georg Heiler 
>> wrote:
>>
>>> Hi,
>>>
>>> spark
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>>> offers a variety of triggers.
>>>
>>> In particular, it also has the "once" mode:
>>>
>>> *One-time micro-batch* The query will execute *only one* micro-batch to
>>> process all the available data and then stop on its own. This is useful in
>>> scenarios you want to periodically spin up a cluster, process everything
>>> that is available since the last period, and then shutdown the cluster. In
>>> some case, this may lead to significant cost savings.
>>>
>>> Does flink have a similar possibility?
>>>
>>> Best,
>>> Georg
>>>
>>


Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Shilpa Shankar
Hello,
We are building a flink use case where we are consuming from a kafka topic
and performing aggregations and generating alerts based on average, max,
min thresholds. We also need to notify the users when there are 0 events in
a Tumbling Event Time Window. We are having trouble coming up with a
solution to do the same. The options we considered are below, please let us
know if there are other ideas we haven't looked into.

[1] Queryable State: Save the keys in each of the Process Window Functions.
Query the state from an external application and alert when a key is
missing after the 20min time interval has expired. We see Queryable state
feature is being deprecated in the future. We do not want to go down this
path when we already know there is an EOL for it.

[2] Use Processing Time Windows :  Using Processing time instead of Event
time would have been an option if our downstream applications would send
out events in real time. Maintenance of the downstream applications,
delays, etc. would result in a lot of data loss, which is undesirable.

Flink version : 1.14.3

Thanks,
Shilpa


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-09 Thread Robert Metzger
Sorry for joining this discussion late, and thanks for the summary Xintong!

Why are we considering a separate slack instance instead of using the ASF
Slack instance?
The ASF instance is paid, so all messages are retained forever, and quite a
few people are already on that Slack instance.
There is already a #flink channel on that Slack instance, that we could
leave as passive as it is right now, or put some more effort into it, on a
voluntary basis.
We could add another #flink-dev channel to that Slack for developer
discussions, and a private flink-committer and flink-pmc chat.

If we are going that path, we should rework the "Community" and "Getting
Help" pages and explain that the mailing lists are the "ground truth tools"
in Flink, and Slack is only there to facilitate faster communication, but
it is optional / voluntary (e.g. committers won't respond to DMs)

All public #flink-* channels should be archived and google-indexable.
I've asked Jarek from Airflow who's maintaining
http://apache-airflow.slack-archives.org.
If we can't use slack-archives.org, it would be nice to find some
volunteers in the Flink community to hack a simple indexing tool.
The indexing part is very important for me, because of some bad experiences
with the Kubernetes community, where most of the advanced stuff is hidden
in their Slack, and it took me a few weeks to find that goldmine of
information.

Overall, I see this as an experiment worth doing, but I would suggest
revisiting it in 6 to 12 months: We should check if really all important
decisions are mirrored to the right mailing lists, and that we get the
benefits we hoped for (more adoption, better experience for users and
developers), and that we can handle the concerns (DMs to developers,
indexing).





On Sat, May 7, 2022 at 12:22 PM Xintong Song  wrote:

> Thanks all for the valuable feedback.
>
> It seems most people are overall positive about using Slack for dev
> discussions, as long as they are properly reflected back to the MLs.
> - We definitely need a code of conduct that clearly specifies what people
> should / should not do.
> - Contributors pinging well-known reviewers /committers, I think that also
> happens now on JIRA / Github. Personally, I'd understand a no-reply as a
> "soft no". We may consider to also put that in the cod of conduct.
>
> Concerning using Slack for user QAs, it seems the major concern is that we
> may end up repeatedly answering the same questions from different users,
> due to lack of capacity for archiving and searching historical
> conversations. TBH, I don't have a good solution for the archivability and
> searchability. I investigated some tools like Zapier [1], but none of them
> seems suitable for us. However, I'd like to share 2 arguments.
> - The purpose of Slack is to make the communication more efficient? By
> *efficient*, I mean saving time for both question askers and helpers with
> instant messages, file transmissions, even voice / video calls, etc.
> (Especially for cases where back and forth is needed, as David mentioned.)
> It does not mean questions that do not get enough attention on MLs are now
> guaranteed to be answered immediately. We can probably put that into the
> code of conduct, and kindly guide users to first search and initiate
> questions on MLs.
> - I'd also like to share some experience from the Flink China community. We
> have 3 DingTalk groups with 25k members in total (might be less, I didn't do
> deduplication), posting hundreds of messages daily. What I'm really excited
> about is that, there are way more interactions between users & users than
> between users & developers. Users are helping each other, sharing
> experiences, sending screenshots / log files / documentations and solving
> problems together. We the developers seldom get pinged, if not proactively
> joined the conversations. The DingTalk groups are way more active compared
> to the user-zh@ ML, which I'd attribute to the improvement of interaction
> experiences. Admittedly, there are questions being repeatedly asked &
> answered, but TBH I don't think that compares to the benefit of a
> self-driven user community. I'd really love to see if we can bring such
> success to the global English-speaking community.
>
> Concerning StackOverflow, it is definitely worth more attention from the
> community. Thanks for the suggestion / reminder, Piotr & David. I think
> Slack and StackOverflow are probably not mutually exclusive.
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://zapier.com/
>
>
>
> On Sat, May 7, 2022 at 9:50 AM Jingsong Li  wrote:
>
> > Most of the open source communities I know have set up their Slack
> > channels, such as Apache Iceberg [1], Apache Druid [2], etc.
> > So I think Slack can be worth trying.
> >
> > David is right, there are some cases that need back-and-forth
> > communication, where Slack will be more effective.
> >
> > But back to the question, ultimately it's about whether there are
> > enough core dev

Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Andrew Otto
This sounds similar to a non-streaming problem we had at WMF.  We ingest
all event data from Kafka into HDFS/Hive and partition the Hive tables in
hourly directories.  If there are no events in a Kafka topic for a given
hour, we have no way of knowing if the hour has been ingested
successfully.  For all we know, the upstream producer pipeline might be
broken.

We solved this by emitting artificial 'canary' events into each topic
multiple times an hour.  The canary events producer uses the same code
pathways and services that (most) of our normal event producers do.  Then,
when ingesting into Hive, we filter out the canary events.  The ingestion
code then always has work to do and can mark an hour as complete, but may
still end up writing no events to it.

Perhaps you could do the same?  Always emit artificial events, and filter
them out in your windowing code? The window should still fire since it will
always have events, even if you don't use them?
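
For illustration, a rough sketch of what that filtering could look like inside the
window function (the Event type, its canary flag and the output message are made up
for this example):

```
// Sketch only: a window function that ignores the artificial canary events but
// still relies on them to keep the window firing; it emits a notification when
// a window contained no real events. Event/isCanary() are hypothetical types.
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

class Event {
    boolean canary;
    boolean isCanary() { return canary; }
}

public class ZeroEventNotifier extends ProcessWindowFunction<Event, String, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<Event> events, Collector<String> out) {
        long realEvents = 0;
        for (Event e : events) {
            if (!e.isCanary()) { // drop the canary events here
                realEvents++;
            }
        }
        if (realEvents == 0) {
            // the window still fired thanks to the canaries, but no real data arrived
            out.collect("No events for key " + key + " in window ending at " + ctx.window().getEnd());
        }
    }
}
```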




On Mon, May 9, 2022 at 8:55 AM Shilpa Shankar 
wrote:

> Hello,
> We are building a flink use case where we are consuming from a kafka topic
> and performing aggregations and generating alerts based on average, max,
> min thresholds. We also need to notify the users when there are 0 events in
> a Tumbling Event Time Windows. We are having trouble coming up with a
> solution to do the same. The options we considered are below, please let us
> know if there are other ideas we haven't looked into.
>
> [1] Querable State : Save the keys in each of the Process Window
> Functions. Query the state from an external application and alert when a
> key is missing after the 20min time interval has expired. We see Queryable
> state feature is being deprecated in the future. We do not want to go down
> this path when we already know there is an EOL for it.
>
> [2] Use Processing Time Windows :  Using Processing time instead of Event
> time would have been an option if our downstream applications would send
> out events in real time. Maintenances of the downstream applications,
> delays etc would result in a lot of data loss which is undesirable.
>
> Flink version : 1.14.3
>
> Thanks,
> Shilpa
>


Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Dario Heinisch
It depends on the use case; in Shilpa's use case it is about users, so
the user ids are probably known beforehand.


https://dpaste.org/cRe3G <= This is an example without a window, but
essentially, Shilpa, you would be re-registering the timers every time they
fire.
You would also have to ingest the user ids beforehand into your
pipeline, so that if a user never has any event he still gets a
notification. So probably on startup ingest the user ids with a single
source from the DB.

My example is pretty minimal but the idea in your case stays the same:

- key by user
- have a co-process function to init the state with the user ids
- re-register the timers every time they fire
- use `env.getConfig().setAutoWatermarkInterval(1000)` to move the event
time forward even if there is no data coming in (this is what you are
probably looking for!!)
- then collect an Optional/CustomStruct/Null or so depending on whether
data is present or not
- and then you can check whether the event was triggered because there was
data or because there wasn't (a rough sketch of this pattern follows below)
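
A rough, untested Java sketch of the re-registering-timer part (class and message
names are illustrative; the 20-minute interval comes from the original mail):

```
// Sketch only: keyed function that alerts when a key saw no events in the last
// interval. The timer is re-registered every time it fires, as described above.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MissingDataAlerter extends KeyedProcessFunction<String, String, String> {

    private static final long INTERVAL_MS = 20 * 60 * 1000; // 20 minutes

    private transient ValueState<Boolean> seenData;
    private transient ValueState<Long> nextTimer;

    @Override
    public void open(Configuration parameters) {
        seenData = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seenData", Boolean.class));
        nextTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("nextTimer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        seenData.update(true);
        if (nextTimer.value() == null) { // first element for this key: start the timer cycle
            long t = ctx.timerService().currentWatermark() + INTERVAL_MS;
            ctx.timerService().registerEventTimeTimer(t);
            nextTimer.update(t);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (seenData.value() == null || !seenData.value()) {
            out.collect("No events for key " + ctx.getCurrentKey() + " before " + timestamp);
        }
        seenData.update(false);
        long next = timestamp + INTERVAL_MS; // re-register for the next interval
        ctx.timerService().registerEventTimeTimer(next);
        nextTimer.update(next);
    }
}
```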


Best regards,

Dario

On 09.05.22 15:19, Andrew Otto wrote:
This sounds similar to a non streaming problem we had at WMF.  We 
ingest all event data from Kafka into HDFS/Hive and partition the Hive 
tables in hourly directories.  If there are no events in a Kafka topic 
for a given hour, we have no way of knowing if the hour has been 
ingested successfully.  For all we know, the upstream producer 
pipeline might be broken.


We solved this by emitting artificial 'canary' events into each topic 
multiple times an hour.  The canary events producer uses the same code 
pathways and services that (most) of our normal event producers do.  
Then, when ingesting into Hive, we filter out the canary events.  The 
ingestion code has work to do and can mark an hour as complete, but 
still end up writing no events to it.


Perhaps you could do the same?  Always emit artificial events, and 
filter them out in your windowing code? The window should still fire 
since it will always have events, even if you don't use them?





On Mon, May 9, 2022 at 8:55 AM Shilpa Shankar  
wrote:


Hello,
We are building a flink use case where we are consuming from a
kafka topic and performing aggregations and generating alerts
based on average, max, min thresholds. We also need to notify the
users when there are 0 events in a Tumbling Event Time Windows. We
are having trouble coming up with a solution to do the same. The
options we considered are below, please let us know if there are
other ideas we haven't looked into.

[1] Querable State : Save the keys in each of the Process Window
Functions. Query the state from an external application and alert
when a key is missing after the 20min time interval has expired.
We see Queryable state feature is being deprecated in the future.
We do not want to go down this path when we already know there is
an EOL for it.

[2] Use Processing Time Windows :  Using Processing time instead
of Event time would have been an option if our downstream
applications would send out events in real time. Maintenances of
the downstream applications, delays etc would result in a lot of
data loss which is undesirable.

Flink version : 1.14.3

Thanks,
Shilpa


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-09 Thread Martijn Visser
As far as I recall you can't sign up for the ASF instance of Slack, you can
only get there if you're a committer or if you're invited by a committer.

On Mon, 9 May 2022 at 15:15, Robert Metzger  wrote:

> Sorry for joining this discussion late, and thanks for the summary Xintong!
>
> Why are we considering a separate slack instance instead of using the ASF
> Slack instance?
> The ASF instance is paid, so all messages are retained forever, and quite
> a few people are already on that Slack instance.
> There is already a #flink channel on that Slack instance, that we could
> leave as passive as it is right now, or put some more effort into it, on a
> voluntary basis.
> We could add another #flink-dev channel to that Slack for developer
> discussions, and a private flink-committer and flink-pmc chat.
>
> If we are going that path, we should rework the "Community" and "Getting
> Help" pages and explain that the mailing lists are the "ground truth tools"
> in Flink, and Slack is only there to facilitate faster communication, but
> it is optional / voluntary (e.g. a committers won't respond to DMs)
>
> All public #flink-* channels should be archived and google-indexable.
> I've asked Jarek from Airflow who's maintaining
> http://apache-airflow.slack-archives.org.
> If we can't use slack-archives.org, it would be nice to find some
> volunteers in the Flink community to hack a simple indexing tool.
> The indexing part is very important for me, because of some bad
> experiences with the Kubernetes experience, where most of the advanced
> stuff is hidden in their Slack, and it took me a few weeks to find that
> goldmine of information.
>
> Overall, I see this as an experiment worth doing, but I would suggest
> revisiting it in 6 to 12 months: We should check if really all important
> decisions are mirrored to the right mailing lists, and that we get the
> benefits we hoped for (more adoption, better experience for users and
> developers), and that we can handle the concerns (DMs to developers,
> indexing).
>
>
>
>
>
> On Sat, May 7, 2022 at 12:22 PM Xintong Song 
> wrote:
>
>> Thanks all for the valuable feedback.
>>
>> It seems most people are overall positive about using Slack for dev
>> discussions, as long as they are properly reflected back to the MLs.
>> - We definitely need a code of conduct that clearly specifies what people
>> should / should not do.
>> - Contributors pinging well-known reviewers /committers, I think that also
>> happens now on JIRA / Github. Personally, I'd understand a no-reply as a
>> "soft no". We may consider to also put that in the cod of conduct.
>>
>> Concerning using Slack for user QAs, it seem the major concern is that, we
>> may end up repeatedly answering the same questions from different users,
>> due to lack of capacity for archiving and searching historical
>> conversations. TBH, I don't have a good solution for the archivability and
>> searchability. I investigated some tools like Zapier [1], but none of them
>> seems suitable for us. However, I'd like to share 2 arguments.
>> - The purpose of Slack is to make the communication more efficient? By
>> *efficient*, I mean saving time for both question askers and helpers with
>> instance messages, file transmissions, even voice / video calls, etc.
>> (Especially for cases where back and forth is needed, as David mentioned.)
>> It does not mean questions that do not get enough attentions on MLs are
>> now
>> guaranteed to be answered immediately. We can probably put that into the
>> code of conduct, and kindly guide users to first search and initiate
>> questions on MLs.
>> - I'd also like to share some experience from the Flink China community.
>> We
>> have 3 DingTalk groups with totally 25k members (might be less, I didn't
>> do
>> deduplication), posting hundreds of messages daily. What I'm really
>> excited
>> about is that, there are way more interactions between users & users than
>> between users & developers. Users are helping each other, sharing
>> experiences, sending screenshots / log files / documentations and solving
>> problems together. We the developers seldom get pinged, if not proactively
>> joined the conversations. The DingTalk groups are way more active compared
>> to the user-zh@ ML, which I'd attribute to the improvement of interaction
>> experiences. Admittedly, there are questions being repeatedly asked &
>> answered, but TBH I don't think that compares to the benefit of a
>> self-driven user community. I'd really love to see if we can bring such
>> success to the global English-speaking community.
>>
>> Concerning StackOverFlow, it definitely worth more attention from the
>> community. Thanks for the suggestion / reminder, Piotr & David. I think
>> Slack and StackOverFlow are probably not mutual exclusive.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1] https://zapier.com/
>>
>>
>>
>> On Sat, May 7, 2022 at 9:50 AM Jingsong Li 
>> wrote:
>>
>> > Most of the open source communities I know have set up t

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-09 Thread Robert Metzger
Thanks a lot for your answer. The onboarding experience to the ASF Slack is
indeed not ideal:
https://apisix.apache.org/docs/general/join#join-the-slack-channel
I'll see if we can improve it

On Mon, May 9, 2022 at 3:38 PM Martijn Visser 
wrote:

> As far as I recall you can't sign up for the ASF instance of Slack, you can
> only get there if you're a committer or if you're invited by a committer.
>
> On Mon, 9 May 2022 at 15:15, Robert Metzger  wrote:
>
> > Sorry for joining this discussion late, and thanks for the summary
> Xintong!
> >
> > Why are we considering a separate slack instance instead of using the ASF
> > Slack instance?
> > The ASF instance is paid, so all messages are retained forever, and quite
> > a few people are already on that Slack instance.
> > There is already a #flink channel on that Slack instance, that we could
> > leave as passive as it is right now, or put some more effort into it, on
> a
> > voluntary basis.
> > We could add another #flink-dev channel to that Slack for developer
> > discussions, and a private flink-committer and flink-pmc chat.
> >
> > If we are going that path, we should rework the "Community" and "Getting
> > Help" pages and explain that the mailing lists are the "ground truth
> tools"
> > in Flink, and Slack is only there to facilitate faster communication, but
> > it is optional / voluntary (e.g. a committers won't respond to DMs)
> >
> > All public #flink-* channels should be archived and google-indexable.
> > I've asked Jarek from Airflow who's maintaining
> > http://apache-airflow.slack-archives.org.
> > If we can't use slack-archives.org, it would be nice to find some
> > volunteers in the Flink community to hack a simple indexing tool.
> > The indexing part is very important for me, because of some bad
> > experiences with the Kubernetes experience, where most of the advanced
> > stuff is hidden in their Slack, and it took me a few weeks to find that
> > goldmine of information.
> >
> > Overall, I see this as an experiment worth doing, but I would suggest
> > revisiting it in 6 to 12 months: We should check if really all important
> > decisions are mirrored to the right mailing lists, and that we get the
> > benefits we hoped for (more adoption, better experience for users and
> > developers), and that we can handle the concerns (DMs to developers,
> > indexing).
> >
> >
> >
> >
> >
> > On Sat, May 7, 2022 at 12:22 PM Xintong Song 
> > wrote:
> >
> >> Thanks all for the valuable feedback.
> >>
> >> It seems most people are overall positive about using Slack for dev
> >> discussions, as long as they are properly reflected back to the MLs.
> >> - We definitely need a code of conduct that clearly specifies what
> people
> >> should / should not do.
> >> - Contributors pinging well-known reviewers /committers, I think that
> also
> >> happens now on JIRA / Github. Personally, I'd understand a no-reply as a
> >> "soft no". We may consider to also put that in the cod of conduct.
> >>
> >> Concerning using Slack for user QAs, it seem the major concern is that,
> we
> >> may end up repeatedly answering the same questions from different users,
> >> due to lack of capacity for archiving and searching historical
> >> conversations. TBH, I don't have a good solution for the archivability
> and
> >> searchability. I investigated some tools like Zapier [1], but none of
> them
> >> seems suitable for us. However, I'd like to share 2 arguments.
> >> - The purpose of Slack is to make the communication more efficient? By
> >> *efficient*, I mean saving time for both question askers and helpers
> with
> >> instance messages, file transmissions, even voice / video calls, etc.
> >> (Especially for cases where back and forth is needed, as David
> mentioned.)
> >> It does not mean questions that do not get enough attentions on MLs are
> >> now
> >> guaranteed to be answered immediately. We can probably put that into the
> >> code of conduct, and kindly guide users to first search and initiate
> >> questions on MLs.
> >> - I'd also like to share some experience from the Flink China community.
> >> We
> >> have 3 DingTalk groups with totally 25k members (might be less, I didn't
> >> do
> >> deduplication), posting hundreds of messages daily. What I'm really
> >> excited
> >> about is that, there are way more interactions between users & users
> than
> >> between users & developers. Users are helping each other, sharing
> >> experiences, sending screenshots / log files / documentations and
> solving
> >> problems together. We the developers seldom get pinged, if not
> proactively
> >> joined the conversations. The DingTalk groups are way more active
> compared
> >> to the user-zh@ ML, which I'd attribute to the improvement of
> interaction
> >> experiences. Admittedly, there are questions being repeatedly asked &
> >> answered, but TBH I don't think that compares to the benefit of a
> >> self-driven user community. I'd really love to see if we can bring such
> >> success to th

Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Shilpa Shankar
Thanks Andrew.
We did consider this solution too. Unfortunately we do not have permissions
to generate artificial kafka events in our ecosystem.

Dario,
Thanks for your inputs. We will give your design a try. Due to the number of
events being processed per window, we are using an incremental aggregate
function
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#processwindowfunction-with-incremental-aggregation.
Do you think we can use KeyedCoProcessFunction in this design?

Thanks,
Shilpa







On Mon, May 9, 2022 at 9:31 AM Dario Heinisch 
wrote:

> It depends on the user case,  in Shilpa's use case it is about users so
> the user ids are probably know beforehand.
>
> https://dpaste.org/cRe3G <= This is an example with out an window but
> essentially Shilpa you would be reregistering the timers every time they
> fire.
> You would also have to ingest the user ids before hand into your pipeline,
> so that if a user never has any event he still gets a notification. So
> probably on startup ingest the user ids with a single source
> from the DB.
>
> My example is pretty minimal but the idea in your case stays the same:
>
> - key by user
> - have a co-process function to init the state with the user ids
> - reregister the timers every time they fire
> - use `env.getConfig().setAutoWatermarkInterval(1000)` to move the event
> time forward even if there is no data coming in (this is what you are
> probably looking for!!)
> - then collect an Optionable/CustomStruct/Null or so depending on if data
> is present or not
> - and then u can check whether the event was triggered because there was
> data or because there wasn't data
>
> Best regards,
>
> Dario
> On 09.05.22 15:19, Andrew Otto wrote:
>
> This sounds similar to a non streaming problem we had at WMF.  We ingest
> all event data from Kafka into HDFS/Hive and partition the Hive tables in
> hourly directories.  If there are no events in a Kafka topic for a given
> hour, we have no way of knowing if the hour has been ingested
> successfully.  For all we know, the upstream producer pipeline might be
> broken.
>
> We solved this by emitting artificial 'canary' events into each topic
> multiple times an hour.  The canary events producer uses the same code
> pathways and services that (most) of our normal event producers do.  Then,
> when ingesting into Hive, we filter out the canary events.  The ingestion
> code has work to do and can mark an hour as complete, but still end up
> writing no events to it.
>
> Perhaps you could do the same?  Always emit artificial events, and filter
> them out in your windowing code? The window should still fire since it will
> always have events, even if you don't use them?
>
>
>
>
> On Mon, May 9, 2022 at 8:55 AM Shilpa Shankar 
> wrote:
>
>> Hello,
>> We are building a flink use case where we are consuming from a kafka
>> topic and performing aggregations and generating alerts based on average,
>> max, min thresholds. We also need to notify the users when there are 0
>> events in a Tumbling Event Time Windows. We are having trouble coming up
>> with a solution to do the same. The options we considered are below, please
>> let us know if there are other ideas we haven't looked into.
>>
>> [1] Querable State : Save the keys in each of the Process Window
>> Functions. Query the state from an external application and alert when a
>> key is missing after the 20min time interval has expired. We see Queryable
>> state feature is being deprecated in the future. We do not want to go down
>> this path when we already know there is an EOL for it.
>>
>> [2] Use Processing Time Windows :  Using Processing time instead of Event
>> time would have been an option if our downstream applications would send
>> out events in real time. Maintenances of the downstream applications,
>> delays etc would result in a lot of data loss which is undesirable.
>>
>> Flink version : 1.14.3
>>
>> Thanks,
>> Shilpa
>>
>


FW: Rabbitmq Connection error with Flink version(1.15.0)

2022-05-09 Thread harshit.varsh...@iktara.ai
 

 

From: harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai] 
Sent: Monday, May 9, 2022 7:33 PM
To: 'user@flink.apache.org'
Cc: 'harshit.varsh...@iktara.ai'
Subject: Rabbitmq Connection error with Flink version(1.15.0)

 

Dear Team,

 

I am new to PyFlink and request your support with an issue I am facing with
PyFlink. I am using PyFlink version 1.15.0 and am working from PyFlink
reference code.

 

I am getting the following error.

Exception in thread "Thread-4" java.lang.NoClassDefFoundError:
com/rabbitmq/client/ConnectionFactory

Caused by: java.lang.ClassNotFoundException:
com.rabbitmq.client.ConnectionFactory

ERROR:root:Exception while sending command.

Traceback (most recent call last):

  File
"C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\
java_gateway.py", line 1159, in send_command

raise Py4JNetworkError("Answer from Java side is empty")

py4j.protocol.Py4JNetworkError: Answer from Java side is empty

py4j.protocol.Py4JError:
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Bu
ilder does not exist in the JVM

 

Below is my code for reference.

 

from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
import logging
import os
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import SimpleStringSchema


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    # checkpointing is required for exactly-once or at-least-once guarantees
    env.enable_checkpointing(100)

    rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                'flink-connector-rabbitmq-1.15.0.jar')
    env.add_jars("file:///{}".format(rabbitmq_jar))

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("localhost") \
        .set_port(5672) \
        .build()

    stream = env \
        .add_source(RMQSource(
            connection_config,
            'hello',
            True,
            SimpleStringSchema(),
        )) \
        .set_parallelism(1)

    stream.print()
    env.execute('main')


if __name__ == '__main__':
    main()

 

 

Thanks,

Harshit

 

 



Re: Flink-SQL returning duplicate rows for some records

2022-05-09 Thread Joost Molenaar
Hi Leonard and Martijn, thanks for looking into this.

I ran into the issue on Flink 1.14.4 (with the matching
flink-sql-connector-kafka based on Scala 2.11), but reproduced the problem
today in 1.15.0 (again with the matching flink-sql-connector-kafka). I haven't
used older versions than 1.14.4.

These following debezium-json messages illustrate the problem; note that
they're published without schema and that they're all produced to Kafka with
this message key:

{"id":1}

These are the message values; first for an INSERT:

{"before":null,"after":{"id":1,"done":false,"name":"Initial
value"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104409527,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:0528:001c","commit_lsn":"0025:0528:001d","event_serial_no":1},"op":"c","ts_ms":1652104413976,"transaction":null}

Then an UPDATE on the text field:

{"before":{"id":1,"done":false,"name":"Initial
value"},"after":{"id":1,"done":false,"name":"Updated
#1"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104502837,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:05d8:0002","commit_lsn":"0025:05d8:0003","event_serial_no":2},"op":"u","ts_ms":1652104503260,"transaction":null}

Then an UPDATE on a boolean field -- this causes a duplicated row for id=1:


{"before":{"id":1,"done":false,"name":""},"after":{"id":1,"done":true,"name":"Updated
#1"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104507080,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:05f0:0002","commit_lsn":"0025:05f0:0003","event_serial_no":2},"op":"u","ts_ms":1652104508248,"transaction":null}

Another UPDATE on the text field -- this causes an update the of text
field in the second instance of the id=1 row:

{"before":{"id":1,"done":true,"name":"Updated
#1"},"after":{"id":1,"done":true,"name":"Updated
#2"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104511600,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:0608:0002","commit_lsn":"0025:0608:0003","event_serial_no":2},"op":"u","ts_ms":1652104513257,"transaction":null}

And finally a DELETE -- this causes the deletion of the second row
with id=1, but not the first:

{"before":{"id":1,"done":true,"name":"Updated
#2"},"after":null,"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104514893,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:0620:0002","commit_lsn":"0025:0620:0005","event_serial_no":1},"op":"d","ts_ms":1652104518749,"transaction":null}

(Debezium then produces a tombstone record with the same key
`{"id":1}` and value `null`.)

For reference, this is the CREATE TABLE statement for the source connector:

CREATE TABLE todo_list (
id BIGINT,
done BOOLEAN,
name STRING
)
WITH (
'connector'='kafka',
'topic'='mssql.dbo.todo_list',
'properties.bootstrap.servers'='10.88.10.10:9092',
'properties.group.id'='flinksql-todo-list',
'scan.startup.mode'='earliest-offset',
'key.format'='json',
'key.fields'='id',
'value.format'='debezium-json',
'value.debezium-json.schema-include'='false',
'value.fields-include'='EXCEPT_KEY'
);

Please let me know if there's anything else I can do to clear this up.

Kind regards,
Joost Molenaar

On Sat, 7 May 2022 at 10:26, Leonard Xu  wrote:
>
> Hi Joost
>
> Could you share your flink version and the two records in debezium-json 
> format which produced by two MS SQL UPDATE statement ?
>
> Best,
> Leonard
>
> > On May 2, 2022 at 9:59 PM, Joost Molenaar  wrote:
> >
> > Hello all,
> >
> > I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by
> > Debezium, which is in turn monitoring a MS-SQL CDC table. For some reason,
> > Flink-SQL shows a new row when I update the boolean field, but updates the
> > row in place when I update the text field, and I'm not understanding why
> > this happens. My ultimate goal is to use Flink-SQL to do a join on records
> > that come from both sides of a 1:N relation in the foreign database, to
> > expose a more ready to consume JSON object to downstream consumers.
> >
> > The source table is defined like this in MS-SQL:
> >
> >CREATE TABLE todo_list (
> >id int IDENTITY NOT NULL,
> >done bit NOT NULL DEFAULT 0,
> >name varchar(MAX) NOT NULL,
> >CONSTRAINT PK_todo_list PRIMARY KEY (id)
> >);
> >
> > This is the configuration I'm sending to Debezium, note that I'm not
> > including the
> > JSON-schema in both keys and values:
> >
> >{
> >"

What causes a task to change parallelism?

2022-05-09 Thread Jason Politis
Good evening all,

We are running a job in flink SQL.  We've confirmed all Kafka topics that
we are sourcing from have 5 partitions.  All source tasks in the larger
DAG, of which we're only showing a small portion below, have a
parallelism of 5.  But for some reason, this one task here (we cannot
figure out which part of the query it belongs to) does not
parallelize.  The task following it, though, IS parallelized again.

What is the most common cause for this?

Does it have anything to do with the arrows pointing to him and their
labels saying GLOBAL?

How can we go about pinpointing which part of our query belongs to that
specific task?  We have 104 tasks, so quickly pinpointing the exact part of
the query would help us out a lot.

Thank you


Jason Politis
Solutions Architect, Carrera Group
carrera.io
| jpoli...@carrera.io 



Re: trigger once (batch job with streaming semantics)

2022-05-09 Thread Georg Heiler
Hi Martijn,

many thanks for this clarification. Do you know of any example somewhere
which would showcase such an approach?

Best,
Georg

Am Mo., 9. Mai 2022 um 14:45 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi Georg,
>
> No they wouldn't. There is no capability out of the box that lets you
> start Flink in streaming mode, run everything that's available at that
> moment and then stops when there's no data anymore. You would need to
> trigger the stop yourself.
>
> Best regards,
>
> Martijn
>
> On Fri, 6 May 2022 at 13:37, Georg Heiler 
> wrote:
>
>> Hi,
>>
>> I would disagree:
>> In the case of spark, it is a streaming application that is offering full
>> streaming semantics (but with less cost and bigger latency) as it triggers
>> less often. In particular, windowing and stateful semantics as well as
>> late-arriving data are handled automatically using the regular streaming
>> features.
>>
>> Would these features be available in a Flink Batch job as well?
>>
>> Best,
>> Georg
>>
>> Am Fr., 6. Mai 2022 um 13:26 Uhr schrieb Martijn Visser <
>> martijnvis...@apache.org>:
>>
>>> Hi Georg,
>>>
>>> Flink batch applications run until all their input is processed. When
>>> that's the case, the application finishes. You can read more about this in
>>> the documentation for DataStream [1] or Table API [2]. I think this matches
>>> the same as Spark is explaining in the documentation.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/
>>>
>>> On Mon, 2 May 2022 at 16:46, Georg Heiler 
>>> wrote:
>>>
 Hi,

 spark
 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
 offers a variety of triggers.

 In particular, it also has the "once" mode:

 *One-time micro-batch* The query will execute *only one* micro-batch
 to process all the available data and then stop on its own. This is useful
 in scenarios you want to periodically spin up a cluster, process everything
 that is available since the last period, and then shutdown the cluster. In
 some case, this may lead to significant cost savings.

 Does flink have a similar possibility?

 Best,
 Georg

>>>


Re: FW: Rabbitmq Connection error with Flink version(1.15.0)

2022-05-09 Thread Dian Fu
Hi Harshit,

You should use
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/1.15.0/flink-sql-connector-rabbitmq-1.15.0.jar
which is a fat jar containing all the dependencies.

Regards,
Dian

On Mon, May 9, 2022 at 10:05 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

>
>
>
>
> *From:* harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
> *Sent:* Monday, May 9, 2022 7:33 PM
> *To:* 'user@flink.apache.org'
> *Cc:* 'harshit.varsh...@iktara.ai'
> *Subject:* Rabbitmq Connection error with Flink version(1.15.0)
>
>
>
> Dear Team,
>
>
>
> I am new to pyflink and request for your support in issue I am facing with
> Pyflink. I am using Pyflink version 1.15.0 & using reference code from
> pyflink reference code.
>
>
>
> I am getting following error .
>
> Exception in thread "Thread-4" java.lang.NoClassDefFoundError:
> com/rabbitmq/client/ConnectionFactory
>
> Caused by: java.lang.ClassNotFoundException:
> com.rabbitmq.client.ConnectionFactory
>
> ERROR:root:Exception while sending command.
>
> Traceback (most recent call last):
>
>   File
> "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py",
> line 1159, in send_command
>
> raise Py4JNetworkError("Answer from Java side is empty")
>
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>
> py4j.protocol.Py4JError:
> org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder
> does not exist in the JVM
>
>
>
> Below is my code for reference.
>
>
>
> from pyflink.datastream.connectors import FlinkKafkaProducer,
> FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
>
> import logging
>
> import os
>
> import sys
>
> from pyflink.datastream import StreamExecutionEnvironment
>
> from pyflink.common import SimpleStringSchema
>
> def main():
>
> env = StreamExecutionEnvironment.get_execution_environment()
>
> # checkpointing is required for exactly-once or at-least-once
> guarantees
>
>
>
> env.enable_checkpointing(100)
>
> rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
> 'flink-connector-rabbitmq-1.15.0.jar')
>
> env.add_jars("file:///{}".format(rabbitmq_jar))
>
>
>
> connection_config = RMQConnectionConfig.Builder() \
>
> .set_host("localhost") \
>
> .set_port(5672) \
>
> .build()
>
>
>
>
>
> stream = env \
>
> .add_source(RMQSource(
>
> connection_config,
>
> 'hello',
>
> True,
>
> SimpleStringSchema(),
>
> )) \
>
> .set_parallelism(1)
>
>
>
> stream.print()
>
> env.execute(‘main’)
>
>
>
>
>
> if __name__ == '__main__':
>
> main()
>
>
>
>
>
> Thanks,
>
> Harshit
>
>
>
>
>


Re: OOM errors cause by the new KafkaSink API

2022-05-09 Thread Hua Wei Chen
Hi Martijn,

Thanks for your response.

> What's the Flink version that you're using?
Our Flink version is 1.14.4 and the scala version is 2.12.12.

> Could you also separate the two steps (switching from the old Kafka
interfaces to the new ones + modifying serializers) to determine which of
the two steps cause the problem?
Because the new Kafka API needs the new
serializer (KafkaRecordSerializationSchema) and it seems we cannot use the
old one (KafkaSerializationSchema), we cannot separate the change into two
steps.
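
For reference, a minimal sketch of the new-style sink wiring with a value-only
serializer via the builder (Java shown; the topic and broker addresses are
placeholders, and this is not this job's actual code):

```
// Sketch only: KafkaSink with a KafkaRecordSerializationSchema built from an
// existing value serializer. Topic and broker addresses are placeholders.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class NewKafkaSinkSketch {
    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```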

Best Regards,
Hua Wei

On Tue, Apr 26, 2022 at 5:03 PM Martijn Visser 
wrote:

> Hi,
>
> What's the Flink version that you're using? Could you also separate the
> two steps (switching from the old Kafka interfaces to the new ones +
> modifying serializers) to determine which of the two steps cause the
> problem?
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Mon, 25 Apr 2022 at 17:11, Hua Wei Chen 
> wrote:
>
>> Hi Huweihua,
>>
>> Thanks for the reply. Yes, we increased the memory first.
>> But we are still curious about why memory usage increases with the new
>> Kafka APIs/serializers.
>>
>>
>> On Mon, Apr 25, 2022 at 8:38 PM huweihua  wrote:
>>
>>> Hi,
>>>
>>> You can try increasing the TaskManager memory.
>>> If the OOM persists, you can take a heap dump and check which part is
>>> taking up the memory.
>>>
>>>
>>> On Apr 25, 2022, at 11:44 AM, Hua Wei Chen  wrote:
>>>
>>> Hi all,
>>>
>>> Because FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
>>> Flink 1.15 [1], we are trying to migrate to KafkaSource and
>>> KafkaSink [2]. At the same time, we also modified the serializers [3].
>>> Our Kafka settings are unchanged [4].
>>>
>>> The services were very stable before the migration. However, we get OOM
>>> errors [5] after the API migration.
>>>
>>> Has anyone encountered the same issue? Can anyone give us suggestions
>>> about the settings?
>>>
>>> Many thanks!
>>>
>>> [1] Kafka | Apache Flink
>>> 
>>> [2] new Kafka APIs
>>> ```
>>>
>>> def getKafkaSource[T: TypeInformation](config: Config,
>>>topic: String,
>>>parallelism: Int,
>>>uid: String,
>>>env: StreamExecutionEnvironment,
>>>deserializer: 
>>> DeserializationSchema[T]): DataStream[T] = {
>>>   val properties = getKafkaCommonProperties(config)
>>>
>>>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
>>> config.getString("kafka.group.id"))
>>>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
>>> config.getString("kafka.session.timeout.ms"))
>>>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
>>> config.getString("kafka.receive.buffer.bytes"))
>>>
>>>   
>>> properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>>>  "360")
>>>
>>>   val source = KafkaSource.builder[T]()
>>> .setProperties(properties)
>>> .setTopics(topic)
>>> .setValueOnlyDeserializer(deserializer)
>>> 
>>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>> .build()
>>>
>>>   env
>>> .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>>> .uid(uid)
>>> .setParallelism(math.min(parallelism, env.getParallelism))
>>> .setMaxParallelism(parallelism)
>>> }
>>>
>>> def getKafkaSink[T: TypeInformation](config: Config,
>>>  serializer: 
>>> KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>>>   val properties = getKafkaCommonProperties(config)
>>>
>>>   properties.put(ProducerConfig.LINGER_MS_CONFIG, 
>>> config.getString("kafka.linger.ms"))
>>>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 
>>> config.getString("kafka.batch.size"))
>>>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
>>> config.getString("kafka.compression.type"))
>>>
>>>   KafkaSink.builder[T]()
>>> .setKafkaProducerConfig(properties)
>>> .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>>> .setRecordSerializer(serializer)
>>> .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>> .build()
>>> }
>>>
>>> ```
>>> [3] New Serializer
>>>
>>> import java.lang
>>> import java.nio.charset.StandardCharsets
>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
>>> import org.apache.kafka.clients.producer.ProducerRecord
>>> import com.appier.rt.short_term_score.model.UserSTState
>>>
>>> class UserSTStateSerializer(topic: String) extends 
>>> KafkaRecordSerializationSchema[UserSTState] {
>>>   override def serialize(element: UserSTState, context: 
>>> KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): 
>>> ProducerRecord[Array[Byte], Array[Byte]] = {
>>> ne

Re: What causes a task to change parallelism?

2022-05-09 Thread Caizhi Weng
Hi!

I can't see the image (if there is any) in the email, but from the
description it is related to the arrow labeled GLOBAL.

A global shuffle collects all records from its upstream and aggregates them
in its downstream. There are several SQL patterns that lead to this type of
shuffle, for example aggregate functions without grouping.

How can we go about pinpointing which part of our query belongs to that
> specific task?
>

This might not be straightforward, but there are clues. Each task is
assigned a task name, and the task name reflects which operators, functions
and columns are used in that task. For example, projections and filters are
called Calc, and the projected columns and filter conditions will also appear
in the task name.
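
As an illustration (the table and column names below are made up), here is a
minimal PyFlink sketch of a query that triggers such a global exchange, and of
printing the plan to inspect the generated operator names:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Minimal sketch with an illustrative datagen table; any source works the same way.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE orders (
        order_id STRING,
        amount   DOUBLE
    ) WITH (
        'connector' = 'datagen'
    )
""")

# An aggregate without GROUP BY must collect every record into a single task,
# which shows up in the web UI as an edge labeled GLOBAL feeding an operator
# with parallelism 1. The printed plan lists the operator names (Calc,
# GroupAggregate, ...) together with the projected columns and filter
# conditions, which can be matched against the task names in the web UI.
print(t_env.explain_sql("SELECT SUM(amount) FROM orders WHERE amount > 10"))
```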

On Tue, May 10, 2022 at 04:45, Jason Politis  wrote:

> Good evening all,
>
> We are running a job in Flink SQL.  We've confirmed that all Kafka topics
> we are sourcing from have 5 partitions.  All source tasks in the larger
> DAG, of which we're only showing a small portion below, have a parallelism
> of 5.  But for some reason, this one task (which we cannot map back to a
> specific part of the query) does not parallelize.  The task following it,
> though, IS parallelized again.
>
> What is the most common cause for this?
>
> Does it have anything to do with the arrows pointing to it and their
> labels saying GLOBAL?
>
> How can we go about pinpointing which part of our query belongs to that
> specific task?  We have 104 tasks, so quickly pinpointing the exact part of
> the query would help us out a lot.
>
> Thank you
>
>
> Jason Politis
> Solutions Architect, Carrera Group
> carrera.io
> | jpoli...@carrera.io 
> 
>


How to get flink to use POJO serializer when enum is present in POJO class

2022-05-09 Thread Tejas B
Hi,
I am trying to get Flink schema evolution to work using the POJO
serializer. But I found out that if an enum is present in the POJO, then the
POJO serializer is not used. An example of my POJO is as follows:

import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;
    // Newly added field
    // int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        // this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    // public int getVal2() { return val2; }
    // public void setVal2(int val2) { this.val2 = val2; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "name='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}

RuleType is an enum class as follows:

public enum RuleType {
X,
Y,
Z

}

Now, for the Rule class, schema evolution (adding a new field called
val2) works only if I write a custom type factory for this class.

Is there a way I can write a type factory for the enum class? Why does
Flink not recognize an enum in a POJO class?


Re: RichAsyncFunction + Cache or Map State?

2022-05-09 Thread Dan Hill
Hi.  Any advice on this?  I just hit this too.

Some ideas:
1. Manage our own separate cache (disk, Redis, etc).
2. Use two operators (first one a cache one and the second is the
RichAsyncFunction).  Have a feedback loop by using another Kafka topic or
S3 File source/sink.


On Wed, Feb 9, 2022 at 7:11 AM Clayton Wohl  wrote:

> I have a RichAsyncFunction that does async queries to an external
> database. I'm using a Guava cache within the Flink app. I'd like this Guava
> cache to be serialized with the rest of Flink state in
> checkpoint/savepoints. However, RichAsyncFunction doesn't support the state
> functionality at all.
>
> There is one Guava cache for the entire Flink app which might make this
> scenario simpler.
>
> Is there a recommended way to handle this situation?
>
> Also, the Flink MapState interface doesn't support check-and-set type
> functionality and doesn't support lock-free concurrent use like
> java.util.concurrent.ConcurrentMap and Guava's cache do. I need both of
> these features for proper concurrent operation. So even if I could use
> Flink MapState, that doesn't seem like a practical solution.
>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-09 Thread Robert Metzger
It seems that we'd have to use invite links on the Flink website for people
to join our Slack (1).
These links can be configured to have no time-based expiration, but they will
expire after 100 guests have joined.
I guess we'd have to use a URL shortener (https://s.apache.org) that we
update once the invite link expires. It's not a nice solution, but it'll
work.


(1) https://the-asf.slack.com/archives/CBX4TSBQ8/p1652125017094159


On Mon, May 9, 2022 at 3:59 PM Robert Metzger  wrote:

> Thanks a lot for your answer. The onboarding experience to the ASF Slack
> is indeed not ideal:
> https://apisix.apache.org/docs/general/join#join-the-slack-channel
> I'll see if we can improve it
>
> On Mon, May 9, 2022 at 3:38 PM Martijn Visser 
> wrote:
>
>> As far as I recall you can't sign up for the ASF instance of Slack, you
>> can only get there if you're a committer or if you're invited by a
>> committer.
>>
>> On Mon, 9 May 2022 at 15:15, Robert Metzger  wrote:
>>
>> > Sorry for joining this discussion late, and thanks for the summary,
>> > Xintong!
>> >
>> > Why are we considering a separate Slack instance instead of using the
>> > ASF Slack instance?
>> > The ASF instance is paid, so all messages are retained forever, and
>> > quite a few people are already on that Slack instance.
>> > There is already a #flink channel on that Slack instance that we could
>> > leave as passive as it is right now, or put some more effort into it,
>> > on a voluntary basis.
>> > We could add another #flink-dev channel to that Slack for developer
>> > discussions, and a private flink-committer and flink-pmc chat.
>> >
>> > If we are going down that path, we should rework the "Community" and
>> > "Getting Help" pages and explain that the mailing lists are the "ground
>> > truth tools" in Flink, and Slack is only there to facilitate faster
>> > communication, but it is optional / voluntary (e.g. committers won't
>> > respond to DMs).
>> >
>> > All public #flink-* channels should be archived and google-indexable.
>> > I've asked Jarek from Airflow, who maintains
>> > http://apache-airflow.slack-archives.org.
>> > If we can't use slack-archives.org, it would be nice to find some
>> > volunteers in the Flink community to hack a simple indexing tool.
>> > The indexing part is very important for me because of some bad
>> > experiences with Kubernetes, where most of the advanced stuff is hidden
>> > in their Slack, and it took me a few weeks to find that goldmine of
>> > information.
>> >
>> > Overall, I see this as an experiment worth doing, but I would suggest
>> > revisiting it in 6 to 12 months: we should check whether all important
>> > decisions are really mirrored to the right mailing lists, whether we get
>> > the benefits we hoped for (more adoption, a better experience for users
>> > and developers), and whether we can handle the concerns (DMs to
>> > developers, indexing).
>> >
>> >
>> >
>> >
>> >
>> > On Sat, May 7, 2022 at 12:22 PM Xintong Song 
>> > wrote:
>> >
>> >> Thanks all for the valuable feedback.
>> >>
>> >> It seems most people are overall positive about using Slack for dev
>> >> discussions, as long as they are properly reflected back to the MLs.
>> >> - We definitely need a code of conduct that clearly specifies what
>> >> people should / should not do.
>> >> - Contributors pinging well-known reviewers / committers: I think that
>> >> also happens now on JIRA / GitHub. Personally, I'd understand a no-reply
>> >> as a "soft no". We may consider also putting that in the code of conduct.
>> >>
>> >> Concerning using Slack for user QAs, it seems the major concern is that
>> >> we may end up repeatedly answering the same questions from different
>> >> users, due to a lack of capacity for archiving and searching historical
>> >> conversations. TBH, I don't have a good solution for the archivability
>> >> and searchability. I investigated some tools like Zapier [1], but none of
>> >> them seems suitable for us. However, I'd like to share 2 arguments.
>> >> - The purpose of Slack is to make the communication more efficient. By
>> >> *efficient*, I mean saving time for both question askers and helpers with
>> >> instant messages, file transmissions, even voice / video calls, etc.
>> >> (Especially for cases where back and forth is needed, as David
>> >> mentioned.) It does not mean questions that do not get enough attention
>> >> on MLs are now guaranteed to be answered immediately. We can probably put
>> >> that into the code of conduct, and kindly guide users to first search and
>> >> initiate questions on MLs.
>> >> - I'd also like to share some experience from the Flink China community.
>> >> We have 3 DingTalk groups with 25k members in total (might be less, I
>> >> didn't do deduplication), posting hundreds of messages daily. What I'm
>> >> really excited about is that there are way more interactions between
>> >> users & users than between users & developers