Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-29 Thread SHI Xiaogang
Hi Dadashov,

We faced similar problems in our production clusters.

Currently, both launching and stopping of containers are performed in the main
thread of YarnResourceManager. As containers are launched and stopped one
after another, it usually takes a long time to bootstrap large jobs. Things
get worse when some node managers get lost: Yarn will retry many times to
communicate with them, leading to heartbeat timeouts of TaskManagers.

Following are some efforts we made to help Flink deal with large jobs.

1. We provision some common jars on all cluster nodes and ask our users not
to include these jars in their uberjars. When containers bootstrap, these
jars are added to the classpath via JVM options. That way, we can
effectively reduce the size of the uberjars (a build-side sketch follows after point 3).

2. We use asynchronous threads to launch and stop containers in
YarnResourceManager. The bootstrap time can be reduced significantly when
launching a large number of containers. We'd like to contribute this to the
community very soon.

3. We register a timeout timer for each launching container. If a task
manager does not register in time after its container has been launched, a
new container will be allocated and launched. That leads to some waste of
resources, but it reduces the impact of slow or problematic nodes.
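
As an illustration of point 1, a minimal build-side sketch (the artifact names are assumptions, and how the shared jars end up on the container classpath depends on your setup, e.g. dropping them into Flink's lib/ directory):

<!-- pom.xml fragment: mark the shared/heavy dependency as "provided" so the
     shade plugin leaves it out of the uberjar. The jar then has to be made
     available on every node, e.g. via Flink's lib/ directory. -->
<dependency>
  <groupId>org.example</groupId>              <!-- assumption -->
  <artifactId>shared-heavy-lib</artifactId>   <!-- assumption -->
  <version>1.0</version>
  <scope>provided</scope>
</dependency>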

The community is now considering a refactoring of the ResourceManager. I
think that will be the right time to improve its efficiency.

Regards,
Xiaogang

Elkhan Dadashov wrote on Fri, Aug 30, 2019 at 7:10 AM:

> Dear Flink developers,
>
> I'm having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The Yarn queue is empty, and 800 containers are allocated
> almost immediately by the Yarn RM.*
>
> It takes a very long time until all 800 nodes (node managers) have downloaded
> the uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other, i.e. can already-started
> TMs replicate to not-yet-started nodes?
>
> Most probably the answer is (a), but I want to confirm.
>
> *Q2:*
>
> What is the recommended way of handling a 400MB+ uberjar with 800+
> containers?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, after around 15
> minutes since the job was kicked off, I am facing this exception:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>


How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-29 Thread Elkhan Dadashov
Dear Flink developers,

I'm having difficulty getting a Flink job started.

The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
containers.

The default HDFS replication is 3.

*The Yarn queue is empty, and 800 containers are allocated
almost immediately by the Yarn RM.*

It takes a very long time until all 800 nodes (node managers) have downloaded
the uberjar from HDFS to their local machines.

*Q1:*

a) Do all those 800 nodes download in batches of 3 at a time? (batch
size = HDFS replication factor)

b) Or can Flink TMs replicate from each other, i.e. can already-started
TMs replicate to not-yet-started nodes?

Most probably the answer is (a), but I want to confirm.

*Q2:*

What is the recommended way of handling a 400MB+ uberjar with 800+
containers?

Any specific params to tune?

Thanks.

Because downloading the uberjar takes a really long time, after around 15
minutes since the job was kicked off, I am facing this exception:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1567116179193 found 1567116001610
Note: System times on machines may be out of sync. Check system time
and time zones.
at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
Source)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
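
Two knobs that are often relevant for the symptoms above, as a hedged sketch (the paths are illustrative, and the property name should be verified against your Hadoop version's yarn-default.xml):

# Raise the replication factor of the staged job artifacts so that 800 node
# managers do not all fetch from only 3 datanodes:
hdfs dfs -setrep -w 10 /user/<you>/.flink/<application_id>/

<!-- yarn-site.xml: allow more time between container allocation and launch
     before the container token expires (the default is 10 minutes). -->
<property>
  <name>yarn.resourcemanager.rm.container-allocation.expiry-interval-ms</name>
  <value>1800000</value>
</property>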


Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Thanks, I'll check it out.

On Thu, Aug 29, 2019 at 1:08 PM David Morin 
wrote:

> Vishwas,
>
> A config that works on my Kerberized cluster (Flink on Yarn).
> I hope this will help you.
>
> Flink conf:
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /home/myuser/myuser.keytab
> security.kerberos.login.principal: myuser@
> security.kerberos.login.contexts: Client
>
> Properties related to security passed as argument of the
> FlinkKafkaConsumerXX constructor:
> sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule
> required username=\"myuser\" password=\"\";"
> sasl.mechanism=PLAIN
> security.protocol=SASL_SSL
>
> On Thu, Aug 29, 2019 at 18:20, Vishwas Siravara wrote:
>
>> Hey David ,
>> My consumers are registered , here is the debug log. The problem is the
>> broker does not belong to me , so I can’t see what is going on there . But
>> this is a new consumer group , so there is no state yet .
>>
>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 will start reading the following 40 partitions from the 
>> committed group offsets in Kafka: 
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>
>> On Thu, Aug 29, 2019 at 11:39 AM David Morin 
>> wrote:
>>
>>> Hello Vishwas,
>>>
>>> You can use a keytab if you prefer. You generate a keytab for your user
>>> and then you can reference it in the Flink configuration.
>>> Then this keytab will be handled by Flink in a secure way and TGT will
>>> be created based on this keytab.
>>> However, that seems to be working.
>>> Did you check Kafka logs on the broker side ?
>>> Or did you check consumer offsets with Kafka tools in order to validate
>>> consumers are registered onto the different partitions of your topic ?
>>> You could try to switch to a different groupid for your consumer group
>>> in order to force parallel consumption.
>>>
>>> On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara wrote:
>>>
 I see this log as well , but I can't see any messages . I know for a
 fact that the topic I am subscribed to has messages as I checked with a
 simple java consumer with a different group.


  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
 Consumer subtask 0 will start reading the following 40 partitions from the 
 committed 

Re: Flink and kerberos

2019-08-29 Thread David Morin
Vishwas,

A config that works on my Kerberized cluster (Flink on Yarn).
I hope this will help you.

Flink conf:
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/myuser/myuser.keytab
security.kerberos.login.principal: myuser@
security.kerberos.login.contexts: Client

Properties related to security passed as argument of the
FlinkKafkaConsumerXX constructor:
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule
required username=\"myuser\" password=\"\";"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
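
For completeness, a sketch of how such properties can be passed to the consumer (servers, group id, and topic are assumptions; the exact consumer class depends on your connector version, e.g. FlinkKafkaConsumer010/011 for the older connectors):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

class SecureConsumerFactory {
    static FlinkKafkaConsumer<String> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9093");   // assumption
        props.setProperty("group.id", "my-consumer-group");       // assumption
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"myuser\" password=\"...\";");
        // Constructor (topic, deserialization schema, properties) from the
        // universal Kafka connector.
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}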

On Thu, Aug 29, 2019 at 18:20, Vishwas Siravara wrote:

> Hey David ,
> My consumers are registered , here is the debug log. The problem is the
> broker does not belong to me , so I can’t see what is going on there . But
> this is a new consumer group , so there is no state yet .
>
>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 0 will start reading the following 40 partitions from the 
> committed group offsets in Kafka: 
> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>
> On Thu, Aug 29, 2019 at 11:39 AM David Morin 
> wrote:
>
>> Hello Vishwas,
>>
>> You can use a keytab if you prefer. You generate a keytab for your user
>> and then you can reference it in the Flink configuration.
>> Then this keytab will be handled by Flink in a secure way and TGT will be
>> created based on this keytab.
>> However, that seems to be working.
>> Did you check Kafka logs on the broker side ?
>> Or did you check consumer offsets with Kafka tools in order to validate
>> consumers are registered onto the different partitions of your topic ?
>> You could try to switch to a different groupid for your consumer group in
>> order to force parallel consumption.
>>
>> On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara wrote:
>>
>>> I see this log as well , but I can't see any messages . I know for a
>>> fact that the topic I am subscribed to has messages as I checked with a
>>> simple java consumer with a different group.
>>>
>>>
>>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>>> Consumer subtask 0 will start reading the following 40 partitions from the 
>>> committed group offsets in Kafka: 
>>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>>> 

Re: Flink 1.9, MapR secure cluster, high availability

2019-08-29 Thread Stephan Ewen
Hi Maxim!

The change of the MapR dependency should not have an impact on that.
Do you know if the same thing worked in prior Flink versions? Is that a
regression in 1.9?

The exception that you report, is that from Flink's HA services trying to
connect to ZK, or from the MapR FS client trying to connect to ZK?

Best,
Stephan
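
(Not part of Stephan's reply: a hedged configuration sketch that is sometimes relevant for this symptom, assuming the ZooKeeper ensemble itself does not require SASL; the quorum addresses and the MapR port 5181 are assumptions.)

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:5181,zk2:5181,zk3:5181
high-availability.storageDir: maprfs:///flink/ha
# Switch off the shaded ZooKeeper client's SASL attempt if the ensemble does not need it:
zookeeper.sasl.disable: true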


On Tue, Aug 27, 2019 at 11:03 AM Maxim Parkachov 
wrote:

> Hi everyone,
>
> I'm testing release 1.9 on MapR secure cluster. I took flink binaries from
> download page and trying to start Yarn session cluster. All MapR specific
> libraries and configs are added according to documentation.
>
> When I start yarn-session without high availability, it uses zookeeper
> from MapR distribution (org.apache.zookeeper) and correctly connects to
> cluster and access to maprfs works as expected.
>
> But if I add zookeeper as the high-availability option, instead of the MapR
> zookeeper it tries to use the shaded zookeeper, and this one could not connect
> with MapR credentials:
>
> 2019-08-27 10:42:45,240 ERROR 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient
>   - An error: (java.security.PrivilegedActionException: 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's  
> received SASL token. Zookeeper Client will go to AUTH_FAILED state.
> 2019-08-27 10:42:45,240 ERROR 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Failed to find any Kerberos tgt)]) occurred when evaluating 
> Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to 
> AUTH_FAILED state.
> I tried to use separate zookeeper cluster for HA, but maprfs still doesn't 
> work.
>
> Is this related to removal of MapR specific settings in Release 1.9 ?
> Should I still compile custom version of Flink with MapR dependencies ?
> (trying to do now, but getting some errors during compilation).
>
> Can I somehow force flink to use MapR zookeeper even with HA mode ?
>
> Thanks in advance,
> Maxim.
>
>


Re: End of Window Marker

2019-08-29 Thread Eduardo Winpenny Tejedor
Hi,

I'll chip in with an approach I'm trying at the moment that seems to work,
and I say seems because I'm only running this on a personal project.

Personally, I don't have anything against end-of-message markers per
partition, Padarn you seem to not prefer this option as it overloads the
meaning of the output payload. My approach is equally valid when producing
watermarks/end-of-message markers on a side output though.

The main problem of both approaches is knowing when the window has finished
across all partitions without having to wait for the start of the next
window.

I've taken the approach of sending all output messages of the window to 1)
the sink but also 2) a single-task operator. The single-task operator
registers an event-time timer for the end of the window.
You have the confidence of the task's timer triggering only once, at the
right time, because all the post-window watermarks go through to the same
task. At that point I make the task send an end-of-message marker to every
partition (a sketch follows below). I don't need to send the count because
Kafka messages are ordered. AND IF you prefer not to overload the semantics
of your original Kafka topic, you can post the marker to a separate location
of your choice.

While this does mean that the end-of-message marker only gets sent through
once the window has finished across all substreams (as opposed to per
stream), it does mean you don't need to wait for the next window to start,
AND the watermark gap between substreams should never grow that much anyway.
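
A rough sketch of the single-task marker operator described above (the type names, the fixed partition count, and the wiring are assumptions, not code from this thread):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Minimal placeholder types for the sketch (assumptions).
class WindowResult {
    long windowEnd;
}

class EndOfWindowMarker {
    final long windowEnd;
    final int partition;
    EndOfWindowMarker(long windowEnd, int partition) {
        this.windowEnd = windowEnd;
        this.partition = partition;
    }
}

class EndOfWindowMarkerFn
        extends KeyedProcessFunction<Integer, WindowResult, EndOfWindowMarker> {

    private final int numOutputPartitions; // assumption: fixed, known partition count

    EndOfWindowMarkerFn(int numOutputPartitions) {
        this.numOutputPartitions = numOutputPartitions;
    }

    @Override
    public void processElement(WindowResult value, Context ctx,
                               Collector<EndOfWindowMarker> out) {
        // Re-registering the same timestamp is de-duplicated by Flink, so every
        // window result simply (re-)registers a timer at its window end.
        ctx.timerService().registerEventTimeTimer(value.windowEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<EndOfWindowMarker> out) {
        // Fires only once the watermark of all upstream subtasks has passed the
        // window end, i.e. the window is complete across all substreams.
        for (int p = 0; p < numOutputPartitions; p++) {
            out.collect(new EndOfWindowMarker(timestamp, p));
        }
    }
}

// Wiring (sketch): send window results to both the real sink and the marker
// operator; keying by a constant forces everything through one parallel task.
//   windowResults.addSink(resultSink);
//   windowResults.keyBy(r -> 0)
//                .process(new EndOfWindowMarkerFn(numPartitions))
//                .addSink(markerSink);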

This approach should be particularly useful when the number of partitions
or keying mechanism is different between the input and output topics.

Hopefully that doesn't sound like a terrible idea.

eduardo




On Wed, 28 Aug 2019, 02:54 Padarn Wilson,  wrote:

> Hi again Fabian,
>
> Thanks for pointing this out to me. In my case there is no need for keyed
> writing - but I do wonder if having each kafka task write only to a single
> partition would significantly affect performance.
>
> Actually now that I think about it, the approach to just wait for the
> first records of the next window is also subject to the problem you mention
> above: a producer lagging behind the rest could end up with a partition
> containing element out of ‘window order’.
>
> I was also thinking this problem is very similar to that of checkpoint
> barriers. I intended to dig into the details of the exactly once Kafka sink
> for some inspiration.
>
> Padarn
>
> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske  wrote:
>
>> Hi Padarn,
>>
>> Yes, this is quite tricky.
>> The "problem" with watermarks is that you need to consider how you write
>> to Kafka.
>> If your Kafka sink writes to keyed Kafka stream (each Kafka partition is
>> written by multiple producers), you need to broadcast the watermarks to
>> each partition, i.e., each partition would receive watermarks from each
>> parallel sink task. So in order to reason about the current watermark of a
>> partition, you need to observe them and take the minimum WM across all
>> current sink task WMs.
>> Things become much easier, if each partition is only written by a single
>> task but this also means that data is not key-partitioned in Kafka.
>> In that case, the sink task only needs to write a WM message to each of
>> its assigned partitions.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> On Sat, Aug 17, 2019 at 05:48, Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Hi Fabian, thanks for your input
>>>
>>> Exactly. Actually my first instinct was to see if it was possible to
>>> publish the watermarks somehow - my initial idea was to insert regular
>>> watermark messages into each partition of the stream, but exposing this
>>> seemed quite troublesome.
>>>
>>> > In that case, you could have a ProcessFunction that is chained before
>>> the sink and which counts the window results per time slice and emits the
>>> result when the watermark passes to a side output.
>>> All side output messages are collected by a single task and can be
>>> published to a Kafka topic or even be made available via Queryable State.
>>>
>>> I understand the idea here (and exactly once semantics are probably fine
>>> for my use case), but counting events seems a bit fragile. I'm not totally
>>> confident the consumer can guarantee it won't read duplicates (its a golang
>>> kafka library that seems to have some quirks).
>>>
>>> I think ideally each partition of the kafka topic would have some
>>> regular information about watermarks. Perhaps the kafka producer can be
>>> modified to support this.
>>>
>>> Padarn
>>>
>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske  wrote:
>>>
 Hi Padarn,

 What you describe is essentially publishing Flink's watermarks to an
 outside system.
 Flink processes time windows, by waiting for a watermark that's past
 the window end time. When it receives such a WM it processes and emits all
 ended windows and forwards the watermark.
 When a 

Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Hey David,
My consumers are registered; here is the debug log. The problem is that the
broker does not belong to me, so I can't see what is going on there. But
this is a new consumer group, so there is no state yet.

 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading the following 40 partitions from
the committed group offsets in Kafka:
[KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]

On Thu, Aug 29, 2019 at 11:39 AM David Morin 
wrote:

> Hello Vishwas,
>
> You can use a keytab if you prefer. You generate a keytab for your user
> and then you can reference it in the Flink configuration.
> Then this keytab will be handled by Flink in a secure way and TGT will be
> created based on this keytab.
> However, that seems to be working.
> Did you check Kafka logs on the broker side ?
> Or did you check consumer offsets with Kafka tools in order to validate
> consumers are registered onto the different partitions of your topic ?
> You could try to switch to a different groupid for your consumer group in
> order to force parallel consumption.
>
> On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara wrote:
>
>> I see this log as well , but I can't see any messages . I know for a fact
>> that the topic I am subscribed to has messages as I checked with a simple
>> java consumer with a different group.
>>
>>
>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 will start reading the following 40 partitions from the 
>> committed group offsets in Kafka: 
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
>> 

Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?

2019-08-29 Thread Becket Qin
Hi Ashish,

You are right. Flink does not use Kafka-based group management. So if you
have two clusters consuming the same topic, they will not divide the
partitions. Cross-cluster HA is not quite possible at this point. It
would be good to know the reason you want such HA and to see whether Flink
meets your requirement in another way.

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 9:19 PM ashish pok  wrote:

> Looks like Flink is using “assign” partitions instead of “subscribe” which
> will not allow participating in a group if I read the code correctly.
>
> Has anyone solved this type of problem in past of active-active HA across
> 2 clusters using Kafka?
>
>
> - Ashish
>
> On Wednesday, August 28, 2019, 6:52 PM, ashish pok 
> wrote:
>
> All,
>
> I was wondering what the expected default behavior is when same app is
> deployed in 2 separate clusters but with same group Id. In theory idea was
> to create active-active across separate clusters but it seems like both
> apps are getting all the data from Kafka.
>
> Anyone else has tried something similar or have an insight on expected
> behavior? I was expecting to see partial data on both apps and to get all
> data in one app if other was turned off.
>
> Thanks in advance,
>
> - Ashish
>
>


Re: Flink and kerberos

2019-08-29 Thread David Morin
Hello Vishwas,

You can use a keytab if you prefer. You generate a keytab for your user and
then you can reference it in the Flink configuration.
Then this keytab will be handled by Flink in a secure way, and a TGT will be
created based on this keytab.
That said, your setup seems to be working.
Did you check the Kafka logs on the broker side?
Or did you check the consumer offsets with the Kafka tools in order to validate
that consumers are registered on the different partitions of your topic?
You could try to switch to a different group id for your consumer group in
order to force parallel consumption.
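
For reference, a keytab-based flink-conf.yaml sketch (the principal, the keytab path, and the KafkaClient context are assumptions; the KafkaClient entry is only needed if the Kafka client itself authenticates via Kerberos/GSSAPI):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/myuser.keytab
security.kerberos.login.principal: myuser@MY.REALM
security.kerberos.login.contexts: Client,KafkaClient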

On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara wrote:

> I see this log as well , but I can't see any messages . I know for a fact
> that the topic I am subscribed to has messages as I checked with a simple
> java consumer with a different group.
>
>
>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 0 will start reading the following 40 partitions from the 
> committed group offsets in Kafka: 
> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, 
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>
>
> On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara 
> wrote:
>
>> Hi guys,
>> I am using kerberos for my kafka source. I pass the jaas config and
>> krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf
>> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
>> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
>> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>>
>> When I look at debug logs I see that the consumer was created with the
>> following properties.
>>
>> 2019-08-29 06:49:18,298 INFO  
>> org.apache.kafka.clients.consumer.ConsumerConfig  - 
>> ConsumerConfig values:
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [sl73oprdbd018.visa.com:9092]
>> check.crcs = true
>> client.id = consumer-2
>> connections.max.idle.ms = 54
>> enable.auto.commit = true
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>>
>>
>> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>> heartbeat.interval.ms = 3000
>> interceptor.classes = null
>> key.deserializer = class 
>> 

Re: problem with avro serialization

2019-08-29 Thread Debasish Ghosh
Any update on this ?

regards.

On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Aljoscha opened a JIRA just recently for this issue:
> https://issues.apache.org/jira/browse/FLINK-12501.
>
> Do you know if this is a regression from previous Flink versions?
> I'm asking just to double check, since from my understanding of the issue,
> the problem should have already existed before.
>
> Thanks,
> Gordon
>
> On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh 
> wrote:
>
>> Hello -
>>
>> Facing an issue with avro serialization with Scala case classes generated
>> through avrohugger ..
>> Scala case classes generated by avrohugger has the avro schema in the
>> companion object. This is a sample generated class (details elided) ..
>>
>> case class Data(var id: Int, var name: String) extends
>> org.apache.avro.specific.SpecificRecordBase {
>>   def this() = this(0, "")
>>   def get(field$: Int): AnyRef = {
>> //..
>>   }
>>   def put(field$: Int, value: Any): Unit = {
>> //..
>>   }
>>   def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
>> }
>> object Data {
>>   val SCHEMA$ = new
>> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
>> }
>>
>> Flink 1.8's Avro serializer fails on this, as Avro looks for a SCHEMA$
>> property in the class and is unable to use Java reflection to identify the
>> SCHEMA$ in the companion object. The exception that I get is the
>> following ..
>>
>> java.lang.RuntimeException: Serializing the source elements failed:
>>> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
>>> org.apache.avro.AvroRuntimeException: Not a Specific class: class
>>> pipelines.flink.avro.Data
>>
>>
>> Any help or workaround will be appreciated ..
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
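
A small, self-contained diagnostic sketch of why the lookup fails: Avro's SpecificData resolves the schema of a SpecificRecord class through a static SCHEMA$ field, and a Scala companion-object val is not a static field of the class itself (the class name on the command line is only an example):

public class SchemaFieldCheck {
    public static void main(String[] args) throws Exception {
        // e.g. java SchemaFieldCheck pipelines.flink.avro.Data
        Class<?> clazz = Class.forName(args[0]);
        try {
            Object schema = clazz.getDeclaredField("SCHEMA$").get(null);
            System.out.println("static SCHEMA$ found: " + schema);
        } catch (NoSuchFieldException e) {
            // For avrohugger-style case classes the schema lives on the
            // companion object, so this branch is taken and Avro reports
            // "Not a Specific class".
            System.out.println("no static SCHEMA$ on " + clazz.getName());
        }
    }
}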

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


flink to HDFS Couldn't create proxy provider class ***.ha.ConfiguredFailoverProxyProvider

2019-08-29 Thread 马晓稳


Dear All


[I would like to ask about an issue with Flink writing consumed data to HDFS]


1. We have multiple HDFS clusters.
2. My approach is to put hdfs-site.xml and core-site.xml under the resources folder.


[Problem]
java.io.IOException: Couldn't create proxy provider class 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
at 
org.apache.hadoop.hdfs.NameNodeProxies.createFailoverProxyProvider(NameNodeProxies.java:515)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:170)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:668)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:604)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createFailoverProxyProvider(NameNodeProxies.java:498)
... 18 more
Caused by: java.lang.RuntimeException: Could not find any configured addresses 
for URI hdfs://**/**/**/***.db/***
at 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.(ConfiguredFailoverProxyProvider.java:93)
... 23 more


[What I tried]
1. Adding the hadoop-hdfs-*.jar file
2. Adding the flink-connector-filesystem-*.jar file


Still getting the same error.
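
The "Could not find any configured addresses for URI" error usually means that the nameservice used in the hdfs:// URI is not defined in the Hadoop configuration Flink actually loads (e.g. the directory pointed to by HADOOP_CONF_DIR or fs.hdfs.hadoopconf). An illustrative hdfs-site.xml fragment, with the nameservice id and hosts as assumptions:

<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>nn1-host:8020</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>nn2-host:8020</value></property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>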


--
Marvin
Email: maxw1...@163.com








Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?

2019-08-29 Thread ashish pok
Looks like Flink is using "assign" partitions instead of "subscribe", which will
not allow participating in a consumer group, if I read the code correctly.
Has anyone solved this type of problem in the past, i.e. active-active HA across 2
clusters using Kafka?


- Ashish

On Wednesday, August 28, 2019, 6:52 PM, ashish pok  wrote:

All,
I was wondering what the expected default behavior is when the same app is deployed
in 2 separate clusters but with the same group id. In theory the idea was to create
active-active across separate clusters, but it seems like both apps are getting
all the data from Kafka.
Has anyone else tried something similar, or does anyone have an insight on the expected
behavior? I was expecting to see partial data on both apps and to get all data
in one app if the other was turned off.

Thanks in advance,

- Ashish




Left Anti-Join

2019-08-29 Thread ddwcg
hi,

I want to calculate the consumption amount relative to the users added in the
previous year, but the result of the following SQL query is incorrect.

The "appendTable" is a table registered from an append stream.


select a.years, a.shopId, a.userId, a.amount
from (
  select years, shopId, userId, sum(amount) amount
  from appendTable
  group by years, shopId, userId
) a
where not exists (
  select years, shopId, userId
  from appendTable b
  where cast(a.years as int) = cast(b.years as int) - 1
    and a.shopId = b.shopId
    and a.userId = b.userId
);

thanks





Re: Loading dylibs

2019-08-29 Thread Yang Wang
Hi Vishwas,

I think it is just because the dylib is loaded more than once in a JVM
process (TaskManager).
Multiple tasks are deployed in one TaskManager and run in different
threads.
So if you want the dylib to be loaded only once, maybe you can use the parent
classloader.
You could use the following config option to set the packages to be
loaded by the parent classloader:


*classloader.parent-first-patterns.additional: xxx.yyy.**

Have a try :)
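
As an illustration (the class and package names are assumptions): a small holder class that can be shipped in Flink's lib/ directory or matched by the parent-first pattern above, so that the native library is loaded by the parent classloader exactly once per JVM:

public final class NativeLibHolder {
    private static volatile boolean loaded = false;

    private NativeLibHolder() {}

    // Call this from the job before using the native API.
    public static synchronized void ensureLoaded() {
        if (!loaded) {
            // Resolved via java.library.path; the load is tied to the
            // classloader of this class, i.e. the parent/system classloader
            // when the jar sits in Flink's lib/ directory.
            System.loadLibrary("vibesimplejava");
            loaded = true;
        }
    }
}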

Best,
Yang


Vishwas Siravara wrote on Wed, Aug 28, 2019 at 9:40 PM:

> Yes, this is exactly what happens. As a workaround I created a small jar
> file which has code to load the dylib and I placed it under the lib folder.
> This library is in provided scope in my actual job, so the dylib gets
> loaded only once when the TM/JM JVM starts.
> What I found interesting in my old approach was that even when I check whether
> the dylib has already been loaded in the current thread, and if it is, I
> still get the unsatisfied link error even though that dylib is loaded in
> the task manager.
>
> On Wed, Aug 28, 2019 at 7:04 AM Aleksey Pak  wrote:
>
>> Hi Vishwas,
>>
>> There is a known issue in the Flink Jira project [1].
>> Is it possible that you have encountered the same problem?
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-11402
>>
>> Regards,
>> Aleksey
>>
>>
>> On Tue, Aug 27, 2019 at 8:03 AM Vishwas Siravara 
>> wrote:
>>
>>> Hi Jörn,
>>> I tried that. Here is my snippet :
>>>
>>> String[] loadedlibs =  
>>> getLoadedLibraries(Thread.currentThread().getContextClassLoader());
>>> if(!containsVibeSimpleLib(loadedlibs)) {
>>> System.loadLibrary("vibesimplejava");
>>> }
>>>
>>> Now I get the exception Unexpected errorjava.lang.UnsatisfiedLinkError:
>>> com.voltage.securedata.enterprise.ConstantsNative.DIGEST_MD5()I which means
>>> that it could not find vibesimplejava in the loaded libs but I know that
>>> the if was not executed because vibesimplejava was present in loadedlibs(
>>> the control never went inside the if block. Any other suggestions?
>>>
>>> Thanks,
>>> Vishwas
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Aug 27, 2019 at 12:25 AM Jörn Franke 
>>> wrote:
>>>
 I don’t know Dylibs in detail, but can you call a static method where
 it checks if it has been already executed and if not then it loads the
 library (Singleton pattern)?

 On Aug 27, 2019, at 06:39, Vishwas Siravara wrote:

 Hi guys,
 I have a flink application that loads a dylib like this

 System.loadLibrary("vibesimplejava");


 The application runs fine , when I restart the job I get this exception
 :

 com.visa.aip.cryptolib.aipcyptoclient.EncryptionException: Unexpected 
 errorjava.lang.UnsatisfiedLinkError: Native Library 
 /usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/libvibesimplejava.so
  already loaded in another classloader

 This happens because the dylib has already been loaded once by the
 taskmanger, how can I mitigate this? It seems problematic if two
 applications are loading the same dylib.

 Thanks,
 Vishwas




Re: Question about transactionId generation when Flink writes to Kafka

2019-08-29 Thread Wesley Peng

Hi

on 2019/8/29 17:50, ddwcg wrote:

There is only one broker. The Flink cluster's clock is indeed different from the broker's,
since they are in different data centers. Can I specify the transactionalId myself?
Adjusting the two data centers' clocks to be the same might affect other applications.


AFAIK the transactional ID is generated by the system.

regards.
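
For what it's worth, a hedged note: as far as I can tell, the FlinkKafkaProducer derives the transactional id from the operator/task name and the subtask index, which is why the ids in the broker log contain the sink's operator name. What can be tuned from the job side is the transaction timeout, passed through the producer Properties, for example:

import java.util.Properties;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092");   // assumption
// Must not exceed the broker's transaction.max.timeout.ms.
producerProps.setProperty("transaction.timeout.ms", "900000");   // 15 minutes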


Re: Question about transactionId generation when Flink writes to Kafka

2019-08-29 Thread ddwcg
There is only one broker. The Flink cluster's clock is indeed different from the broker's,
since they are in different data centers. Can I specify the transactionalId myself?
Adjusting the two data centers' clocks to be the same might affect other applications.

> On Aug 29, 2019, at 17:45, Wesley Peng wrote:
> 
> Hi
> 
> on 2019/8/29 17:13, ddwcg wrote:
>> I have already stopped the job, but the Kafka logs keep printing "Initialized transactionalId ...",
>> and when the program is started again it reports the following error:
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
>> attempted an operation with an old epoch. Either there is a newer producer 
>> with the same transactionalId, or the producer's transaction has been 
>> expired by the broker.
>> Is there any way to avoid this problem?
> 
> Maybe you want to check all the broker and producers have the same timezone 
> setup, and all time are synchronous.
> 
> regards.
> 





Re: Question about transactionId generation when Flink writes to Kafka

2019-08-29 Thread Wesley Peng

Hi

on 2019/8/29 17:13, ddwcg wrote:

I have already stopped the job, but the Kafka logs keep printing "Initialized transactionalId ...",
and when the program is started again it reports the following error:
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
Is there any way to avoid this problem?


Maybe you want to check that all the brokers and producers have the same
timezone setup and that all clocks are synchronized.


regards.


Question about transactionId generation when Flink writes to Kafka

2019-08-29 Thread ddwcg
hi,

When writing to Kafka, a transactionId is generated automatically. How is this id generated? Specifying it myself does not seem to take effect.
I have already stopped the job, but the Kafka logs keep printing "Initialized transactionalId ...", and when the program is started again it reports the following error:
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
Is there any way to avoid this problem?
Below are the Kafka logs, which keep scrolling; many entries are from jobs several days old.



2019-08-29 17:08:48,911] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-3 with producerId 1011 and 
producer epoch 14255 on partition __transaction_state-36 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:49,244] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-4 with producerId 1012 and 
producer epoch 16544 on partition __transaction_state-35 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:49,518] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-1 with producerId 1013 and 
producer epoch 15460 on partition __transaction_state-38 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:49,786] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-2 with producerId 1014 and 
producer epoch 7781 on partition __transaction_state-37 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:50,054] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-0 with producerId 1015 and 
producer epoch 7529 on partition __transaction_state-39 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:50,310] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-3 with producerId 1011 and 
producer epoch 14256 on partition __transaction_state-36 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:51,078] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-4 with producerId 1012 and 
producer epoch 16545 on partition __transaction_state-35 
(kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:53,047] INFO [TransactionCoordinator id=0] Initialized 
transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, 
Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-1 with producerId 1013 and 
producer epoch 15461 on partition __transaction_state-38 
(kafka.coordinator.transaction.TransactionCoordinator)



Question about one Flink job consuming messages from multiple Kafka topics

2019-08-29 Thread 史 正超
1. In daily work, the same statistics table often contains several different metrics, e.g. post_count, send_count.
2. However, these metrics come from different Kafka topics/message bodies.
3. Is there a way to write each topic's data into the sink table without using UNION ALL? With UNION ALL, many columns end up padded with 0 values.
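
A minimal sketch for item 3 (topic names, schema, and properties are assumptions): a single consumer can subscribe to several topics, and the job can then route each record to the proper metric fields before writing to the sink table, instead of padding with UNION ALL:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

List<String> topics = Arrays.asList("post_events", "send_events");  // assumption
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");              // assumption
props.setProperty("group.id", "metrics-job");                       // assumption

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), props);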



Re: Flink and kerberos

2019-08-29 Thread Vishwas Siravara
I see this log as well, but I can't see any messages. I know for a fact
that the topic I am subscribed to has messages, as I checked with a simple
Java consumer with a different group.


 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading the following 40 partitions from
the committed group offsets in Kafka:
[KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]


On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara 
wrote:

> Hi guys,
> I am using kerberos for my kafka source. I pass the jaas config and
> krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>
> When I look at debug logs I see that the consumer was created with the
> following properties.
>
> 2019-08-29 06:49:18,298 INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig  - 
> ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [sl73oprdbd018.visa.com:9092]
> check.crcs = true
> client.id = consumer-2
> connections.max.idle.ms = 54
> enable.auto.commit = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
>
>
> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 500
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> 

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-08-29 Thread Tony Wei
Hi,

Has anyone run into the same problem? I have updated my producer
transaction timeout to 1.5 hours, but the problem still happened when I
restarted the broker with the active controller. So it does not seem to be
caused by a checkpoint duration long enough to exceed the transaction
timeout. I have no more clues for finding out what is wrong with my Kafka
producer. Could someone help me, please?

Best,
Tony Wei

Fabian Hueske wrote on Fri, Aug 16, 2019 at 4:10 PM:

> Hi Tony,
>
> I'm sorry I cannot help you with this issue, but Becket (in CC) might have
> an idea what went wrong here.
>
> Best, Fabian
>
> On Wed, Aug 14, 2019 at 07:00, Tony Wei <
> tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> Currently, I am trying to update our Kafka cluster with a larger
>> `transaction.max.timeout.ms`. The original setting is Kafka's default
>> value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>
>> While doing a rolling restart of my brokers, this exception hit me on the
>> next checkpoint after I restarted the broker with the active controller.
>>
>> java.lang.RuntimeException: Error while confirming checkpoint at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748) Caused by:
>>> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
>>> failed, logging first encountered failure at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5
>>> more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
>>> The producer attempted a transactional operation in an invalid state
>>
>>
>> I have no idea why this happened, and I didn't find any error logs on the
>> brokers. Has anyone seen this exception before? How can I prevent this
>> exception when I restart the Kafka cluster? Does this exception mean that
>> I will lose data in some of these transactions?
>>
>> flink cluster version: 1.8.1
>> kafka cluster version: 1.0.1
>> flink kafka producer version: universal
>> producer transaction timeout: 15 minutes
>> checkpoint interval: 5 minutes
>> number of concurrent checkpoint: 1
>> max checkpoint duration before and after the exception occurred:  < 2
>> minutes
>>
>> Best,
>> Tony Wei
>>
>
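
[Editor's note] A hedged sketch of how the producer-side transaction timeout is normally passed to the exactly-once Kafka producer; constructor overloads differ slightly across connector versions, and the broker address and topic below are placeholders. The value must stay at or below the brokers' transaction.max.timeout.ms.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object TxnTimeoutSketch {
  def buildProducer(): FlinkKafkaProducer[String] = {
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "kafka:9092")     // placeholder
    // producer-side transaction timeout; must not exceed the brokers'
    // transaction.max.timeout.ms
    producerProps.setProperty("transaction.timeout.ms", "3600000")   // 1 hour

    new FlinkKafkaProducer[String](
      "output-topic",                                                // placeholder topic
      new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
      producerProps,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
  }
}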



Question about periodic state cleanup while the checkpoint size keeps growing

2019-08-29 Thread 陈赋赟
Hi all, I have some questions about using a Flink ProcessFunction to manage
state. In the stream I use flatMap to assign every record several endTimes and
then key the stream by the record's key + endTime. In the ProcessFunction's
open() I initialize a value state, in processElement() I register a timer that
cleans up the state, and in onTimer() I clear it. My understanding is that if
the state is cleaned up on schedule, the checkpoint size should stay roughly
constant. However, the job has now been running for 8 days and the checkpoint
size keeps growing; it has reached about 1.6 GB. So I suspect there is a
problem in my state-management logic. Could you please take a look? The code
is below.


Job and ProcessFunction code:
val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
.map(rawEvent => {
try {
val eventJson = parse(rawEvent)

if ((eventJson \ "type").extract[String] == "track" &&
(eventJson \ "project_id").extract[Int] == 6 &&
(eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
(eventJson \ "time").extract[Long] >= startFromTimestamp.toLong)
Some(eventJson)
else
None
  } catch {
case _ => None
  }
}).uid("DianDianUserAppViewCount_Map")
.filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
.flatMap{
  item =>
val timestamp = (item.get \ "time").extract[Long]
SlidingEventTimeWindows
.of(Time.days(90), Time.days(1), Time.hours(-8))
.assignWindows(null, timestamp, null)
.map{
case timeWindow =>
RichAppViewEvent(
s"${(item.get \ "distinct_id").extract[String]}_${timeWindow.getEnd}",
(item.get \ "distinct_id").extract[String],
(item.get \ "event").extract[String],
timestamp,
timeWindow.getEnd
)
}
}.uid("DianDianUserAppViewCount_FlatMap")
.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
override def extractTimestamp(element: RichAppViewEvent): Long = element.time
}).uid("DianDianUserAppViewCount_Watermarker")
.keyBy(_.key)
.process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
.writeUsingOutputFormat(new 
HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")


class ProcessAppViewCount extends KeyedProcessFunction[String, 
RichAppViewEvent, (String, String, Int)] {

// 30 days in milliseconds  1000 * 60 * 60 * 24 * 30
private val thirtyDaysForTimestamp = 259200L
private val dateFormat = new SimpleDateFormat("MMdd")

private lazy val appViewCount: ValueState[Int] = getRuntimeContext.getState(new 
ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

override def processElement(value: RichAppViewEvent,
ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, 
Int)]#Context,
out: Collector[(String, String, Int)]): Unit = {

if (!isLate(value, ctx)){
// go back 30 days from the window end time
val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp

// if the event falls within the 30 days before the window end and is an $AppViewScreen event, increment the count
var currentValue = appViewCount.value()

if (value.time >= beforeThirtyDaysStartTime &&
  value.event.equals("$AppViewScreen")){
currentValue = currentValue + 1
appViewCount.update(currentValue)
  }

// emit downstream
out.collect(
(value.distinctId, dateFormat.format(value.windowEndTime - 1), 
currentValue)
  )

// register the cleanup timer
registerCleanupTimer(value, ctx)
}

  }

override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, 
Int)]#OnTimerContext,
out: Collector[(String, String, Int)]): Unit = {

if (appViewCount != null){
appViewCount.clear()
}

  }

/**
* If the current watermark has already passed the window end time assigned to
* this record, treat it as late data and do not process it.
* @param value
* @param ctx
* @return
*/
private def isLate(value: RichAppViewEvent, ctx: KeyedProcessFunction[String, 
RichAppViewEvent, (String, String, Int)]#Context): Boolean = {
ctx.timerService().currentWatermark() >= value.windowEndTime
  }

/**
* Use the window end time as the timestamp of the cleanup timer.
* @param value
* @param ctx
*/
private def registerCleanupTimer(value: RichAppViewEvent, ctx: 
KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context) 
= {
val cleanupTime = value.windowEndTime
ctx.timerService().registerEventTimeTimer(cleanupTime)
  }
}
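
[Editor's note] A hedged side note rather than a diagnosis: with a 90-day window sliding by 1 day, each record fans out to up to 90 keys, and every live key holds a ValueState entry plus a registered timer until the watermark passes its window end, so state can legitimately grow for a long time before cleanup catches up. As an alternative to hand-rolled cleanup timers, state TTL (available since Flink 1.6) can expire the value state automatically; a minimal sketch, with the 90-day TTL being an assumption that mirrors the largest sliding window above.

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

object TtlSketch {
  // TTL matching the largest window of the job above (an assumption);
  // note: in Flink 1.6 the TTL is processing-time based and, in early
  // releases, expired entries are only dropped when the state is read again
  val ttlConfig: StateTtlConfig = StateTtlConfig
    .newBuilder(Time.days(90))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()

  val descriptor = new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int])
  descriptor.enableTimeToLive(ttlConfig)
}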


Flink and kerberos

2019-08-29 Thread Vishwas Siravara
Hi guys,
I am using kerberos for my kafka source. I pass the jaas config and
krb5.conf in the env.java.opts: -Dconfig.resource=qa.conf
-Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.-Linux-x86_64-64b-r234867/lib/
-Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
-Djava.security.krb5.conf=/home/was/Jaas/krb5.conf

When I look at debug logs I see that the consumer was created with the
following properties.

2019-08-29 06:49:18,298 INFO
org.apache.kafka.clients.consumer.ConsumerConfig  -
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [sl73oprdbd018.visa.com:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1


group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer


I can also see that the kerberos login is working fine. Here is the log for it:



2019-08-29 06:49:18,312 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin  -
Successfully logged in.
2019-08-29 06:49:18,313 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
thread started.
2019-08-29 06:49:18,314 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid
starting at: Thu Aug 29 06:49:18 GMT 2019
2019-08-29 06:49:18,314 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT
expires: Thu Aug 29 16:49:18 GMT 2019
2019-08-29 06:49:18,315 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
sleeping until: Thu Aug 29 15:00:10 GMT 2019
2019-08-29 06:49:18,316 WARN
org.apache.kafka.clients.consumer.ConsumerConfig  - The
configuration 'zookeeper.connect' was supplied but isn't a known
config.
2019-08-29 06:49:18,316 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 0.10.2.0
2019-08-29 06:49:18,316 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
commitId : 576d93a8dc0cf421


I then see this log :

INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633
rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)



*The problem is that I do not see any error log, but no data is being
processed by the consumer, and it has been a nightmare to debug.*


Thanks for all the help.


Thanks,
Vishwas
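
[Editor's note] A hedged sketch, not a confirmed diagnosis: the ConsumerConfig dump above shows sasl.kerberos.service.name = null, which usually has to match the service name of the broker principal (commonly "kafka") for SASL/GSSAPI, so it is worth double-checking the client properties. Flink also ships built-in Kerberos support configured in flink-conf.yaml (security.kerberos.login.keytab, security.kerberos.login.principal, security.kerberos.login.contexts: Client,KafkaClient) as an alternative to passing a JAAS file through env.java.opts. The group.id below is a placeholder; the broker and topic names are taken from the thread.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KerberosConsumerSketch {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "sl73oprdbd018.visa.com:9092")
  props.setProperty("group.id", "flink-aip-occ")              // placeholder group id
  // SASL/GSSAPI client settings; the service name must match the broker
  // principal's service part (commonly "kafka")
  props.setProperty("security.protocol", "SASL_PLAINTEXT")
  props.setProperty("sasl.kerberos.service.name", "kafka")

  val consumer = new FlinkKafkaConsumer[String](
    "gbl_auth_raw_occ_c", new SimpleStringSchema(), props)
}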


Re: Discussion: resetting the Kafka offset when restoring a job from a checkpoint without clearing state

2019-08-29 Thread 蒋涛涛
Hi 唐云,

I can give your approach a try (I am currently on Flink 1.6.2).

PS: with the State Processor API in Flink 1.9, it should be possible to modify
the data in a savepoint directly and adjust the Kafka offsets there.

Best regards,
蒋涛涛

Yun Tang wrote on Thu, Aug 29, 2019 at 12:12 PM:

> Hi 蒋涛涛,
>
> There is a rather hacky way to achieve this. The source looks up its state
> by uid when restoring offsets, so if you do not want the source state to be
> restored from the checkpoint, you can manually change the source's uid in
> the code and pass --allowNonRestoredState when restoring. The Kafka source
> will then not find any matching source state in the restored
> checkpoint/savepoint and will start from the offsets you configured.
>
> Best regards,
> 唐云
> ____________
> From: 蒋涛涛 
> Sent: Thursday, August 29, 2019 11:45
> To: user-zh@flink.apache.org 
> Subject: Re: Discussion: resetting the Kafka offset when restoring a job
> from a checkpoint without clearing state
>
> Hi Yun Tang,
>
> By default I actually do want to restore the current Kafka consumption
> progress from the checkpoint. Only in special situations do I want to start
> consuming from a particular point in time while, as you said, mainly
> restoring the keyed state. If I call setCommitOffsetsOnCheckpoints(false)
> and set "auto.commit.enable" to false in the Kafka properties, no Kafka
> offsets are committed at all, so when I pause the job normally and later
> restore it from a checkpoint, I no longer know from which point in time to
> resume consumption.
>
>
> Yun Tang wrote on Thu, Aug 29, 2019 at 10:57 AM:
>
> > Hi 蒋涛涛,
> >
> > Flink's Kafka consumer has three offset commit modes:
> >
> >   1.  OffsetCommitMode.DISABLED   offset committing is disabled entirely
> >   2.  OffsetCommitMode.ON_CHECKPOINTS   Flink's default behaviour; offsets
> > are committed to Kafka only when a Flink checkpoint completes
> >   3.  OffsetCommitMode.KAFKA_PERIODIC   the default behaviour of Kafka's
> > internal client; offsets are committed to Kafka periodically
> >
> > If you do not want to rely on checkpoints to reset the Kafka offsets, you
> > can call setCommitOffsetsOnCheckpoints(false) on FlinkKafkaConsumerBase
> > and set "auto.commit.enable" to false in the Kafka properties. That
> > effectively means no offsets are committed at all, so when the job is
> > restored and the source is configured to consume from latest, the state in
> > the checkpoint is restored (I assume you mainly care about the keyed
> > state) while consumption starts from the latest offsets.
> >
> > Best regards,
> > 唐云
> > ____________
> > From: wang jinhai 
> > Sent: Thursday, August 29, 2019 10:25
> > To: user-zh@flink.apache.org 
> > Subject: Re: Discussion: resetting the Kafka offset when restoring a job
> > from a checkpoint without clearing state
> >
> > You could just restore from one of the earlier checkpoints.
> >
> >
> > On 2019/8/29 at 10:01 AM, "蒋涛涛" wrote:
> >
> > Hi everyone,
> >
> > As the subject says: I missed some data that should not have been dropped
> > and want to backfill part of it. Right now that means clearing the state
> > and resetting the Kafka offsets to re-run the job. Is it possible to keep
> > the Flink job's state, reset the Kafka offset when restoring the job from
> > a checkpoint, and start consuming from that point in time, without having
> > to clear the state and reprocess everything?
> >
> > Regards,
> > JackJiang
> >
> >


Re: Relationship between global parallelism and operator parallelism

2019-08-29 Thread ddwcg
Thank you for your reply. If only one slot is provided at startup and an operator's parallelism is set to 2, will the job end up running with parallelism 1?

> On Aug 29, 2019, at 10:54, pengcheng...@bonc.com.cn wrote:
> 
> Hi, as I understand it, the precedence of parallelism settings is
> setParallelism > command line > configuration file.
> If an operator runs with a parallelism greater than one, each parallel
> instance occupies one slot.
> Flink SQL does not allow setting the parallelism.
> 
> 
> 
> pengcheng...@bonc.com.cn
> 
> From: ddwcg
> Sent: 2019-08-29 10:18
> To: user-zh
> Subject: Relationship between global parallelism and operator parallelism
> Hi,
> What is the relationship between the parallelism set when the job is started
> and the parallelism of the individual operators?
> For example, if the startup parallelism is 1 and a map operator is set to 5
> via map(...).setParallelism(5), does that operator-level setting take effect
> (given that it is larger than the global setting of 1)?
> The parallelism set at startup is the number of slots, and each slot has
> fixed resources (say 1 GB of memory), so if a later operator increases its
> parallelism, how are the resources divided up?
> 
> Also, how is the parallelism of Flink SQL set? I did not find a
> setParallelism, only a setting for the maximum parallelism:
> setMaxParallelism().
> 
> 
> 
> Thanks
> 
> 
> 
> 
>
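
[Editor's note] A hedged sketch of the precedence discussed above: an operator-level setParallelism overrides the environment default, but the job can only be scheduled if enough task slots are available for the largest operator parallelism.

import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)                    // job-wide default

    env.fromElements(1, 2, 3, 4, 5)
      .map(_ * 2).setParallelism(5)          // operator-level setting wins over the default
      .print()                               // falls back to the default of 1

    // Note: scheduling needs enough task slots for the highest operator
    // parallelism (5 here), regardless of the default of 1.
    env.execute("parallelism sketch")
  }
}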