Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?
Hi Dadashov, We faced similar problems in our production clusters. Currently both launching and stopping of containers are performed in the main thread of YarnResourceManager. As containers are launched and stopped one after another, it usually takes a long time to bootstrap large jobs. Things get worse when some node managers get lost: Yarn will retry many times to communicate with them, leading to heartbeat timeouts of TaskManagers. Following are some efforts we made to help Flink deal with large jobs. 1. We provision some common jars on all cluster nodes and ask our users not to include these jars in their uberjars. When containers bootstrap, these jars are added to the classpath via JVM options. That way, we can effectively reduce the size of uberjars. 2. We deploy some asynchronous threads to launch and stop containers in YarnResourceManager. The bootstrap time can be effectively reduced when launching a large number of containers. We'd like to contribute this to the community very soon. 3. We deploy a timeout timer for each launching container. If a task manager does not register in time after its container has been launched, a new container will be allocated and launched. That leads to some waste of resources, but reduces the effects caused by slow or problematic nodes. Now the community is considering a refactoring of the ResourceManager. I think that will be a good time to improve its efficiency. Regards, Xiaogang Elkhan Dadashov wrote on Fri, Aug 30, 2019 at 7:10 AM: > Dear Flink developers, > > I'm having difficulty getting a Flink job started. > > The job's uberjar/fat jar is around 400MB, and I need to kick off 800+ > containers. > > The default HDFS replication is 3. > > *The Yarn queue is empty, and 800 containers are allocated > almost immediately by Yarn RM.* > > It takes a very long time until all 800 nodes (node managers) have downloaded > the uberjar from HDFS to local machines. > > *Q1:* > > a) Do all those 800 nodes download in batches of 3 at a time ? 
( batch > size = HDFS replication size) > > b) Or Do Flink TM's can replicate from each other ? or already started > TM's replicate to yet-started nodes? > > Most probably answer is (a), but want to confirm. > > *Q2:* > > What is the recommended way of handling 400MB+ Uberjar with 800+ > containers ? > > Any specific params to tune? > > Thanks. > > Because downloading the UberJar takes really long time, after around 15 > minutes since the job kicked, facing this exception: > > org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to > start container. > This token is expired. current time is 1567116179193 found 1567116001610 > Note: System times on machines may be out of sync. Check system time and time > zones. > at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown > Source) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) > at > org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) > at > org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205) > at > org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > >
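The "token is expired" failure above is the YARN container token outliving the gap between container allocation and container launch, which is governed by a ResourceManager setting. As a workaround sketch (the property name and default should be verified against your Hadoop version; the value here is illustrative), the expiry interval can be raised in yarn-site.xml:

```xml
<!-- yarn-site.xml sketch: allow more time between container allocation
     and container launch before the container token expires.
     Default is 600000 ms (10 minutes); verify against your Hadoop version. -->
<property>
  <name>yarn.resourcemanager.rm.container-allocation.expiry-interval-ms</name>
  <value>1800000</value>
</property>
```

This only buys time; it does not make the uberjar download any faster, so shrinking the jar (as suggested in the reply above) is still the better fix.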
How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?
Dear Flink developers, I'm having difficulty getting a Flink job started. The job's uberjar/fat jar is around 400MB, and I need to kick off 800+ containers. The default HDFS replication is 3. *The Yarn queue is empty, and 800 containers are allocated almost immediately by Yarn RM.* It takes a very long time until all 800 nodes (node managers) have downloaded the uberjar from HDFS to local machines. *Q1:* a) Do all those 800 nodes download in batches of 3 at a time? (batch size = HDFS replication size) b) Or can Flink TMs replicate from each other, i.e., already-started TMs replicating to not-yet-started nodes? Most probably the answer is (a), but I want to confirm. *Q2:* What is the recommended way of handling a 400MB+ uberjar with 800+ containers? Any specific params to tune? Thanks. Because downloading the uberjar takes a really long time, around 15 minutes after the job kicked off I get this exception: org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container. This token is expired. current time is 1567116179193 found 1567116001610 Note: System times on machines may be out of sync. Check system time and time zones. 
at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205) at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
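One common way to act on the "don't bundle common jars" advice from the reply in this thread is to mark cluster-provided libraries with Maven's provided scope so they never enter the uberjar. A sketch (the artifact and version shown are illustrative, not taken from the poster's build):

```xml
<!-- pom.xml sketch: keep libraries that are already on every node's
     classpath out of the fat jar, shrinking it considerably. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <!-- provided at runtime by the cluster, so not bundled -->
  <scope>provided</scope>
</dependency>
```

The same idea applies to Hadoop, logging, and other common dependencies, provided the matching jars are actually installed on all node managers and added to the container classpath.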
Re: Flink and kerberos
Thanks, I'll check it out. On Thu, Aug 29, 2019 at 1:08 PM David Morin wrote: > Vishwas, > > A config that works on my Kerberized cluster (Flink on Yarn). > I hope this will help you. > > Flink conf: > security.kerberos.login.use-ticket-cache: true > security.kerberos.login.keytab: /home/myuser/myuser.keytab > security.kerberos.login.principal: myuser@ > security.kerberos.login.contexts: Client > > Properties related to security passed as argument of the > FlinkKafkaConsumerXX constructor: > sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"myuser\" password=\"\";" > sasl.mechanism=PLAIN > security.protocol=SASL_SSL > > Le jeu. 29 août 2019 à 18:20, Vishwas Siravara a > écrit : > >> Hey David , >> My consumers are registered , here is the debug log. The problem is the >> broker does not belong to me , so I can’t see what is going on there . But >> this is a new consumer group , so there is no state yet . >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - >> Consumer subtask 0 will start reading the following 40 partitions from the >> committed group offsets in Kafka: >> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', 
partition=34}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] >> >> On Thu, Aug 29, 2019 at 11:39 AM David Morin >> wrote: >> >>> Hello Vishwas, >>> >>> You can use a keytab if you prefer. 
You generate a keytab for your user >>> and then you can reference it in the Flink configuration. >>> Then this keytab will be handled by Flink in a secure way and TGT will >>> be created based on this keytab. >>> However, that seems to be working. >>> Did you check Kafka logs on the broker side ? >>> Or did you check consumer offsets with Kafka tools in order to validate >>> consumers are registered onto the different partitions of your topic ? >>> You could try to switch to a different groupid for your consumer group >>> in order to force parallel consumption. >>> >>> Le jeu. 29 août 2019 à 09:57, Vishwas Siravara a >>> écrit : >>> I see this log as well , but I can't see any messages . I know for a fact that the topic I am subscribed to has messages as I checked with a simple java consumer with a different group. org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed
Re: Flink and kerberos
Vishwas, A config that works on my Kerberized cluster (Flink on Yarn). I hope this will help you. Flink conf: security.kerberos.login.use-ticket-cache: true security.kerberos.login.keytab: /home/myuser/myuser.keytab security.kerberos.login.principal: myuser@ security.kerberos.login.contexts: Client Properties related to security passed as argument of the FlinkKafkaConsumerXX constructor: sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"myuser\" password=\"\";" sasl.mechanism=PLAIN security.protocol=SASL_SSL Le jeu. 29 août 2019 à 18:20, Vishwas Siravara a écrit : > Hey David , > My consumers are registered , here is the debug log. The problem is the > broker does not belong to me , so I can’t see what is going on there . But > this is a new consumer group , so there is no state yet . > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 0 will start reading the following 40 partitions from the > committed group offsets in Kafka: > [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', 
partition=32}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] > > On Thu, Aug 29, 2019 at 11:39 AM David Morin > wrote: > >> Hello Vishwas, >> >> You can use a keytab if you prefer. You generate a keytab for your user >> and then you can reference it in the Flink configuration. >> Then this keytab will be handled by Flink in a secure way and TGT will be >> created based on this keytab. >> However, that seems to be working. 
>> Did you check Kafka logs on the broker side ? >> Or did you check consumer offsets with Kafka tools in order to validate >> consumers are registered onto the different partitions of your topic ? >> You could try to switch to a different groupid for your consumer group in >> order to force parallel consumption. >> >> Le jeu. 29 août 2019 à 09:57, Vishwas Siravara a >> écrit : >> >>> I see this log as well , but I can't see any messages . I know for a >>> fact that the topic I am subscribed to has messages as I checked with a >>> simple java consumer with a different group. >>> >>> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - >>> Consumer subtask 0 will start reading the following 40 partitions from the >>> committed group offsets in Kafka: >>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, >>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, >>>
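David's configuration, translated into the Properties object handed to the FlinkKafkaConsumerXX constructor, might look like the sketch below. Only the java.util.Properties handling is standard; the consumer constructor appears in a comment, and the username/password values are placeholders from the thread, not working credentials.

```java
import java.util.Properties;

public class KafkaSaslProps {
    // Builds the security-related consumer properties from David's reply.
    // username/password are placeholders, not real credentials.
    static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                + "required username=\"myuser\" password=\"\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps();
        // These would then be passed to the Flink Kafka source, e.g.:
        // new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props)
        System.out.println(props.getProperty("security.protocol"));
    }
}
```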
Re: Flink 1.9, MapR secure cluster, high availability
Hi Maxim! The change of the MapR dependency should not have an impact on that. Do you know if the same thing worked in prior Flink versions? Is that a regression in 1.9? The exception that you report, is that from Flink's HA services trying to connect to ZK, or from the MapR FS client trying to connect to ZK? Best, Stephan On Tue, Aug 27, 2019 at 11:03 AM Maxim Parkachov wrote: > Hi everyone, > > I'm testing release 1.9 on a MapR secure cluster. I took the Flink binaries from > the download page and am trying to start a Yarn session cluster. All MapR specific > libraries and configs are added according to the documentation. > > When I start yarn-session without high availability, it uses the zookeeper > from the MapR distribution (org.apache.zookeeper), correctly connects to the > cluster, and access to maprfs works as expected. > > But if I add zookeeper as the high-availability option, instead of the MapR > zookeeper it tries to use the shaded zookeeper, and this one cannot connect > with MapR credentials: > > 2019-08-27 10:42:45,240 ERROR > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient > - An error: (java.security.PrivilegedActionException: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Failed to find > any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's > received SASL token. Zookeeper Client will go to AUTH_FAILED state. > 2019-08-27 10:42:45,240 ERROR > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL > authentication with Zookeeper Quorum member failed: > javax.security.sasl.SaslException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Failed to find any Kerberos tgt)]) occurred when evaluating > Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to > AUTH_FAILED state. 
> I tried to use a separate zookeeper cluster for HA, but maprfs still doesn't > work. > > Is this related to the removal of MapR specific settings in release 1.9 ? > Should I still compile a custom version of Flink with MapR dependencies ? > (trying to do that now, but getting some errors during compilation). > > Can I somehow force Flink to use the MapR zookeeper even in HA mode ? > > Thanks in advance, > Maxim. > >
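For reference, since the failure above is the shaded ZooKeeper client attempting SASL/Kerberos, one thing to try is disabling that SASL attempt in flink-conf.yaml. This is a hedged sketch: the quorum addresses and storage path are placeholders, and whether disabling SASL is compatible with MapR's own ZooKeeper authentication needs to be verified in your environment.

```yaml
# flink-conf.yaml sketch: ZooKeeper-based HA where the quorum does not
# accept Kerberos SASL from the client (hosts/paths are placeholders)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:5181,zk-host2:5181
high-availability.storageDir: maprfs:///flink/ha
# Stop Flink's shaded ZooKeeper client from attempting SASL authentication
zookeeper.sasl.disable: true
```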
Re: End of Window Marker
Hi, I'll chip in with an approach I'm trying at the moment that seems to work (and I say "seems" because I'm only running this on a personal project). Personally, I don't have anything against end-of-message markers per partition; Padarn, you seem to not prefer this option as it overloads the meaning of the output payload. My approach is equally valid when producing watermarks/end-of-message markers on a side output, though. The main problem of both approaches is knowing when the window has finished across all partitions without having to wait for the start of the next window. I've taken the approach of sending all output messages of the window to (1) the sink but also (2) a single-task operator. The single-task operator registers an event-time timer at the time of the end of the window. You have the confidence of the task's timer triggering only once, at the right time, because all the post-window watermarks go through to the same task. At that point I make the task send an end-of-message marker to every partition. I don't need to send the count because Kafka messages are ordered. AND, if you prefer to not overload the semantics of your original Kafka topic, you can post the message to a separate location of your choice. While this does mean that the end-of-window marker only gets sent once the window has finished across all substreams (as opposed to per stream), it does mean you don't need to wait for the next window to start, AND the watermark gap between substreams should never grow that much anyway. This approach should be particularly useful when the number of partitions or the keying mechanism differs between the input and output topics. Hopefully that doesn't sound like a terrible idea. eduardo On Wed, 28 Aug 2019, 02:54 Padarn Wilson, wrote: > Hi again Fabian, > > Thanks for pointing this out to me. 
In my case there is no need for keyed > writing - but I do wonder if having each Kafka task write only to a single > partition would significantly affect performance. > > Actually, now that I think about it, the approach of just waiting for the > first records of the next window is also subject to the problem you mention > above: a producer lagging behind the rest could end up with a partition > containing elements out of ‘window order’. > > I was also thinking this problem is very similar to that of checkpoint > barriers. I intend to dig into the details of the exactly-once Kafka sink > for some inspiration. > > Padarn > > On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske wrote: > >> Hi Padarn, >> >> Yes, this is quite tricky. >> The "problem" with watermarks is that you need to consider how you write >> to Kafka. >> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is >> written by multiple producers), you need to broadcast the watermarks to >> each partition, i.e., each partition would receive watermarks from each >> parallel sink task. So in order to reason about the current watermark of a >> partition, you need to observe them and take the minimum WM across all >> current sink task WMs. >> Things become much easier if each partition is only written by a single >> task, but this also means that data is not key-partitioned in Kafka. >> In that case, the sink task only needs to write a WM message to each of >> its assigned partitions. >> >> Hope this helps, >> Fabian >> >> >> On Sat, 17 Aug 2019 at 05:48, Padarn Wilson < >> pad...@gmail.com> wrote: >> >>> Hi Fabian, thanks for your input >>> >>> Exactly. Actually my first instinct was to see if it was possible to >>> publish the watermarks somehow - my initial idea was to insert regular >>> watermark messages into each partition of the stream, but exposing this >>> seemed quite troublesome. 
>>> >>> > In that case, you could have a ProcessFunction that is chained before >>> the sink and which counts the window results per time slice and emits the >>> result to a side output when the watermark passes. >>> All side output messages are collected by a single task and can be >>> published to a Kafka topic or even be made available via Queryable State. >>> >>> I understand the idea here (and exactly-once semantics are probably fine >>> for my use case), but counting events seems a bit fragile. I'm not totally >>> confident the consumer can guarantee it won't read duplicates (it's a Golang >>> Kafka library that seems to have some quirks). >>> >>> I think ideally each partition of the Kafka topic would have some >>> regular information about watermarks. Perhaps the Kafka producer can be >>> modified to support this. >>> >>> Padarn >>> >>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske wrote: >>> Hi Padarn, What you describe is essentially publishing Flink's watermarks to an outside system. Flink processes time windows by waiting for a watermark that's past the window end time. When it receives such a WM it processes and emits all ended windows and forwards the watermark. When a
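Fabian's point that a partition written by multiple sink tasks is only as far along as its slowest writer boils down to taking the minimum watermark across the writing tasks. A minimal standalone sketch of that reasoning (the method and class names are illustrative, not Flink API):

```java
import java.util.Collection;
import java.util.List;

public class PartitionWatermark {
    // The effective watermark of a Kafka partition written by several
    // parallel sink tasks is the minimum of the tasks' current watermarks:
    // the partition may still receive records older than the WM of any
    // faster task, so only the slowest writer bounds event time.
    static long effectiveWatermark(Collection<Long> taskWatermarks) {
        return taskWatermarks.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Three sink tasks writing the same partition, one lagging at 95:
        System.out.println(effectiveWatermark(List.of(100L, 95L, 120L))); // prints 95
    }
}
```

This is also why the thread favors one-writer-per-partition layouts: with a single writer, the partition's watermark is just that task's watermark and no cross-task minimum is needed.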
Re: Flink and kerberos
Hey David , My consumers are registered , here is the debug log. The problem is the broker does not belong to me , so I can’t see what is going on there . But this is a new consumer group , so there is no state yet . org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] On Thu, Aug 29, 2019 at 11:39 AM David Morin wrote: > Hello Vishwas, > > You can use a keytab if you prefer. You generate a keytab for your user > and then you can reference it in the Flink configuration. > Then this keytab will be handled by Flink in a secure way and TGT will be > created based on this keytab. > However, that seems to be working. > Did you check Kafka logs on the broker side ? > Or did you check consumer offsets with Kafka tools in order to validate > consumers are registered onto the different partitions of your topic ? > You could try to switch to a different groupid for your consumer group in > order to force parallel consumption. > > Le jeu. 29 août 2019 à 09:57, Vishwas Siravara a > écrit : > >> I see this log as well , but I can't see any messages . I know for a fact >> that the topic I am subscribed to has messages as I checked with a simple >> java consumer with a different group. 
>> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - >> Consumer subtask 0 will start reading the following 40 partitions from the >> committed group offsets in Kafka: >> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, >> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, >>
Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?
Hi Ashish, You are right. Flink does not use Kafka-based group management. So if you have two clusters consuming the same topic, they will not divide the partitions between them. Cross-cluster HA is not quite possible at this point. It would be good to know the reason you want such HA and see if Flink meets your requirement in another way. Thanks, Jiangjie (Becket) Qin On Thu, Aug 29, 2019 at 9:19 PM ashish pok wrote: > Looks like Flink is using “assign” for partitions instead of “subscribe”, which > will not allow participating in a group, if I read the code correctly. > > Has anyone solved this type of problem in the past: active-active HA across > 2 clusters using Kafka? > > > - Ashish > > On Wednesday, August 28, 2019, 6:52 PM, ashish pok > wrote: > > All, > > I was wondering what the expected default behavior is when the same app is > deployed in 2 separate clusters but with the same group Id. In theory the idea was > to create active-active across separate clusters, but it seems like both > apps are getting all the data from Kafka. > > Has anyone else tried something similar, or have an insight on the expected > behavior? I was expecting to see partial data on both apps, and to get all > data in one app if the other was turned off. > > Thanks in advance, > > - Ashish > >
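Becket's explanation can be illustrated with a simplified model of what "assign instead of subscribe" means: each consumer subtask computes its partitions deterministically from the partition index and its own parallelism, never consulting Kafka's group coordinator, so a second cluster independently repeats the same assignment and both clusters read every partition. (The modulo scheme below is a simplification for illustration; Flink's actual assignment logic differs in detail but is equally deterministic.)

```java
import java.util.ArrayList;
import java.util.List;

public class StaticAssignment {
    // Simplified model: subtask i of n takes every partition p with p % n == i.
    // Because no Kafka group coordinator is consulted ("assign", not
    // "subscribe"), a second cluster running the same job with the same
    // parallelism computes the identical split and duplicates consumption.
    static List<Integer> partitionsFor(int subtask, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtask) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Two independent "clusters" with parallelism 2 over 6 partitions
        // would each compute this same assignment for their subtask 0:
        System.out.println(partitionsFor(0, 2, 6)); // prints [0, 2, 4]
    }
}
```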
Re: Flink and kerberos
Hello Vishwas, You can use a keytab if you prefer. You generate a keytab for your user and then you can reference it in the Flink configuration. Then this keytab will be handled by Flink in a secure way and TGT will be created based on this keytab. However, that seems to be working. Did you check Kafka logs on the broker side ? Or did you check consumer offsets with Kafka tools in order to validate consumers are registered onto the different partitions of your topic ? You could try to switch to a different groupid for your consumer group in order to force parallel consumption. Le jeu. 29 août 2019 à 09:57, Vishwas Siravara a écrit : > I see this log as well , but I can't see any messages . I know for a fact > that the topic I am subscribed to has messages as I checked with a simple > java consumer with a different group. > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 0 will start reading the following 40 partitions from the > committed group offsets in Kafka: > [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, > 
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, > KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] > > > On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara > wrote: > >> Hi guys, >> I am using kerberos for my kafka source. 
I pass the jaas config and >> krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf >> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/ >> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf >> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf >> >> When I look at debug logs I see that the consumer was created with the >> following properties. >> >> 2019-08-29 06:49:18,298 INFO >> org.apache.kafka.clients.consumer.ConsumerConfig - >> ConsumerConfig values: >> auto.commit.interval.ms = 5000 >> auto.offset.reset = latest >> bootstrap.servers = [sl73oprdbd018.visa.com:9092] >> check.crcs = true >> client.id = consumer-2 >> connections.max.idle.ms = 54 >> enable.auto.commit = true >> exclude.internal.topics = true >> fetch.max.bytes = 52428800 >> fetch.max.wait.ms = 500 >> fetch.min.bytes = 1 >> >> >> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c) >> heartbeat.interval.ms = 3000 >> interceptor.classes = null >> key.deserializer = class >>
Re: problem with avro serialization
Any update on this? regards. On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai wrote: > Hi, > > Aljoscha opened a JIRA just recently for this issue: > https://issues.apache.org/jira/browse/FLINK-12501. > > Do you know if this is a regression from previous Flink versions? > I'm asking just to double check, since from my understanding of the issue, > the problem should have already existed before. > > Thanks, > Gordon > > On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh > wrote: > >> Hello - >> >> Facing an issue with avro serialization with Scala case classes generated >> through avrohugger .. >> Scala case classes generated by avrohugger have the Avro schema in the >> companion object. This is a sample generated class (details elided) .. >> >> case class Data(var id: Int, var name: String) extends >> org.apache.avro.specific.SpecificRecordBase { >> def this() = this(0, "") >> def get(field$: Int): AnyRef = { >> //.. >> } >> def put(field$: Int, value: Any): Unit = { >> //.. >> } >> def getSchema(): org.apache.avro.Schema = Data.SCHEMA$ >> } >> object Data { >> val SCHEMA$ = new >> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}") >> } >> >> The Flink 1.8 Avro serializer fails on this, as Avro looks for a SCHEMA$ >> property in the class and is unable to use Java reflection to identify the >> SCHEMA$ in the companion object. The exception that I get is the >> following .. >> >> java.lang.RuntimeException: Serializing the source elements failed: >>> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: >>> org.apache.avro.AvroRuntimeException: Not a Specific class: class >>> pipelines.flink.avro.Data >> >> >> Any help or workaround will be appreciated .. >> >> regards.
>> >> -- >> Debasish Ghosh >> http://manning.com/ghosh2 >> http://manning.com/ghosh >> >> Twttr: @debasishg >> Blog: http://debasishg.blogspot.com >> Code: http://github.com/debasishg >> > -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg
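For background on the failure mode above, here is a hedged sketch in plain Java (illustrative stand-in class names, not actual Avro internals beyond the documented SCHEMA$ lookup): Avro's SpecificData resolves the schema via a static SCHEMA$ field declared on the record class itself, while a Scala companion-object val compiles into a separate `Data$` class, so the reflective lookup on `Data` finds nothing.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Illustrative sketch: Avro's SpecificData looks up a *static* SCHEMA$ field
// on the record class via reflection. A Java-generated SpecificRecord has one;
// an avrohugger-generated Scala case class keeps it on the companion object,
// which compiles to a separate Data$ class, so the lookup on Data fails.
public class SchemaLookup {
    // Stand-in for a Java-generated Avro specific record
    static class JavaStyleRecord {
        public static final String SCHEMA$ = "{\"type\":\"record\"}";
    }
    // Stand-in for an avrohugger case class: no static SCHEMA$ on the class itself
    static class ScalaStyleRecord { }

    static boolean hasStaticSchemaField(Class<?> c) {
        try {
            Field f = c.getDeclaredField("SCHEMA$");
            return Modifier.isStatic(f.getModifiers());
        } catch (NoSuchFieldException e) {
            return false;
        }
    }
}
```

A workaround that follows from this, pending the linked JIRA, is to hand the parsed schema to the serializer explicitly rather than relying on the SCHEMA$ lookup.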
flink to HDFS Couldn't create proxy provider class ***.ha.ConfiguredFailoverProxyProvider
Dear All, [A question about Flink consuming data and writing it to HDFS] 1. We have multiple HDFS clusters. 2. I put hdfs-site.xml and core-site.xml under the resources folder. [Problem] java.io.IOException: Couldn't create proxy provider class org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider at org.apache.hadoop.hdfs.NameNodeProxies.createFailoverProxyProvider(NameNodeProxies.java:515) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:170) at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:668) at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:604) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148) at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159) at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.hdfs.NameNodeProxies.createFailoverProxyProvider(NameNodeProxies.java:498) ... 18 more Caused by: java.lang.RuntimeException: Could not find any configured addresses for URI hdfs://**/**/**/***.db/*** at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.(ConfiguredFailoverProxyProvider.java:93) ... 23 more [What I tried] 1. Loaded the hadoop-hdfs-*.jar file 2. Loaded the flink-connector-filesystem-*.jar file The same error still occurs. -- Marvin Email: maxw1...@163.com
Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?
Looks like Flink is using “assign” partitions instead of “subscribe”, which will not allow participating in a consumer group, if I read the code correctly. Has anyone solved this type of problem in the past for active-active HA across 2 clusters using Kafka? - Ashish On Wednesday, August 28, 2019, 6:52 PM, ashish pok wrote: All, I was wondering what the expected default behavior is when the same app is deployed in 2 separate clusters but with the same group Id. In theory the idea was to create active-active across separate clusters, but it seems like both apps are getting all the data from Kafka. Has anyone else tried something similar, or does anyone have insight into the expected behavior? I was expecting to see partial data on both apps, and all data in one app if the other was turned off. Thanks in advance, - Ashish
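For what it's worth, the observed behavior matches how the Flink Kafka connector works: it assigns partitions to its subtasks itself instead of joining Kafka's consumer-group rebalancing, and uses group.id mainly for committing and looking up offsets. A toy simulation of the difference (plain Java with hypothetical helper names, not the Kafka client API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy simulation (not the Kafka client API): with manual assignment every
// consumer instance independently takes the full partition set, so two
// clusters running the same job both read everything; with group-based
// subscription the coordinator splits partitions among the group members.
public class AssignVsSubscribe {
    // assign(): each consumer takes all partitions, regardless of group.id
    static List<Integer> assignAll(int numPartitions) {
        List<Integer> all = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) all.add(p);
        return all;
    }

    // subscribe(): round-robin split across the members of one group
    static Map<String, List<Integer>> subscribeSplit(int numPartitions, List<String> members) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String m : members) out.put(m, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            out.get(members.get(p % members.size())).add(p);
        }
        return out;
    }
}
```

So two "clusters" calling the equivalent of assignAll both read every partition, which is exactly the duplicated consumption reported above.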
Left Anti-Join
hi, I want to calculate the consumption amount of users newly added compared with the previous year, but the result of the following SQL is incorrect. The "appendTable" is a table registered from an append stream:

select a.years, a.shopId, a.userId, a.amount
from (
  select years, shopId, userId, sum(amount) amount
  from appendTable
  group by years, shopId, userId
) a
where not exists (
  select years, shopId, userId
  from appendTable b
  where cast(a.years as int) = cast(b.years as int) - 1
    and a.shopId = b.shopId
    and a.userId = b.userId
);

thanks
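As a sanity check of the intended NOT EXISTS (left anti-join) semantics, here is a minimal sketch in plain Java (illustrative row encoding "year|shopId|userId"; it probes the same key set the SQL's subquery would see): a row survives only if no row exists for the same shop/user in the following year, mirroring the condition cast(a.years as int) = cast(b.years as int) - 1.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the NOT EXISTS semantics from the SQL above, in plain Java.
// Rows are encoded as "year|shopId|userId" for brevity; a row is kept only
// if no row exists for the same shopId/userId in year + 1 (the SQL's
// a.years = b.years - 1 condition).
public class LeftAntiJoin {
    static List<String> antiJoin(List<String> rows) {
        Set<String> present = new HashSet<>(rows);
        List<String> kept = new ArrayList<>();
        for (String row : rows) {
            String[] f = row.split("\\|");
            String nextYearKey = (Integer.parseInt(f[0]) + 1) + "|" + f[1] + "|" + f[2];
            if (!present.contains(nextYearKey)) kept.add(row);  // NOT EXISTS probe
        }
        return kept;
    }
}
```

Note the direction of the probe: as written, the SQL keeps rows with no match in the *following* year; if the goal is users not present in the *previous* year, the comparison would need to be b.years = a.years - 1 instead.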
Re: Loading dylibs
Hi Vishwas, I think it is just because the dylib is loaded more than once in a JVM process (TaskManager). Multiple tasks are deployed in one TaskManager and run in different threads. So if you want the dylib to be loaded only once, maybe you can use the parent classloader. You could use the following config option to set the packages to be loaded by the parent classloader. *classloader.parent-first-patterns.additional: xxx.yyy.** Have a try :) Best, Yang Vishwas Siravara wrote on Wed, Aug 28, 2019 at 9:40 PM: > Yes this is exactly what happens. As a workaround I created a small jar > file which has code to load the dylib and I placed it under the lib folder > ; this library is in provided scope in my actual job, so the dylib gets > loaded only once when the tm/jm jvm starts. > What I found interesting in my old approach was that even when I check whether > the dylib has already been loaded in the current thread, and it has been, I > still get the unsatisfied link error even though that dylib is loaded in > the task manager. > > On Wed, Aug 28, 2019 at 7:04 AM Aleksey Pak wrote: > >> Hi Vishwas, >> >> There is a known issue in the Flink Jira project [1]. >> Is it possible that you have encountered the same problem? >> >> [1]: https://issues.apache.org/jira/browse/FLINK-11402 >> >> Regards, >> Aleksey >> >> >> On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara >> wrote: >> >>> Hi Jörn, >>> I tried that. Here is my snippet : >>> >>> String[] loadedlibs = >>> getLoadedLibraries(Thread.currentThread().getContextClassLoader()); >>> if(!containsVibeSimpleLib(loadedlibs)) { >>> System.loadLibrary("vibesimplejava"); >>> } >>> >>> Now I get the exception Unexpected errorjava.lang.UnsatisfiedLinkError: >>> com.voltage.securedata.enterprise.ConstantsNative.DIGEST_MD5()I which means >>> that it could not find vibesimplejava in the loaded libs, but I know that >>> the if was not executed because vibesimplejava was present in loadedlibs >>> (the control never went inside the if block). Any other suggestions?
>>> >>> Thanks, >>> Vishwas >>> >>> >>> >>> >>> >>> >>> On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke >>> wrote: >>> I don’t know Dylibs in detail, but can you call a static method where it checks if it has been already executed and if not then it loads the library (Singleton pattern)? Am 27.08.2019 um 06:39 schrieb Vishwas Siravara : Hi guys, I have a flink application that loads a dylib like this System.loadLibrary("vibesimplejava"); The application runs fine , when I restart the job I get this exception : com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected errorjava.lang.UnsatisfiedLinkError: Native Library /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so already loaded in another classloader This happens because the dylib has already been loaded once by the taskmanger, how can I mitigate this? It seems problematic if two applications are loading the same dylib. Thanks, Vishwas
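The singleton-guard idea discussed in this thread can be sketched as follows (hypothetical class name; the actual System.loadLibrary call is injected as a Runnable so the once-only guard can be exercised without a real native library). The caveat from the workaround above still applies: the guard only helps if the class holding it is loaded parent-first (e.g. from Flink's lib/ folder), because a per-job user classloader gets a fresh copy of the flag on every restart.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the singleton-loading guard (hypothetical class name). The real
// load action would be () -> System.loadLibrary("vibesimplejava"); it is
// injected here so the once-only guard is testable without a native lib.
public class NativeLoader {
    private static final AtomicBoolean LOADED = new AtomicBoolean(false);

    // Runs the load action at most once per classloader, even under concurrency.
    public static void ensureLoaded(Runnable loadAction) {
        if (LOADED.compareAndSet(false, true)) {
            loadAction.run();
        }
    }
}
```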
Re: Question about transactionId generation when Flink writes to Kafka
Hi on 2019/8/29 17:50, ddwcg wrote: There is only one broker. The Flink cluster's clock is indeed different from the broker's; they are in different data centers. Can I specify the transactionalId myself? Adjusting both data centers to the same time might affect other applications. AFAIK the transactionalId is generated by the system. regards.
Re: Question about transactionId generation when Flink writes to Kafka
There is only one broker. The Flink cluster's clock is indeed different from the broker's; they are in different data centers. Can I specify the transactionalId myself? Adjusting both data centers to the same time might affect other applications. > On Aug 29, 2019, at 17:45, Wesley Peng wrote: > > Hi > > on 2019/8/29 17:13, ddwcg wrote: >> I have already stopped the job, but the Kafka logs keep printing "Initialized transactionalId…", and when the program is started again it throws: >> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer >> attempted an operation with an old epoch. Either there is a newer producer >> with the same transactionalId, or the producer's transaction has been >> expired by the broker. >> Is there any way to avoid this problem? > > Maybe you want to check that all the brokers and producers have the same timezone > setup and that all times are synchronized. > > regards. >
Re: Question about transactionId generation when Flink writes to Kafka
Hi on 2019/8/29 17:13, ddwcg wrote: I have already stopped the job, but the Kafka logs keep printing "Initialized transactionalId…", and when the program is started again it throws: Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. Is there any way to avoid this problem? Maybe you want to check that all the brokers and producers have the same timezone setup and that all times are synchronized. regards.
Question about transactionId generation when Flink writes to Kafka
hi, A transactionId is generated automatically when writing to Kafka. How is this id generated? Specifying it myself doesn't seem to take effect. I have already stopped the job, but the Kafka logs keep printing "Initialized transactionalId…", and when the program is started again it throws: Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. Is there any way to avoid this problem? Below is the Kafka log; it keeps scrolling, and many entries are for jobs from several days ago: [2019-08-29 17:08:48,911] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-3 with producerId 1011 and producer epoch 14255 on partition __transaction_state-36 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:49,244] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-4 with producerId 1012 and producer epoch 16544 on partition __transaction_state-35 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:49,518] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-1 with producerId 1013 and producer epoch 15460 on partition __transaction_state-38 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:49,786] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-2 with producerId 1014 and producer epoch 7781 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:50,054] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std.
Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-0 with producerId 1015 and producer epoch 7529 on partition __transaction_state-39 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:50,310] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-3 with producerId 1011 and producer epoch 14256 on partition __transaction_state-36 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:51,078] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-4 with producerId 1012 and producer epoch 16545 on partition __transaction_state-35 (kafka.coordinator.transaction.TransactionCoordinator) [2019-08-29 17:08:53,047] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-1 with producerId 1013 and producer epoch 15461 on partition __transaction_state-38 (kafka.coordinator.transaction.TransactionCoordinator)
Question about one Flink job consuming messages from multiple Kafka topics
1. In day-to-day work, one statistics table often contains several different metrics, e.g. post_count and send_count. 2. However, these metrics come from different Kafka message bodies (topics). 3. Is there a way to write each topic's data into the sink table without using UNION ALL? With UNION ALL, many zero values are used as padding.
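One way to avoid the zero-padded UNION ALL rows is to co-aggregate: key both topics' streams by the same key and let each topic update its own metric column of a single row. A minimal plain-Java sketch of that idea (topic and metric names are illustrative; in Flink this would be keyed state updated from a multi-topic source or connected streams, with the full row upserted to the sink):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of co-aggregation: events carry (key, sourceTopic); each topic
// contributes to its own metric column of one keyed row, so no UNION ALL
// and no zero-filled rows. Topic and metric names are illustrative.
public class MultiTopicMerge {
    // key -> {metric -> count}
    static Map<String, Map<String, Long>> aggregate(List<String[]> events) {
        Map<String, Map<String, Long>> table = new LinkedHashMap<>();
        for (String[] e : events) {  // e = {key, topic}
            String metric = e[1].equals("post_topic") ? "post_count" : "send_count";
            table.computeIfAbsent(e[0], k -> new LinkedHashMap<>())
                 .merge(metric, 1L, Long::sum);
        }
        return table;
    }
}
```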
Re: Flink and kerberos
I see this log as well , but I can't see any messages . I know for a fact that the topic I am subscribed to has messages as I checked with a simple java consumer with a different group. org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}] On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara wrote: > Hi guys, > I am using kerberos for my kafka source. I pass the jaas config and > krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf > -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/ > -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf > -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf > > When I look at debug logs I see that the consumer was created with the > following properties. 
> > 2019-08-29 06:49:18,298 INFO > org.apache.kafka.clients.consumer.ConsumerConfig - > ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [sl73oprdbd018.visa.com:9092] > check.crcs = true > client.id = consumer-2 > connections.max.idle.ms = 54 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > > > group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c) > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 >
Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction
Hi, Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours, but the problem still happened when I restarted the broker with the active controller. It might not be due to the checkpoint duration being too long and causing a transaction timeout. I have no more clues about what's wrong with my Kafka producer. Could someone help me please? Best, Tony Wei Fabian Hueske wrote on Fri, Aug 16, 2019 at 4:10 PM: > Hi Tony, > > I'm sorry I cannot help you with this issue, but Becket (in CC) might have > an idea what went wrong here. > > Best, Fabian > > On Wed, Aug 14, 2019 at 07:00, Tony Wei < > tony19920...@gmail.com> wrote: > >> Hi, >> >> Currently, I am trying to update our kafka cluster with a larger ` >> transaction.max.timeout.ms`. The >> original setting is kafka's default value (i.e. 15 minutes) and I tried >> to set it to 3 hours. >> >> When I was doing a rolling restart of my brokers, this exception came to >> me on the next checkpoint >> after I restarted the broker with the active controller.
>> >> java.lang.RuntimeException: Error while confirming checkpoint at >>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at >>> java.util.concurrent.FutureTask.run(FutureTask.java:266) at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) Caused by: >>> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions >>> failed, logging first encountered failure at >>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) >>> at >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) >>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 >>> more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: >>> The producer attempted a transactional operation in an invalid state >> >> >> I have no idea why it happened, and I didn't find any error log from >> brokers. Does anyone have >> this exception before? How can I prevent from this exception when I tried >> to restart kafka cluster? >> Does this exception mean that I will lost data in some of these >> transactions? >> >> flink cluster version: 1.8.1 >> kafka cluster version: 1.0.1 >> flink kafka producer version: universal >> producer transaction timeout: 15 minutes >> checkpoint interval: 5 minutes >> number of concurrent checkpoint: 1 >> max checkpoint duration before and after the exception occurred: < 2 >> minutes >> >> Best, >> Tony Wei >> >
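For reference, a hedged sketch of the timeout relationship this thread revolves around: the broker-side transaction.max.timeout.ms caps any producer's transaction.timeout.ms, so raising the producer timeout (here to the poster's 1.5 hours) requires the broker cap to be raised first, and the producer timeout should comfortably exceed the worst-case checkpoint duration plus recovery time.

```java
import java.util.Properties;

// Sketch only: broker-side transaction.max.timeout.ms must be >= the
// producer's transaction.timeout.ms. The value below is the 1.5 hours
// mentioned by the original poster; adjust to your checkpointing behavior.
public class TxnTimeouts {
    public static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("transaction.timeout.ms", String.valueOf(90 * 60 * 1000)); // 1.5 h
        return p;
    }
}
```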
Question about checkpoint size growing continuously even though state is cleaned up periodically
Hi all, I have some questions about managing state with a Flink ProcessFunction. In my stream, a flatMap assigns each record multiple endTimes, and I then key the stream by key + endTime. In the ProcessFunction's open() I initialize a ValueState, in processElement() I register a timer to clean up the state, and in onTimer() I clear it. My understanding is that if state is cleaned up by timers, the checkpoint size should stay roughly constant. However, the job has now been running for 8 days and the checkpoint size keeps growing; it has reached 1.6 GB. So I suspect there is a problem in my state-management logic. Any advice would be appreciated. The job and ProcessFunction code follows:

val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
  .map(rawEvent => {
    try {
      val eventJson = parse(rawEvent)
      if ((eventJson \ "type").extract[String] == "track"
          && (eventJson \ "project_id").extract[Int] == 6
          && (eventJson \ "properties" \ "$is_login_id").extract[Boolean]
          && (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong)
        Some(eventJson)
      else None
    } catch {
      case _ => None
    }
  }).uid("DianDianUserAppViewCount_Map")
  .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
  .flatMap { item =>
    val timestamp = (item.get \ "time").extract[Long]
    SlidingEventTimeWindows
      .of(Time.days(90), Time.days(1), Time.hours(-8))
      .assignWindows(null, timestamp, null)
      .map { case timeWindow =>
        RichAppViewEvent(
          s"${(item.get \ "distinct_id").extract[String]}_${timeWindow.getEnd}",
          (item.get \ "distinct_id").extract[String],
          (item.get \ "event").extract[String],
          timestamp,
          timeWindow.getEnd
        )
      }
  }.uid("DianDianUserAppViewCount_FlatMap")
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
    override def extractTimestamp(element: RichAppViewEvent): Long = element.time
  }).uid("DianDianUserAppViewCount_Watermarker")
  .keyBy(_.key)
  .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
  .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")

class ProcessAppViewCount extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

  // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
  private val thirtyDaysForTimestamp = 259200L
  private val dateFormat = new SimpleDateFormat("MMdd")
  private lazy val appViewCount: ValueState[Int] =
    getRuntimeContext.getState(new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

  override def processElement(value: RichAppViewEvent,
                              ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
                              out: Collector[(String, String, Int)]): Unit = {
    if (!isLate(value, ctx)) {
      // Go back 30 days from the window end time
      val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp
      // Increment if the record falls within the 30 days before the window end
      // and is an $AppViewScreen event
      var currentValue = appViewCount.value()
      if (value.time >= beforeThirtyDaysStartTime && value.event.equals("$AppViewScreen")) {
        currentValue = currentValue + 1
        appViewCount.update(currentValue)
      }
      // Emit downstream
      out.collect(
        (value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue)
      )
      // Register the cleanup timer
      registerCleanupTimer(value, ctx)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
                       out: Collector[(String, String, Int)]): Unit = {
    if (appViewCount != null) {
      appViewCount.clear()
    }
  }

  /**
   * Treat a record as late (and skip it) if the current watermark has already
   * passed the end time of the window the record was assigned to.
   */
  private def isLate(value: RichAppViewEvent,
                     ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean = {
    ctx.timerService().currentWatermark() >= value.windowEndTime
  }

  /**
   * Use the window end time as the timestamp of the cleanup timer.
   */
  private def registerCleanupTimer(value: RichAppViewEvent,
                                   ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context) = {
    val cleanupTime = value.windowEndTime
    ctx.timerService().registerEventTimeTimer(cleanupTime)
  }
}
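One detail worth checking in the code above, independent of the checkpoint-size question: the constant `thirtyDaysForTimestamp = 259200L` does not match its own comment. 30 days in milliseconds is 2,592,000,000, while 259,200 happens to be 3 days expressed in seconds. A quick arithmetic check:

```scala
// The comment claims "30 days in milliseconds: 1000 * 60 * 60 * 24 * 30",
// but the constant used is 259200L. Checking what each value really is:
val thirtyDaysMillis: Long = 1000L * 60 * 60 * 24 * 30
println(thirtyDaysMillis)          // 2592000000
println(259200L / 86400L)          // 3 -- 259200 is 3 days *in seconds*, not 30 days in ms
// A related pitfall: the same product with Int literals silently overflows:
println(1000 * 60 * 60 * 24 * 30)  // -1702967296
```

If the intent was really "30 days before the window end", the `value.time >= beforeThirtyDaysStartTime` check is effectively always true with the current constant.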
Flink and Kerberos
Hi guys,

I am using Kerberos for my Kafka source. I pass the JAAS config and krb5.conf in env.java.opts:

-Dconfig.resource=qa.conf -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/ -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf

When I look at the debug logs, I see that the consumer was created with the following properties:

2019-08-29 06:49:18,298 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [sl73oprdbd018.visa.com:9092]
    check.crcs = true
    client.id = consumer-2
    connections.max.idle.ms = 54
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 30
    max.poll.records = 500
    metadata.max.age.ms = 30
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 3
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 6
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SASL_PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 1
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

I can also see that the Kerberos login is working fine. Here is the log for it:

2019-08-29 06:49:18,312 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
2019-08-29 06:49:18,313 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh thread started.
2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid starting at: Thu Aug 29 06:49:18 GMT 2019
2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Thu Aug 29 16:49:18 GMT 2019
2019-08-29 06:49:18,315 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh sleeping until: Thu Aug 29 15:00:10 GMT 2019
2019-08-29 06:49:18,316 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421

I then see this log:

INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)

*The problem is that I do not see any error log, yet no data is being processed by the consumer, and it has been a nightmare to debug.*

Thanks for all the help.

Thanks,
Vishwas
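Not a diagnosis, but one thing that stands out in the config dump above is sasl.kerberos.service.name = null. With the GSSAPI mechanism, the Kafka client generally needs the broker's Kerberos service name, either via this property or as serviceName inside the JAAS entry. A minimal sketch of the properties usually involved; the broker address and mechanism below mirror the log, while the "kafka" service name is an assumption to verify against your cluster:

```scala
import java.util.Properties

// Properties typically needed for a SASL/GSSAPI (Kerberos) Kafka consumer.
// Broker address, protocol, and mechanism are taken from the log in this email;
// the service name "kafka" is an assumption -- check what your brokers use.
val props = new Properties()
props.setProperty("bootstrap.servers", "sl73oprdbd018.visa.com:9092")
props.setProperty("security.protocol", "SASL_PLAINTEXT")
props.setProperty("sasl.mechanism", "GSSAPI")
// In the dump this is null; if it is also absent from the JAAS entry
// (as serviceName), authentication against the brokers can fail silently.
props.setProperty("sasl.kerberos.service.name", "kafka")

println(props.getProperty("sasl.kerberos.service.name"))
```

Flink can also manage the Kerberos login itself via the security.kerberos.login.* options in flink-conf.yaml instead of raw -D JVM options, which avoids keeping the JAAS file in sync on every node.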
Re: Discussion: restoring a job from a checkpoint without clearing state, while resetting the Kafka offset
Hi Yun Tang,

I can give that approach a try (I am currently on Flink 1.6.2).

PS: with Flink 1.9's State Processor API, it should be possible to modify the data in a savepoint directly, including the Kafka offsets.

Best,
蒋涛涛

Yun Tang wrote on Thu, Aug 29, 2019 at 12:12 PM:
> Hi 蒋涛涛,
>
> There is a somewhat hacky way to achieve this. The source operator looks up the state it restores offsets from by its uid. If you do not want to restore the source's state from the checkpoint, you can manually change the source's uid in the code and pass --allowNonRestoredState when restoring from the checkpoint. The Kafka source will then find no matching state in the restored checkpoint/savepoint and will start from the offset you configured.
>
> Best,
> Yun Tang
>
> From: 蒋涛涛
> Sent: Thursday, August 29, 2019 11:45
> To: user-zh@flink.apache.org
> Subject: Re: Discussion: restoring a job from a checkpoint without clearing state, while resetting the Kafka offset
>
> Hi Yun Tang,
>
> By default I actually do want to restore the Kafka consumption progress from the checkpoint; it is only in special cases that I want to start consuming from a specific point in time. As you said, what I mainly want to restore is the keyed state. If I call setCommitOffsetsOnCheckpoints(false) and set "auto.commit.enable" to false in the Kafka properties, no offsets are committed at all; then, when the job is stopped normally and later restored from a checkpoint, I no longer know from which point consumption will resume.
>
> Yun Tang wrote on Thu, Aug 29, 2019 at 10:57 AM:
>
> > Hi 蒋涛涛,
> >
> > Flink's Kafka consumer has three offset-commit modes:
> >
> > 1. OffsetCommitMode.DISABLED: offset committing is completely disabled.
> > 2. OffsetCommitMode.ON_CHECKPOINTS: Flink's default behavior; offsets are committed to Kafka only when a Flink checkpoint completes.
> > 3. OffsetCommitMode.KAFKA_PERIODIC: the default behavior of Kafka's internal client; offsets are committed to Kafka periodically.
> >
> > If you do not want to rely on checkpoints to reset the Kafka offset, you can call setCommitOffsetsOnCheckpoints(false) on FlinkKafkaConsumerBase and set "auto.commit.enable" to false in the Kafka properties. That amounts to never committing offsets. On recovery, if you configure the consumer to start from the latest offset, you can restore the state in the checkpoint (you presumably mainly care about the keyed state) while still consuming from the latest offset.
> >
> > Best,
> > Yun Tang
> >
> > From: wang jinhai
> > Sent: Thursday, August 29, 2019 10:25
> > To: user-zh@flink.apache.org
> > Subject: Re: Discussion: restoring a job from a checkpoint without clearing state, while resetting the Kafka offset
> >
> > You could restore from one of the earlier checkpoints.
> >
> > On 2019/8/29 10:01, 蒋涛涛 wrote:
> >
> > Hi everyone:
> >
> > As the subject says: I dropped some data and want to backfill part of it. Right now I have to clear the state in order to reset the Kafka offset and rerun. Is it possible to keep the job's state, restore the job from a checkpoint while resetting the Kafka offset, and consume from that point in time, instead of clearing the state and reprocessing everything?
> >
> > Regards,
> > JackJiang
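The hack Yun Tang describes boils down to two steps: change the source's uid in the job code so its operator state can no longer be matched, then resubmit from the checkpoint while allowing unmatched state to be skipped. As a sketch (the checkpoint path and jar name here are placeholders, not from the thread):

```
# 1. In the job code, change e.g. .uid("kafka-source") to a new value,
#    so the restored checkpoint contains state for an operator that no longer exists.
# 2. Resume from the checkpoint, skipping state that matches no operator:
flink run -s hdfs:///flink/checkpoints/<job-id>/chk-<n> --allowNonRestoredState myjob.jar
```

Without --allowNonRestoredState the submission would fail, since the checkpoint would contain source state that no operator claims.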
Re: Relationship between global parallelism and operator parallelism
Thanks for your reply. Then if only one slot is available at startup and an operator's parallelism is set to 2, will the job end up executing with parallelism 1?

> On Aug 29, 2019 at 10:54, pengcheng...@bonc.com.cn wrote:
>
> Hi, as I understand it, the precedence of parallelism settings is setParallelism > command line > config file.
> If an operator has multiple parallel instances, each instance occupies one slot.
> Flink SQL does not let you set the parallelism.
>
>
>
> pengcheng...@bonc.com.cn
>
> From: ddwcg
> Sent: 2019-08-29 10:18
> To: user-zh
> Subject: Relationship between global parallelism and operator parallelism
>
> Hi,
> What is the relationship between the parallelism set when the job is started and the parallelism of individual operators?
> For example, if the parallelism at startup is 1 and a map operator is set to 5 via map(...).setParallelism(5), does the operator-level setting take effect (given that it is larger than the global setting of 1)?
> The number set at startup is the number of slots, and each slot's resources are fixed (say 1 GB of memory); if a downstream operator increases its parallelism, how are the resources divided?
>
> Also, how is the parallelism of Flink SQL set? I did not find setParallelism, only setMaxParallelism() for the maximum parallelism.
>
> Thanks
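On the follow-up question above: operator-level setParallelism does override the global default, and under Flink's default slot sharing all operators share one slot-sharing group, so the number of slots a job needs is the maximum operator parallelism, not the sum. With one available slot and an operator at parallelism 2, the job therefore cannot be scheduled at all (it fails waiting for resources) rather than silently running at parallelism 1. A toy calculation of the slot requirement, assuming a single slot-sharing group:

```scala
// With default slot sharing, one slot can host one parallel subtask of each
// operator, so a job needs as many slots as its maximum operator parallelism.
def slotsNeeded(operatorParallelisms: Seq[Int]): Int = operatorParallelisms.max

// Global default parallelism 1, but the map set to 5 via setParallelism(5):
val needed = slotsNeeded(Seq(1, 5, 1))
println(needed) // 5

// With only 1 slot available, a job containing an operator at parallelism 2
// cannot be scheduled:
println(slotsNeeded(Seq(1, 2)) <= 1) // false
```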