Re: Does the Kafka source perform retractions on Key?

2021-03-01 Thread Arvid Heise
Hi Rex,

Yes, you can go directly into Flink since 1.11.0 [1], but afaik only through
the Table API/SQL currently (which you seem to be using most of the time
anyway). I'd recommend using 1.11.1+ (some bugfixes) or even 1.12.0+ (many
new useful features [2]). You can also check the main docs [3].

If you like more background, Marta talked about it on a higher level [4]
(slides [5]) and Qingsheng and Jark on a lower level as well [6].

[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc
[2]
https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html
[4] https://www.youtube.com/watch?v=wRIQqgI1gLA
[5]
https://noti.st/morsapaes/liQzgs/change-data-capture-with-flink-sql-and-debezium
[6] https://www.youtube.com/watch?v=5AThYUD4grA
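
For illustration, a minimal sketch of what [1]/[3] describe: reading a Kafka
topic that carries Debezium-format change events as a changelog table through
the Table API. Topic, server, and column names here are made up; [3] documents
the 'debezium-json' format options.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumTopicSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Interpret the Debezium CDC records in the topic as a changelog table.
        tEnv.executeSql(
                "CREATE TABLE users_cdc (" +
                "  id BIGINT," +
                "  name STRING," +
                "  email STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'dbserver.public.users'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'properties.group.id' = 'users-cdc-reader'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'debezium-json'" +
                ")");

        // Queries on this table see the inserts/updates/deletes from the source database.
        tEnv.executeSql("SELECT id, name, email FROM users_cdc").print();
    }
}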

On Mon, Mar 1, 2021 at 8:53 PM Rex Fenley  wrote:

> Thanks Arvid,
>
> I think my confusion lies in misinterpreting the meaning of CDC. We
> basically don't want CDC, we just use it to get data into a compacted Kafka
> topic where we hold the current state of the world to consume from multiple
> consumers. You have described pretty thoroughly where we want to go.
>
> One interesting part of your architecture is this "Debezium -> State
> collecting Flink job". Is there a way for Debezium to write to Flink? I
> thought it required Kafka Connect.
>
> Appreciate your feedback
>
> On Mon, Mar 1, 2021 at 12:43 AM Arvid Heise  wrote:
>
>> > We are rereading the topics, at any time we might want a completely
>> different materialized view for a different web service for some new
>> application feature. Other jobs / new jobs need to read all the up-to-date
>> rows from the databases.
>> > I still don't see how this is the case if everything just needs to be
>> overwritten by primary key. To re-emphasize, we do not care about
>> historical data.
>> Why are you reading from a CDC topic and not a log-compacted topic that
>> reflects the state then? CDC is all about history and changes.
>>
>> What I'd imagine as an architecture that would work better for you:
>>
>> For each SQL table (ingress layer):
>> SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic
>> (compacted)
>>
>> Analytics (processing layer):
>> Kafka state topics (compacted) -> Analytical Flink job -> Kafka state
>> topic (compacted)
>>
>> For each view (egress layer):
>> Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s)
>> -> Web application
>>
>> The ingress layer is only there to provide you log-compacted Kafka
>> topics. Then you can do a bunch of analytical queries from Kafka to Kafka.
>> Finally, you output your views to K/V stores for high-avail web
>> applications (=decoupled from processing layer).
>>
>> If that's what you already have, then my apology for not picking that up.
>> It's really important to stress that no Kafka topics ever contain CDC data
>> in this instance since you are not interested in historic data. The only
>> CDC exchange is by using the debezium connector of Flink. At this point,
>> all discussions of this thread are resolved.
>>
>>
>>
>> On Sat, Feb 27, 2021 at 9:06 PM Rex Fenley  wrote:
>>
>>> Hi Arvid,
>>>
>>> >If you are not rereading the topics, why do you compact them?
>>> We are rereading the topics, at any time we might want a completely
>>> different materialized view for a different web service for some new
>>> application feature. Other jobs / new jobs need to read all the up-to-date
>>> rows from the databases.
>>>
>>> >correctness depends on compaction < downtime
>>> I still don't see how this is the case if everything just needs to be
>>> overwritten by primary key. To re-emphasize, we do not care about
>>> historical data.
>>>
>>> >Again, a cloud-native key/value store would perform much better and be
>>> much cheaper with better SLAs
>>> Is there a cloud-native key/value store which can read from a Postgres
>>> WAL or MySQL binlog and then keep an up-to-date read marker for any
>>> materialization consumers downstream *besides* Kafka + Debezium?
>>>
>>> Appreciate all the feedback, though hopefully we can get closer to the
>>> same mental model. If there's really a better alternative here I'm all for
>>> it!
>>>
>>>
>>> On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise  wrote:
>>>
 Hi Rex,

 Your initial question was about the impact of compaction on your CDC
 application logic. I have been (unsuccessfully) trying to tell you that you
 do not need compaction and it's counterproductive.

 If you are not rereading the topics, why do you compact them? It's lost
 compute time and I/O on the Kafka brokers (which are both very valuable)
 and does not give you anything that an appropriate retention time wouldn't
 give you (=lower SSD usage). It makes the mental model more complicated. An
 aggressive compact

Re: Flink’s Kubernetes HA services - NOT working

2021-03-01 Thread Till Rohrmann
Hmm, this is strange. From the logs it looks as if certain communications
between components don't arrive at the receiver's end. I think we have to
further dig into the problem.

In order to narrow it down further, could you try to start the cluster using
pod IPs instead of K8s services for inter-component communication? You can see
how to configure this here [1]. That way we can make sure that it is not a
problem with the K8s service.

[1] https://stackoverflow.com/a/66228073/4815083

Cheers,
Till

On Mon, Mar 1, 2021, 21:42 Omer Ozery  wrote:

> Hey Till
> these are our flink resource definitions, as they are generated using the helm
> template command (minus log4j/metrics configuration and some sensitive data)
> ---
> # Source: flink/templates/flink-configmap.yaml
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
> app: flink
> data:
>   flink-conf.yaml: |
> jobmanager.rpc.address: flink-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.execution.failover-strategy: region
> jobmanager.memory.process.size: 8g
> taskmanager.memory.process.size: 24g
> taskmanager.memory.task.off-heap.size: 1g
> taskmanager.numberOfTaskSlots: 4
> queryable-state.proxy.ports: 6125
> queryable-state.enable: true
> blob.server.port: 6124
> parallelism.default: 1
> state.backend.incremental: true
> state.backend: rocksdb
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.checkpoints.dir: file:///opt/flink/checkpoints
> classloader.resolve-order: child-first
> kubernetes.cluster-id: flink-cluster
> kubernetes.namespace: intel360-beta
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: file:///opt/flink/recovery
>
> ---
> # Source: flink/templates/flink-service.yaml
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
>   labels:
> {}
> spec:
>   ports:
>   - name: http-ui
> port: 8081
> targetPort: http-ui
>   - name: tcp-rpc
> port: 6123
> targetPort: tcp-rpc
>   - name: tcp-blob
> port: 6124
> targetPort: tcp-blob
>   selector:
> app: flink
> component: jobmanager
> ---
> # Source: flink/templates/flink-deployment.yaml
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   replicas: 1
>   selector:
> matchLabels:
>   app: flink
>   component: jobmanager
>   template:
> metadata:
>   labels:
> app: flink
> component: jobmanager
>   annotations:
> checksum/config:
> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
> spec:
>   containers:
>   - name: jobmanager
> image: flink:1.12.1-scala_2.11-java11
> args: [ "jobmanager" ]
> ports:
> - name: http-ui
>   containerPort: 8081
> - name: tcp-rpc
>   containerPort: 6123
> - name: tcp-blob
>   containerPort: 6124
> resources:
>   {}
> # Environment Variables
> env:
> - name: ENABLE_CHECKPOINTING
>   value: "true"
> - name: JOB_MANAGER_RPC_ADDRESS
>   value: "flink-jobmanager"
> volumeMounts:
> - name: flink-config
>   mountPath: /opt/flink/conf/flink-conf.yaml
>   subPath: flink-conf.yaml
> # NFS mounts
> - name: flink-checkpoints
>   mountPath: "/opt/flink/checkpoints"
> - name: flink-recovery
>   mountPath: "/opt/flink/recovery"
>   volumes:
>   - name: flink-config
> configMap:
>   name: flink-config
>   # NFS volumes
>   - name: flink-checkpoints
> nfs:
>   server: "my-nfs-server.my-org"
>   path: "/my-shared-nfs-dir/flink/checkpoints"
>   - name: flink-recovery
> nfs:
>   server: "my-nfs-server.my-org"
>   path: "/my-shared-nfs-dir/flink/recovery"
> ---
> # Source: flink/templates/flink-deployment.yaml
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 7
>   selector:
> matchLabels:
>   app: flink
>   component: taskmanager
>   template:
> metadata:
>   labels:
> app: flink
> component: taskmanager
>   annotations:
> checksum/config:
> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
> spec:
>   containers:
>   - name: taskmanager
> image: flink:1.12.1-scala_2.11-java11
> args: [ "taskmanager" ]
> resources:
>   limits:
> cpu: 6000m
> memory: 24Gi
>   requests:
> cpu: 6000m
> memory: 24Gi
> # Environment Variables
> env:
> - name: ENABLE_CHECKPOINTING
>   value: "true"
> - name: JOB_MANAGER_RPC_ADDRESS
>   value: "flink-jobmanager"
> volumeMounts:
> - name: flin

Debugging long Flink checkpoint durations

2021-03-01 Thread Dan Hill
Hi.  Are there good ways to debug long Flink checkpoint durations?

I'm running a backfill job that runs over ~10 days of data, and then
checkpointing starts failing.  Since I only see the last 10 checkpoints in the
jobmaster UI, I can't see when it started.

I looked through the text logs and didn't see much.

I assume:
1) I have something misconfigured that is causing old state to stick
around.
2) I don't have enough resources.


Allocating tasks to specific TaskManagers

2021-03-01 Thread 황혜조
Hi,

I am looking for a way to allocate each created subTask to a specific
TaskManager.
Is there any way to force assigning tasks to specific taskManagers?

Thank you

Best regards,

Hyejo Hwang


Re: Does the Kafka source perform retractions on Key?

2021-03-01 Thread Rex Fenley
Thanks Arvid,

I think my confusion lies in misinterpreting the meaning of CDC. We
basically don't want CDC, we just use it to get data into a compacted Kafka
topic where we hold the current state of the world to consume from multiple
consumers. You have described pretty thoroughly where we want to go.

One interesting part of your architecture is this "Debezium -> State
collecting Flink job". Is there a way for Debezium to write to Flink? I
thought it required Kafka Connect.

Appreciate your feedback

On Mon, Mar 1, 2021 at 12:43 AM Arvid Heise  wrote:

> > We are rereading the topics, at any time we might want a completely
> different materialized view for a different web service for some new
> application feature. Other jobs / new jobs need to read all the up-to-date
> rows from the databases.
> > I still don't see how this is the case if everything just needs to be
> overwritten by primary key. To re-emphasize, we do not care about
> historical data.
> Why are you reading from a CDC topic and not a log-compacted topic that
> reflects the state then? CDC is all about history and changes.
>
> What I'd imagine as an architecture that would work better for you:
>
> For each SQL table (ingress layer):
> SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic
> (compacted)
>
> Analytics (processing layer):
> Kafka state topics (compacted) -> Analytical Flink job -> Kafka state
> topic (compacted)
>
> For each view (egress layer):
> Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s) ->
> Web application
>
> The ingress layer is only there to provide you log-compacted Kafka topics.
> Then you can do a bunch of analytical queries from Kafka to Kafka. Finally,
> you output your views to K/V stores for high-avail web applications
> (=decoupled from processing layer).
>
> If that's what you already have, then my apology for not picking that up.
> It's really important to stress that no Kafka topics ever contain CDC data
> in this instance since you are not interested in historic data. The only
> CDC exchange is by using the debezium connector of Flink. At this point,
> all discussions of this thread are resolved.
>
>
>
> On Sat, Feb 27, 2021 at 9:06 PM Rex Fenley  wrote:
>
>> Hi Arvid,
>>
>> >If you are not rereading the topics, why do you compact them?
>> We are rereading the topics, at any time we might want a completely
>> different materialized view for a different web service for some new
>> application feature. Other jobs / new jobs need to read all the up-to-date
>> rows from the databases.
>>
>> >correctness depends on compaction < downtime
>> I still don't see how this is the case if everything just needs to be
>> overwritten by primary key. To re-emphasize, we do not care about
>> historical data.
>>
>> >Again, a cloud-native key/value store would perform much better and be
>> much cheaper with better SLAs
>> Is there a cloud-native key/value store which can read from a Postgres
>> WAL or MySQL binlog and then keep an up-to-date read marker for any
>> materialization consumers downstream *besides* Kafka + Debezium?
>>
>> Appreciate all the feedback, though hopefully we can get closer to the
>> same mental model. If there's really a better alternative here I'm all for
>> it!
>>
>>
>> On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> Your initial question was about the impact of compaction on your CDC
>>> application logic. I have been (unsuccessfully) trying to tell you that you
>>> do not need compaction and it's counterproductive.
>>>
>>> If you are not rereading the topics, why do you compact them? It's lost
>>> compute time and I/O on the Kafka brokers (which are both very valuable)
>>> and does not give you anything that an appropriate retention time wouldn't
>>> give you (=lower SSD usage). It makes the mental model more complicated. An
>>> aggressive compaction and a larger backlog (compaction time < application
>>> failure/restart/upgrade time) would lead to incorrect results (in the same
>>> way an inappropriate retention period may cause data loss for the same
>>> reason).
>>>
>>> The only use case for log compaction is if you're using a Kafka topic
>>> for a key/value store to serve a web application (in which case, it's
>>> usually better to take a real key/value store) but then you don't need
>>> retractions anymore but you'd simply overwrite the actual values or use
>>> tombstone records for deletions.
>>>
>>> If you consume the same topic both for web applications and Flink and
>>> don't want to use another technology for key/value store, then log
>>> compaction of retractions kinda makes sense to kill 2 birds with one stone.
>>> However, you have to live with the downsides on the Flink side (correctness
>>> depends on compaction < downtime) and on web application (deal with
>>> retractions even though they do not make any sense at that level). Again, a
>>> cloud-native key/value store would perform much better and be much cheaper

Re: [Statefun] Exception occurs during function chaining / Async function

2021-03-01 Thread Seth Wiesman
Hi Le,

I believe the issue is the bounded source[1]. Stateful Functions only
supports unbounded inputs.

Additionally, you can remove all the `synchronized` blocks from your code;
statefun handles all synchronization for you.

Seth

[1]
https://gist.github.com/flint-stone/cbc60f2d41507fdf33507ba998696134#file-example-java-L249-L251
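
For reference, the crucial difference is between a source that finishes (e.g.
env.fromElements(...) / fromCollection(...)) and one that keeps running. A
minimal sketch of an unbounded source, with the element contents and the
throttling made up for illustration:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Keeps emitting until the job is cancelled, unlike a bounded source that
// finishes and lets the task (and its mailbox) shut down.
public class UnboundedNamesSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long i = 0;
        while (running) {
            synchronized (ctx.getCheckpointLock()) { // emit under the checkpoint lock
                ctx.collect("name-" + (i++));
            }
            Thread.sleep(100); // throttle, just for the example
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It would be plugged in with env.addSource(new UnboundedNamesSource()) in place
of the bounded source.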

On Fri, Feb 26, 2021 at 8:28 PM Le Xu  wrote:

> Hello!
>
> I'm getting an exception running a modified version of datastream/statefun
> example. (See exception details that follow.) The example was adapted from
> the original datastream example provided in statefun repo. I was trying to
> play with the example by chaining two functions (with the 1st function
> simulating async IO function invocation). However I'm getting an exception
> saying that the mailbox is closed before the operation finishes. I'm adding
> the source code of the example here
> 
> (Please ignore the printing statement and variable usage not making any
> sense since I'm debugging). The example works without chaining the second
> function or without using AsyncIO in any of the functions. Any ideas why
> this happens? Any suggestions would be appreciated.
>
>
> Thanks!
>
> Le
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
> at
> org.apache.flink.statefun.examples.datastream.Example.main(Example.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 8 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postCompl

Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-03-01 Thread Rion Williams
Hey David et all,

I had one follow-up question for this, as I've been putting together some
integration/unit tests to verify that things are working as expected with
finite datasets (e.g. a text file with several hundred records that are
serialized, injected into Kafka, and processed through the pipeline). I'm
wondering if there's a good strategy to handle these finite sets: when I'm
done reading through all of the records that I care about, I'd need to
trigger something to explicitly flush the windows / evict messages, and I'm
not sure what a good approach would be here. I don't think there's an easy
way to simulate processing-time delays outside of an explicit Thread.sleep()
call prior to injecting some messages into the running pipeline
asynchronously.

Any recommendations for handling something like this? I must imagine that
it's a fairly common use-case for testing, but maybe not?

Thanks much!

Rion

On Sat, Feb 27, 2021 at 10:56 AM Rion Williams 
wrote:

> Thanks David,
>
> I figured that the correct approach would obviously be to adopt a keying
> strategy upstream to ensure the same data that I used as a key downstream
> fell on the same partition (ensuring the ordering guarantees I’m looking
> for).
>
> I’m guessing implementation-wise, when I would normally evict a window
> after some event time and allowed lateness, I could set a timer or just
> explicitly keep the window open for some additional time to allow for out
> of order data to make its way into the window.
>
> Either way - I think the keying is probably the right approach, but I
> wanted to consider any other options should that become an issue upstream.
>
> Thanks!
>
> Rion
>
> On Feb 27, 2021, at 10:21 AM, David Anderson  wrote:
>
> 
> Rion,
>
> If you can arrange for each tenant's events to be in only one kafka
> partition, that should be the best way to simplify the processing you need
> to do. Otherwise, a simple change that may help would be to increase the
> bounded delay you use in calculating your own per-tenant watermarks,
> thereby making late events less likely.
>
> David
>
> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams 
> wrote:
>
>> David and Timo,
>>
>> Firstly, thank you both so much for your contributions and advice. I
>> believe I’ve implemented things along the lines that you both detailed and
>> things appear to work just as expected (e.g. I can see things arriving,
>> being added to windows, discarding late records, and ultimately writing out
>> files as expected).
>>
>> With that said, I have one question / issue that I’ve run into with
>> handling the data coming from my Kafka topic. Currently, my tenant/source (i.e.
>> my key) may be distributed across the 10 partitions of my Kafka topic. With
>> the way that I’m consuming from this topic (with a Kafka Consumer), it
>> looks like my data is arriving in a mixed order which seems to be causing
>> my own watermarks (those stored in my ValueState) to process as later data
>> may arrive earlier than other data and cause my windows to be evicted.
>>
>> I’m currently using the `withNoWatermarks()` along with a custom
>> timestamp assigner to handle all of my timestamping, but is there a
>> mechanism to handle the mixed ordering across partitions in this scenario
>> at the Flink level?
>>
>> I know the answer here likely lies with Kafka and adopting a better
>> keying strategy to ensure the same tenant/source (my key) lands on the same
>> partition, which by definition ensures ordering. I’m just wondering if
>> there’s some mechanism to accomplish this post-reading from Kafka in Flink
>> within my pipeline to handle things in a similar fashion?
>>
>> Again - thank you both so much, I’m loving the granularity and control
>> that Flink has been providing me over other streaming technologies I’ve
>> used in the past. I’m totally sold on it and am looking forward to doing
>> more incredible things with it.
>>
>> Best regards,
>>
>> Rion
>>
>> On Feb 26, 2021, at 4:36 AM, David Anderson  wrote:
>>
>> 
>> Yes indeed, Timo is correct -- I am proposing that you not use timers at
>> all. Watermarks and event-time timers go hand in hand -- and neither
>> mechanism can satisfy your requirements.
>>
>> You can instead put all of the timing logic in the processElement method
>> -- effectively emulating what you would get if Flink were to offer per-key
>> watermarking.
>>
>> The reason that the PseudoWindow example is using MapState is that for
>> each key/tenant, more than one window can be active simultaneously. This
>> occurs because the event stream is out-of-order with respect to time, so
>> events for the "next window" are probably being processed before "the
>> previous" window is complete. And if you want to accommodate allowed
>> lateness, the requirement to have several windows open at once becomes even
>> more important.
>>
>> MapState gives you a per-tenant hashmap, where each entry in that map
>> corresponds to an open window for some particular tenant, where t
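
A rough, simplified sketch of the per-key pseudo-window idea described above
(not the actual PseudoWindow code): input elements are (tenantId, eventTimestamp)
pairs, MapState holds one count per open window, and a per-tenant high-water mark
plus a bounded delay decide when a window is emitted and evicted. Window size,
delay, and the counting logic are all assumptions for illustration.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PseudoWindowSketch
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private static final long WINDOW_SIZE = 60_000L;   // 1-minute windows (assumption)
    private static final long BOUNDED_DELAY = 90_000L; // per-tenant lateness bound (assumption)

    private transient MapState<Long, Long> countPerWindowEnd; // window end -> event count
    private transient ValueState<Long> maxTimestamp;          // this tenant's own "watermark"

    @Override
    public void open(Configuration parameters) {
        countPerWindowEnd = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts", Long.class, Long.class));
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("maxTs", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx, Collector<String> out)
            throws Exception {
        long ts = event.f1;

        // Assign the event to its window; several windows can be open at once per tenant.
        long windowEnd = (ts / WINDOW_SIZE) * WINDOW_SIZE + WINDOW_SIZE;
        Long count = countPerWindowEnd.get(windowEnd);
        countPerWindowEnd.put(windowEnd, count == null ? 1L : count + 1L);

        // Advance this tenant's high-water mark.
        Long previousMax = maxTimestamp.value();
        long newMax = previousMax == null ? ts : Math.max(previousMax, ts);
        maxTimestamp.update(newMax);

        // Emit and evict every window that is older than the bounded delay.
        List<Long> closed = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : countPerWindowEnd.entries()) {
            if (entry.getKey() + BOUNDED_DELAY <= newMax) {
                out.collect(ctx.getCurrentKey() + "," + entry.getKey() + "," + entry.getValue());
                closed.add(entry.getKey());
            }
        }
        for (Long windowEndToRemove : closed) {
            countPerWindowEnd.remove(windowEndToRemove);
        }
    }
}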

Timeout Exception When Producing/Consuming Messages to Hundreds of Topics

2021-03-01 Thread Claude M
Hello,

I'm trying to run an experiment w/ two flink jobs:

   - A producer producing messages to hundreds of topics
   - A consumer consuming the messages from all the topics

After the job runs for a few minutes, it will fail w/ the following error:

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic
 not present in metadata after 6 ms

If I run the job w/ a few topics, it will work.  I have tried setting the
following properties in the job but still encounter the problem:

properties.setProperty("retries", "20");
properties.setProperty("request.timeout.ms", "30");
properties.setProperty("metadata.fetch.timeout.ms", "30");

Any ideas about this?

Thanks


State Schema Evolution within SQL API

2021-03-01 Thread Jan Oelschlegel
Hi at all,

I would like to know how far state schema evolution is possible when using the SQL
API of Flink. Which query changes can I make without disrupting the schema of my
savepoint?


The documentation describes, only for the DataStream API, the do's and don'ts
regarding safe schema evolution. [1]


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html



Best,
Jan



timeWindow()s and queryable state

2021-03-01 Thread Ron Crocker
Hi all -

I’m trying to keep some state around for a little while after a window fires to 
use as queryable state. I am intending on using something like:

.keyBy()
.timeWindow(Time.minutes(1)).allowedLateness(Time.minutes(90))
.aggregate(…)
.keyBy()
.asQueryableState(...)

My intent is to keep that window available for 90 minutes. I’m not sure how I 
feel about this pattern - it feels more side-effect-y than intentional.

My questions:
a) Is that actually going to keep the window (and, by implication, the 
downstream state) around?
b) Is there a “more correct” way to do this? Maybe it would be better to use 
some kind of time-aware reducing state that will provide some lingering state?

Before you ask, no, I haven’t run it to see what it does. That’s next, but I 
figured I’d ask for your advice first

Scaling Higher than 10k Nodes

2021-03-01 Thread Joey Tran
Hi, I was looking at Apache Beam/Flink for some of our data processing
needs, but when reading about the resource managers
(YARN/mesos/Kubernetes), it seems like they all top out at around 10k
nodes. What are recommended solutions for scaling higher than this?

Thanks in advance,
Joey


Re: Flink application kept restarting

2021-03-01 Thread Matthias Pohl
Another question: the timeout of 48 hours sounds strange. Some other part of the
system should have noticed the connection problem earlier, assuming that you have
a reasonably low heartbeat interval configured.

Matthias

On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl  wrote:

> Thanks for providing this information, Rainie. Are other issues documented
> in the logs besides the TimeoutException in the JM logs which you already
> shared? For now, it looks like there was a connection problem between
> the TaskManager and the JobManager that caused the shutdown of the operator,
> resulting in the NetworkBufferPool being destroyed. For this scenario I
> would expect other failures to occur besides the ones you shared.
>
> Best,
> Matthias
>
> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li  wrote:
>
>> Thank you Mattias.
>> It’s version 1.9.
>>
>> Best regards
>> Rainie
>>
>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Rainie,
>>> the network buffer pool was destroyed for some reason. This happens when
>>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>>> conversation hoping that he can give more insights.
>>>
>>> If I may ask: What Flink version are you using?
>>>
>>> Thanks,
>>> Matthias
>>>
>>>
>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li 
>>> wrote:
>>>
 Hi All,

 Our flink application kept restarting and it did lots of RPC calls to a
 dependency service.

 *We saw this exception from failed task manager log: *
 org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
 Could not forward element to next operator
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at
 org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 at
 com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
 at
 com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
 at
 org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at
 org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
 at
 org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
 at
 org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
 at
 org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
 at
 org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
 at
 org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
 at
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 at
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
 at
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
 Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
 at
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java

Re: Flink application kept restarting

2021-03-01 Thread Matthias Pohl
Thanks for providing this information, Rainie. Are other issues documented
in the logs besides the TimeoutException in the JM logs which you already
shared? For now, it looks like there was a connection problem between
the TaskManager and the JobManager that caused the shutdown of the operator,
resulting in the NetworkBufferPool being destroyed. For this scenario I
would expect other failures to occur besides the ones you shared.

Best,
Matthias

On Fri, Feb 26, 2021 at 8:28 PM Rainie Li  wrote:

> Thank you Mattias.
> It’s version 1.9.
>
> Best regards
> Rainie
>
> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl 
> wrote:
>
>> Hi Rainie,
>> the network buffer pool was destroyed for some reason. This happens when
>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>> conversation hoping that he can give more insights.
>>
>> If I may ask: What Flink version are you using?
>>
>> Thanks,
>> Matthias
>>
>>
>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li  wrote:
>>
>>> Hi All,
>>>
>>> Our flink application kept restarting and it did lots of RPC calls to a
>>> dependency service.
>>>
>>> *We saw this exception from failed task manager log: *
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>> Could not forward element to next operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api

Re: Suspected classloader leak in Flink 1.11.1

2021-03-01 Thread Kezhu Wang
Hi Tamir,

> The histogram has been taken from Task Manager using jcmd tool.

From that histogram, I guess there is no classloader leaking.

> A simple batch job with single operation . The memory bumps to ~600MB
(after single execution). once the job is finished the memory never freed.

It could be just new code paths and hence new classes. A single execution
does not tell much. Multiple or a dozen runs with memory continuously
increasing among them and not decreasing afterwards could be a symptom of leaking.

You could use the following steps to verify whether there are issues in your
task managers:
* Run the job N times, the more the better.
* Wait until all jobs are finished or stopped.
* Manually trigger GC a dozen times.
* Take a class histogram and check whether there are any
“ChildFirstClassLoader” instances.
* If there are roughly N “ChildFirstClassLoader” instances in the histogram, then
we can be pretty sure there might be a class loader leak.
* If there are no (or few) “ChildFirstClassLoader” instances but memory is still
higher than a threshold, say ~600MB or more, it could be another form of leaking.


In all leaking cases, a heap dump, as @Chesnay said, could be more helpful
since it can tell us which object/class/thread keeps memory from being freed.


Besides this, I saw an attachment “task-manager-thrad-print.txt” in the initial
mail; when and where did you capture it? On the Task Manager? Was there any job
still running?


Best,
Kezhu Wang

On March 1, 2021 at 18:38:55, Tamir Sagi (tamir.s...@niceactimize.com)
wrote:

Hey Kezhu,

The histogram has been taken from Task Manager using jcmd tool.

By means of batch job, do you means that you compile job graph from DataSet
API in client side and then submit it through RestClient ? I am not
familiar with data set api, usually, there is no `ChildFirstClassLoader`
creation in client side for job graph building. Could you depict a pseudo
for this or did you create `ChildFirstClassLoader` yourself ?

Yes, we have a batch app: we read a file from S3 using the hadoop-s3 plugin,
then map that data into a DataSet and just print it.
Then we have a Flink Client application which saves the batch app jar.

Attached the following files:

   1. batch-source-code.java - main function
   2. FlatMapXSightMsgProcessor.java - custom RichFlatMapFunction
   3. flink-job-submit.txt - The code to submit the job


I've noticed 2 behaviors:

   1. Idle - Once the Task Manager application boots up, the memory consumption
   gradually grows from ~360MB to ~430MB (within a few minutes). I see logs
   where many classes are loaded into the JVM and never get released. (Might be
   normal behavior.)
   2. Batch Job Execution - A simple batch job with a single operation. The
   memory bumps to ~600MB (after a single execution); once the job is finished
   the memory is never freed. I executed GC several times (manually +
   programmatically) and it did not help (although some classes were unloaded).
   The memory keeps growing as more batch jobs are executed.

Attached are Task Manager logs from yesterday after a single batch
execution. (Memory grew to 612MB and was never freed.)

   1. taskmgr.txt - Task manager logs (2021-02-28T16:06:05,983 is the timestamp
   when the job was submitted)
   2. gc-class-historgram.txt
   3. thread-print.txt
   4. vm-class-loader-stats.txt
   5. vm-class-loaders.txt
   6. heap_info.txt


The same behavior has been observed in the Flink Client application. Once the batch
job is executed the memory increases gradually and does not get cleaned up
afterwards. (We observed many ChildFirstClassLoader instances.)


Thank you
Tamir.

--
*From:* Kezhu Wang 
*Sent:* Sunday, February 28, 2021 6:57 PM
*To:* Tamir Sagi 
*Subject:* Re: Suspected classloader leak in Flink 1.11.1


*EXTERNAL EMAIL*


HI Tamir,

The histogram has no instance of `ChildFirstClassLoader`.

> we are running Flink on a session cluster (version 1.11.1) on Kubernetes,
submitting batch jobs with Flink client on Spring boot application (using
RestClusterClient).

> By analyzing the memory of the client Java application with profiling
tools, We saw that there are many instances of Flink's
ChildFirstClassLoader (perhaps as the number of jobs which were running),
and therefore many instances of the same class, each from a different
instance of the Class Loader (as shown in the attached screenshot).
Similarly, to the Flink task manager memory.

By means of batch job, do you means that you compile job graph from DataSet
API in client side and then submit it through RestClient ? I am not
familiar with data set api, usually, there is no `ChildFirstClassLoader`
creation in client side for job graph building. Could you depict a pseudo
for this or did you create `ChildFirstClassLoader` yourself ?


> In addition, we have tried calling GC manually, but it did not change
much.

It might take several runs to collect a class loader instance.


Best,
Kezhu Wang


On February 28, 2021 at 23:27:38, Tamir Sagi (tamir.s...@niceactimize.com)
wrote:

Hey Kezhu,
Thanks for fast responding,

I've read tha

how to propagate watermarks across multiple jobs

2021-03-01 Thread yidan zhao
I have a job which includes about 50+ tasks. I want to split it into multiple
jobs, with the data transferred through Kafka, but what about the watermark?

Has anyone done something similar and solved this problem?

Here I give an example:
The original job: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==>
xxxWindow2 resultSinkToKafka(result-topic).

The new job1: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==>
resultSinkToKafka(mid-topic).
The new job2: kafkaStream1(mid-topic) => xxxWindow2 ==>
resultSinkToKafka(result-topic).

The watermark generation for window1 and window2 is now separated across two jobs,
which also seems to work, but this introduces a 5-minute delay for window2 (both
windows have a 5-minute cycle).

The key problem is that the window cycle is 5 minutes, so window2 will
have a 5-minute delay.
If the watermark could be transferred between jobs, this would not be a problem anymore.


Re: Flink’s Kubernetes HA services - NOT working

2021-03-01 Thread Till Rohrmann
Hi Omer,

thanks for the logs. Could you tell us a bit more about the concrete setup
of your Flink K8s cluster? It looks to me as if the ResourceManager cannot
talk to the JobMaster which tries to register at the RM. Also some
JobMasters don't seem to reach the ResourceManager. Could it be that you
are running standby JobManager processes? If this is the case, then using a K8s
service for the communication between Flink components does not work.

Cheers,
Till

On Sun, Feb 28, 2021 at 11:29 AM Omer Ozery  wrote:

> Sorry for the late reply.
> I attached to this mail 3 types of logs taken from the jobmanager.
>
> 1. flink-jobmanager with log level info - when nothing is working the
> minute we try to deploy the jobs (even the UI jobs overview is stuck)
> 2. flink-jobmanager with log level debug - when nothing is working the
> minute we try to deploy the jobs (even the UI jobs overview is stuck)
> 3. flink-jobmanager with log level info with 1 successful job - you can
> see that it starts and deals with leadership and checkpoints
> properly.
>
> you can see that everything works fine when the cluster starts with
> no jobs:
> all task managers are registered and communicating with the jobmanager
> with no problems.
>
> * BTW, flink has this problem when some jobs are stuck in scheduling: the
> jobmanager's running-jobs UI overview is also stuck, and you can't see any
> jobs running even if there are some. It happened in earlier versions too,
> 1.9, 1.11...
>
> Thanks
> Omer
>
>
>
> On Thu, Feb 18, 2021 at 4:22 PM Till Rohrmann 
> wrote:
>
>> Hi Omer,
>>
>> could you share a bit more of the logs with us? I would be interested in
>> what has happened before "Stopping DefaultLeaderRetrievalService" is
>> logged. One problem you might run into is FLINK-20417. This problem should
>> be fixed with Flink 1.12.2.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20417
>>
>> Cheers,
>> Till
>>
>> On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery  wrote:
>>
>>> Hey guys
>>> It looks like the flink cluster is deployed successfully, it starts with
>>> no errors.
>>> but when we try to deploy the jobs, some jobs are starting and some
>>> can't find available slots for some reason, even when we have free ones.
>>> happens with different jobs every time..
>>> below are the exceptions thrown by the components.
>>> and I also attached an image showing the taskamangers and the free slots.
>>>
>>> *jobManager throws this error:*
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>> - Stopping DefaultLeaderRetrievalService.
>>> 2021-02-17 11:19:41,956 INFO
>>>  
>>> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver
>>> [] - Stopping
>>> KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
>>> [] - The watcher is closing.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster [] -
>>> Registration at ResourceManager was declined: java.lang.Exception: Job
>>> leader id service has been stopped.
>>>
>>>
>>> java.util.concurrent.CompletionException:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Slot request bulk is not fulfillable! Could not allocate the required slot
>>> within slot request timeout
>>> at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(Co

Re: Kafka SQL Connector: dropping events if more partitions than source tasks

2021-03-01 Thread Jan Oelschlegel
Hi Shengkai,

thanks for this hint. It solves the issue when having more consumer tasks than kafka
partitions.

But the case of dropping events while having fewer consumer tasks than kafka
partitions is still there. I think it will be okay in version 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-20041

Best,
Jan
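
For reference, a minimal sketch of setting the two options discussed in this
thread programmatically for a Table/SQL job; the concrete values are arbitrary
examples:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleSourceConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Must be > 0, otherwise the idle-source detection is never triggered.
        tEnv.getConfig().getConfiguration()
                .setString("pipeline.auto-watermark-interval", "200 ms");

        // Mark a source (partition) as idle if it produces no records for this long,
        // so it no longer holds back the watermark.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "30 s");
    }
}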

From: Shengkai Fang
Sent: Saturday, February 27, 2021 05:03
To: Jan Oelschlegel
Cc: Benchao Li; Arvid Heise; user; Timo Walther
Subject: Re: Kafka SQL Connector: dropping events if more partitions than
source tasks

Hi Jan.

Thanks for your reply. Did you set the options `table.exec.source.idle-timeout`
and `pipeline.auto-watermark-interval`? If `pipeline.auto-watermark-interval`
is zero, it will not trigger the detection of the idle source.

Best,
Shengkai

Jan Oelschlegel <oelschle...@integration-factory.de> wrote on Friday, February 26,
2021, at 11:09 PM:
Hi Shengkai,

I'm using Flink 1.11.2. The problem is that if I use a parallelism higher than my
kafka partition count, the watermarks are not increasing and so the windows never
get fired.

I suspect that in that case a source task is not marked as idle and thus the
watermark is not advanced. In any case, I have observed that with a larger number
of source tasks no results are produced.

Best,
Jan
From: Shengkai Fang <fskm...@gmail.com>
Sent: Friday, February 26, 2021 15:32
To: Jan Oelschlegel <oelschle...@integration-factory.de>
Cc: Benchao Li <libenc...@apache.org>; Arvid Heise <ar...@apache.org>; user
<user@flink.apache.org>; Timo Walther <twal...@apache.org>
Subject: Re: Kafka SQL Connector: dropping events if more partitions than
source tasks

Hi, Jan.

Could you tell us which Flink version you use? As far as I know, the kafka SQL
connector has implemented `SupportWatermarkPushDown` in Flink 1.12. The
`SupportWatermarkPushDown` pushes the watermark generator into the source and
emits the minimum watermark among all the partitions. For more details, you can
refer to the doc [1].

If you use a version before Flink 1.12, I think the best approach to solve
this problem is to increase the number of source tasks.

Best,
Shengkai

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#source-per-partition-watermarks

Jan Oelschlegel <oelschle...@integration-factory.de> wrote on Thursday, February 25,
2021, at 4:24 PM:
Hi Benchao,

I'm observing this behaviour only for the SQL API. With the DataStream API I
can use more or fewer source tasks than the kafka partition count. And FLIP-27
seems to belong to the DataStream API.

The problem is only on the SQL side.


Best,
Jan

From: Benchao Li <libenc...@apache.org>
Sent: Thursday, February 25, 2021 00:04
To: Jan Oelschlegel <oelschle...@integration-factory.de>
Cc: Arvid Heise <ar...@apache.org>; user <user@flink.apache.org>; Timo Walther
<twal...@apache.org>
Subject: Re: Kafka SQL Connector: dropping events if more partitions than
source tasks

Hi Jan,

What you are observing is correct for the current implementation.

Current watermark generation is based on the subtask instead of the partition.
Hence if there is more than one partition in the same subtask, it's very easy to
see more data dropped.

AFAIK, FLIP-27 could solve this problem; however, the Kafka connector has not been
migrated to FLIP-27 yet.


Jan Oelschlegel <oelschle...@integration-factory.de> wrote on Wednesday, February 24,
2021, at 10:07 PM:
Hi Arvid,

thanks for bringing back this topic.

Yes, I'm running on historic data, but as you mentioned that should not be the
problem, even if there is an event-time skew between partitions.

But maybe this issue with the missing watermark pushdown per partition  is the 
important fact:

https://issues.apache.org/jira/browse/FLINK-20041


Best,
Jan

From: Arvid Heise <ar...@apache.org>
Sent: Wednesday, February 24, 2021 14:10
To: Jan Oelschlegel <oelschle...@integration-factory.de>
Cc: user <user@flink.apache.org>; Timo Walther <twal...@apache.org>
Subject: Re: Kafka SQL Connector: dropping events if more partitions than
source tasks

Hi Jan,

Are you running on historic data? Then your partitions might drift apart 
quickly.

However, I still suspect that this is a bug (Watermark should only be from the 
slowest partition). I'm pulling in Timo who should know more.



On Fri, Feb 19, 2021 at 10:50 AM Jan Oelschlegel
<oelschle...@integration-factory.de> wrote:
If I increase the watermark delay, the number of dropped events gets lower. But why
is the DataStream API job still working with a 12-hour watermark delay?

By the way, I'm using Flink 1.11. It would be nice if someone could give me
some advice.

Best,
Jan

From: Jan Oelschlegel <oelschle...@integration-factory.de>
Sent: Thursday, February 18, 2021 09:51
To: Jan Oelschlegel <oelschle...@integration-factory.de>; user <user@flink.apache.org>
Subject: Re: Kafka SQL Connector: dropping
Betreff: AW: Kafka SQL Connector: dropping

Re: Does the Kafka source perform retractions on Key?

2021-03-01 Thread Arvid Heise
> We are rereading the topics, at any time we might want a completely
different materialized view for a different web service for some new
application feature. Other jobs / new jobs need to read all the up-to-date
rows from the databases.
> I still don't see how this is the case if everything just needs to be
overwritten by primary key. To re-emphasize, we do not care about
historical data.
Why are you reading from a CDC topic and not a log-compacted topic that
reflects the state then? CDC is all about history and changes.

What I'd imagine as an architecture that would work better for you:

For each SQL table (ingress layer):
SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic
(compacted)

Analytics (processing layer):
Kafka state topics (compacted) -> Analytical Flink job -> Kafka state topic
(compacted)

For each view (egress layer):
Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s) ->
Web application

The ingress layer is only there to provide you log-compacted Kafka topics.
Then you can do a bunch of analytical queries from Kafka to Kafka. Finally,
you output your views to K/V stores for high-avail web applications
(=decoupled from processing layer).

If that's what you already have, then my apology for not picking that up.
It's really important to stress that no Kafka topics ever contain CDC data
in this instance since you are not interested in historic data. The only
CDC exchange is by using the debezium connector of Flink. At this point,
all discussions of this thread are resolved.
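
For illustration, a rough sketch of the ingress layer above, assuming Flink 1.12's
upsert-kafka connector for the state topic. Table, topic, and column names are made
up, and the state topic itself would be configured as log-compacted on the Kafka side.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IngressLayerSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // CDC input: the Debezium changelog of one SQL table.
        tEnv.executeSql(
                "CREATE TABLE users_cdc (" +
                "  id BIGINT, name STRING, email STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'dbserver.public.users'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'properties.group.id' = 'ingress-users'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'debezium-json'" +
                ")");

        // State topic: upsert-kafka writes one upsert (or tombstone) per primary key,
        // so a compacted topic ends up holding only the current row per key.
        tEnv.executeSql(
                "CREATE TABLE users_state (" +
                "  id BIGINT, name STRING, email STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'users-state'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");

        // The "state collecting" job: continuously fold the CDC stream into current state.
        tEnv.executeSql("INSERT INTO users_state SELECT id, name, email FROM users_cdc");
    }
}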



On Sat, Feb 27, 2021 at 9:06 PM Rex Fenley  wrote:

> Hi Arvid,
>
> >If you are not rereading the topics, why do you compact them?
> We are rereading the topics, at any time we might want a completely
> different materialized view for a different web service for some new
> application feature. Other jobs / new jobs need to read all the up-to-date
> rows from the databases.
>
> >correctness depends on compaction < downtime
> I still don't see how this is the case if everything just needs to be
> overwritten by primary key. To re-emphasize, we do not care about
> historical data.
>
> >Again, a cloud-native key/value store would perform much better and be
> much cheaper with better SLAs
> Is there a cloud-native key/value store which can read from a Postgres WAL
> or MySQL binlog and then keep an up-to-date read marker for any
> materialization consumers downstream *besides* Kafka + Debezium?
>
> Appreciate all the feedback, though hopefully we can get closer to the
> same mental model. If there's really a better alternative here I'm all for
> it!
>
>
> On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> Your initial question was about the impact of compaction on your CDC
>> application logic. I have been (unsuccessfully) trying to tell you that you
>> do not need compaction and it's counterproductive.
>>
>> If you are not rereading the topics, why do you compact them? It's lost
>> compute time and I/O on the Kafka brokers (which are both very valuable)
>> and does not give you anything that an appropriate retention time wouldn't
>> give you (=lower SSD usage). It makes the mental model more complicated. An
>> aggressive compaction and a larger backlog (compaction time < application
>> failure/restart/upgrade time) would lead to incorrect results (in the same
>> way an inappropriate retention period may cause data loss for the same
>> reason).
>>
>> The only use case for log compaction is if you're using a Kafka topic for
>> a key/value store to serve a web application (in which case, it's usually
>> better to take a real key/value store) but then you don't need retractions
>> anymore but you'd simply overwrite the actual values or use tombstone
>> records for deletions.
>>
>> If you consume the same topic both for web applications and Flink and
>> don't want to use another technology for key/value store, then log
>> compaction of retractions kinda makes sense to kill 2 birds with one stone.
>> However, you have to live with the downsides on the Flink side (correctness
>> depends on compaction < downtime) and on web application (deal with
>> retractions even though they do not make any sense at that level). Again, a
>> cloud-native key/value store would perform much better and be much cheaper
>> with better SLAs and solve all issues on the Flink side (final note: it's
>> independent of the technology, any stream processor will encounter the same
>> issue as it's a conceptual mismatch).
>>
>> On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley  wrote:
>>
>>> Hi Arvid,
>>>
>>> I really appreciate the thorough response but I don't think this
>>> contradicts our use case. In servicing web applications we're doing nothing
>>> more than taking data from giant databases we use, and performing joins and
>>> denormalizing aggs strictly for performance reasons (joining across a lot
>>> of stuff on query time is slow) and putting specified results into another
>>> database connecte

Re: Issues running multiple Jobs using the same JAR

2021-03-01 Thread Morgan Geldenhuys

That solved it, thank you very much Kezhu :)

On 28.02.21 16:12, Kezhu Wang wrote:

Hi Morgan,

You could check FLINK-11654; from its description, I think it is the 
problem you encountered.


> We run multiple jobs on a cluster which write a lot to the same 
Kafka topic from identically named sinks. When EXACTLY_ONCE semantic 
is enabled for the KafkaProducers we run into a lot of 
ProducerFencedExceptions and all jobs go into a restart cycle.


FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654
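
In case it helps, here is a minimal sketch of one common workaround, under the assumption that the clash comes from identically named sink operators feeding the transactional id generation (the topic, broker address, and the jobName argument are placeholders, not your setup): give each job's Kafka sink its own operator name and uid, for example derived from a per-deployment argument.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UniquelyNamedSinkJob {

    public static void main(String[] args) throws Exception {
        // Placeholder: a per-deployment identifier passed on the command line.
        final String jobName = args.length > 0 ? args[0] : "job-a";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("a", "b", "c"); // placeholder source

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        // Keep this below the broker's transaction.max.timeout.ms for EXACTLY_ONCE.
        props.setProperty("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<byte[], byte[]>("output-topic",
                                element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        events.addSink(producer)
                // A distinct name (and uid) per job should keep the generated
                // transactional ids from colliding between jobs using the same JAR.
                .name("kafka-sink-" + jobName)
                .uid("kafka-sink-" + jobName);

        env.execute(jobName);
    }
}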


Best,
Kezhu Wang


On February 28, 2021 at 22:35:02, Morgan Geldenhuys (morgan.geldenh...@tu-berlin.de) wrote:



Greetings all,

I am having an issue instantiating multiple Flink jobs using the same 
JAR in the same native Flink cluster (all 1.12.1).


When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at 
least one previous transactional or idempotent request has failed with 
errors.
at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: Producer attempted an operation with an 
old epoch. Either there is a newer producer with the same 
transactionalId, or the producer's transaction has been expired by the 
broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: 
Producer attempted an operation with an old epoch. Either there is a 
newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count must 
be zero at this point: 1
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.Abstract

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-01 Thread Alexey Trenikhun
Hi Yang,
Unfortunately I didn't save the log. I'm trying to reproduce it again, but now I'm hitting a 
different error, about an incompatible version of ImmutableMapSerializer. This is strange: 
the serialVersionUID did indeed change, but this serializer is only registered, not used 
(there is no state using Kryo; I'm calling disableGenericTypes to ensure this). Could it be 
that when I call registerTypeWithKryoSerializer, the serializer becomes part of the JobGraph? 
If so, then perhaps it is the same root cause: a new JobGraph is not created.
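
For reference, the registration looks roughly like the sketch below (a simplified, hypothetical stand-in for our serializer, not the real class). What makes me suspicious is that a serializer registered as an instance is Java-serialized into the ExecutionConfig, which is shipped as part of the JobGraph, so an old JobGraph recovered from the HA store would still carry the old serialVersionUID even though no state uses Kryo.

import java.io.Serializable;
import java.util.HashMap;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerRegistrationSketch {

    // Simplified stand-in for our ImmutableMapSerializer (hypothetical implementation).
    // It must be Serializable to be registered as an instance, and that is the part
    // carrying the serialVersionUID that changed.
    public static class ImmutableMapSerializer
            extends Serializer<ImmutableMap<Object, Object>> implements Serializable {

        private static final long serialVersionUID = 2L; // the value that changed

        @Override
        public void write(Kryo kryo, Output output, ImmutableMap<Object, Object> map) {
            kryo.writeObject(output, new HashMap<>(map));
        }

        @Override
        @SuppressWarnings("unchecked")
        public ImmutableMap<Object, Object> read(
                Kryo kryo, Input input, Class<ImmutableMap<Object, Object>> type) {
            return ImmutableMap.copyOf(kryo.readObject(input, HashMap.class));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();

        // Registering an instance: the object itself is Java-serialized as part of the
        // ExecutionConfig inside the JobGraph, even if no state is ever serialized with Kryo.
        env.getConfig().registerTypeWithKryoSerializer(
                ImmutableMap.class, new ImmutableMapSerializer());

        // ... build and execute the actual pipeline here ...
    }
}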

Thanks,
Alexey


From: Yang Wang 
Sent: Sunday, February 28, 2021 10:04 PM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Hi Alexey,

It seems that the KubernetesHAService works well since all the checkpoints have 
been cleaned up when the job is canceled.
And we could find related logs "Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is recovering 
from a wrong savepoint path. Could you share the
full JobManager logs? One possible reason I could guess is that the application 
cluster entrypoint is not creating a new JobGraph from the specified arguments.


Best,
Yang

Alexey Trenikhun <yen...@msn.com> wrote on Sat, Feb 27, 2021 at 1:48 AM:
Hello,
We have a Flink job running in Kubernetes with Kubernetes HA enabled (the JM is 
deployed as a Job, a single TM as a StatefulSet). We took a savepoint with 
cancel=true. Now when we try to start the job using --fromSavepoint A, where A is 
the path we got from taking the savepoint (ClusterEntrypoint reports A in the log), 
it looks like the job for some reason ignores the given A and actually tries to 
restore from some other path B (CheckpointCoordinator logs B):

{"ts":"2021-02-26T17:09:52.500Z","message":" Program 
Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
/opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.502Z","message":"
","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
...
{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are already 
downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during 
restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.183Z","message":"Starting