Re: How to allocate appropriate resources for a Flink job based on data volume, available memory, and cores

2021-08-23 Thread 张锴
No, you must have mistaken me for someone else.

On Tue, Aug 24, 2021 at 12:50 PM yidan zhao  wrote:

> Are you zhangkai30~
>
> On Tue, Aug 24, 2021 at 11:16 AM 张锴  wrote:
>
> > Our data volume is roughly 6 TB per day, and we only do some simple processing with no heavy computation. Assuming resources are sufficient and the Flink job is started from the command line, how much should we allocate, e.g. jobmanager memory, taskmanager memory, slots, number of taskmanagers, parallelism p?
> > How should we reason about these settings?
> >
> > Our experience with data at this scale is still limited. Could anyone point us in the right direction?
> >
>


Re: map concurrent modification exception analysis when checkpoint

2021-08-23 Thread yidan zhao
The issue has been resolved, as I said in the previous email. It is caused
by the async function: every record processed by the async function is kept
as state in the async operator, and the record (UserAccessLog) contains a
map-typed field.
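For readers who hit the same exception, below is a minimal Java sketch of the
hazard described above (the Event class and the fake lookup are illustrative,
not the original code): the async callback mutates a Map inside the input
element after asyncInvoke() has returned, while the async operator may be
serializing that same element as part of a checkpoint.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichFunction extends RichAsyncFunction<EnrichFunction.Event, EnrichFunction.Event> {

    public static class Event {
        private final Map<String, String> d = new HashMap<>();
        public Map<String, String> getD() { return d; }
    }

    @Override
    public void asyncInvoke(Event input, ResultFuture<Event> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> "some-label")   // stands in for the external (Redis) lookup
                .thenAccept(label -> {
                    // UNSAFE: mutates a map held in the async operator's state; a
                    // checkpoint may serialize it concurrently with this write.
                    input.getD().put("ipLabel", label);
                    resultFuture.complete(Collections.singletonList(input));
                });
    }
}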

On Mon, Aug 23, 2021 at 11:26 PM Arvid Heise  wrote:

> I don't see anything suspicious in your code. The stacktrace is also for a
> MapSerializer. Do you have another operator where you put Map into a custom
> state?
>
> On Fri, Aug 20, 2021 at 6:43 PM yidan zhao  wrote:
>
>> But I do not know why this leads to the job's failure and recovery,
>> since I have set the tolerable checkpoint failure number to Integer.MAX_VALUE.
>> Due to the failure, tasks timed out during cancellation, and about 80% of
>> the task managers went down because of the cancel timeout.
>>
>> On Sat, Aug 21, 2021 at 12:35 AM yidan zhao  wrote:
>> >
>> > OK, thanks. I have some results, and you can confirm them. Here is
>> > the code causing the issue:
>> >
>> > It is the async function's implementation. It does an async Redis query
>> > and fills some data back into the record.
>> > In the line [ currentBatch.get(i).getD().put("ipLabel",
>> > objects.getResponses().get(i)); ] the getD() call returns a map attribute
>> > of the OriginalUserAccessLog class.
>> > The issue occurs while a checkpoint is in progress: the Redis query result
>> > comes back concurrently while the async function's input queue is being
>> > serialized.
>> >
>> >
>> > @Override
>> > public void asyncInvoke0(OriginalUserAccessLog input,
>> >                          ResultFuture<OriginalUserAccessLog> resultFuture) {
>> >     inputBuffer.add(input);
>> >     if (inputBuffer.size() >= 1000) {
>> >         List<OriginalUserAccessLog> currentBatch = inputBuffer;
>> >         inputBuffer = new ArrayList<>();
>> >
>> >         RBatch rBatch = redissonClient.createBatch();
>> >
>> >         for (OriginalUserAccessLog i : currentBatch) {
>> >             rBatch.getBucket("ip:" + i.getIp()).getAsync();
>> >         }
>> >
>> >         rBatch.executeAsync().onComplete((objects, throwable) -> {
>> >             if (throwable == null) {
>> >                 for (int i = 0; i < currentBatch.size(); i++) {
>> >                     currentBatch.get(i).getD().put("ipLabel",
>> >                             objects.getResponses().get(i));
>> >                 }
>> >             }
>> >             resultFuture.complete(currentBatch);
>> >         });
>> >
>> >     } else {
>> >         resultFuture.complete(Collections.emptyList());
>> >     }
>> > }
>> >
>> > On Fri, Aug 20, 2021 at 1:56 AM Chesnay Schepler  wrote:
>> > >
>> > > Essentially this exception means that the state was modified while a
>> > > snapshot was being taken.
>> > >
>> > > We usually see this when users hold on to some state value beyond a
>> > > single call to a user-defined function, particularly from different
>> threads.
>> > >
>> > > We may be able to pinpoint the issue if you were to provide us with
>> the
>> > > functions.
>> > >
>> > > On 19/08/2021 16:59, yidan zhao wrote:
>> > > > Flink web ui shows the exception as follows.
>> > > > In the task (ual_transform_UserLogBlackUidJudger ->
>> > > > ual_transform_IpLabel), the first operator is a broadcast process
>> > > > function and the second one is an async function. I do not know
>> > > > whether the issue is related to that.
>> > > >
>> > > > The issue did not occur before; it appeared after I upgraded to
>> > > > Flink 1.13.2.
>> > > >
>> > > >
>> > > >
>> > > > _exception info from flink web ui:_
>> > > > java.io.IOException: Could not perform checkpoint 58 for operator
>> > > > ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
>> > > > (29/60)#0.
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>> > > >
>> > > > at
>> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>> > > >
>> > > > at 

Re: How to allocate appropriate resources for a Flink job based on data volume, available memory, and cores

2021-08-23 Thread yidan zhao
Are you zhangkai30~

On Tue, Aug 24, 2021 at 11:16 AM 张锴  wrote:

> Our data volume is roughly 6 TB per day, and we only do some simple processing with no heavy computation. Assuming resources are sufficient and the Flink job is started from the command line, how much should we allocate, e.g. jobmanager memory, taskmanager memory, slots, number of taskmanagers, parallelism p?
> How should we reason about these settings?
>
> Our experience with data at this scale is still limited. Could anyone point us in the right direction?
>


How to allocate appropriate resources for a Flink job based on data volume, available memory, and cores

2021-08-23 Thread 张锴
Our data volume is roughly 6 TB per day, and we only do some simple processing with no heavy computation. Assuming resources are sufficient and the Flink job is started from the command line, how much should we allocate, e.g. jobmanager memory, taskmanager memory, slots, number of taskmanagers, parallelism p?
How should we reason about these settings?

Our experience with data at this scale is still limited. Could anyone point us in the right direction?


Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread Rion Williams
Hi David,

Thanks again for the response; I believe I'm getting pretty close to at
least a POC-level implementation of this. Currently I'm working with
JsonObject instances throughout the pipeline, so to try this out I simply
stored the routing information within the element itself for simplicity's
sake right now. Each element has a shape that looks something like this:

{
    "route": {
        "hosts": "...",
        "index": "...",
        ...
    },
    "all-other-fields-here"
}

And I've stripped back several of the layers of the routers (since I
already have all of the information in the element at that point). I tried
using something like this:

class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
    private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
    private lateinit var configuration: Configuration

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(element: JsonObject, context: SinkFunction.Context) {
        val route = getHost(element)
        // Check if we already have a router for this cluster
        var sink = sinkRoutes[route]
        if (sink == null) {
            // If not, create one
            sink = buildSinkFromRoute(element)
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(element, context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        // No-op.
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // This is used only to flush pending writes.
        for (sink in sinkRoutes.values) {
            sink.snapshotState(context)
        }
    }

    override fun close() {
        for (sink in sinkRoutes.values) {
            sink.close()
        }
    }

    private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
        val builder = ElasticsearchSink.Builder(
            buildHostsFromElement(element),
            ElasticsearchRoutingFunction()
        )

        builder.setBulkFlushMaxActions(1)

        // TODO: Configure authorization if available
        //builder.setRestClientFactory { restClient ->
        //    restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
        //        override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
        //            // Configure authorization here
        //            val credentialsProvider = BasicCredentialsProvider().apply {
        //                setCredentials(
        //                    AuthScope.ANY,
        //                    UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
        //                )
        //            }
        //
        //            return builder.setDefaultCredentialsProvider(credentialsProvider);
        //        }
        //    })
        //}

        return builder.build()
    }

    private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
        val transportAddresses = element
            .get("route").asJsonObject
            .get("hosts").asString

        // If there are multiple, they should be comma-delimited
        val addresses = transportAddresses.split(",")
        return addresses
            .filter { address -> address.isNotEmpty() }
            .map { address ->
                HttpHost.create(address)
            }
    }

    private fun getHost(element: JsonObject): String {
        return element
            .get("route").asJsonObject
            .get("hosts").asString
    }

    private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
        override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
            indexer.add(request(element))
        }

        private fun request(element: JsonObject): IndexRequest {
            // Access routing information
            val index = element
                .get("route").asJsonObject
                .get("index").asString

            // Strip off routing information
            element.remove("route")

            // Send the request
            return Requests.indexRequest()
                .index(index)
                .type("_doc")
                .source(mapOf(
                    "data" to "$element"
                ))
        }
    }
}

After running an integration test, I keep running into the following error
during the invocation of the child sink:

// The runtime context has not been initialized.
sink.invoke(element, context)

I can see the underlying sink getting initialized, the open call being made,
etc. However, for some reason there seems to be an issue related to the
context during the invoke call, namely "The runtime context has not been
initialized". I had assumed this would be alright since the context for the
"wrapper" should have already been initialized, but maybe there's something
that I'm missing.
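For anyone following this thread, one likely missing step is that the lazily
created child sinks never receive a RuntimeContext; ElasticsearchSink is a
rich function, and calling only open() on it leaves getRuntimeContext()
uninitialized. Below is a minimal Java sketch of that idea (class and method
names are illustrative, and whether this alone resolves the exact error above
is an assumption): the wrapper hands its own runtime context to each child
sink before opening it.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public abstract class DynamicRoutingSink extends RichSinkFunction<String> {

    private final Map<String, RichSinkFunction<String>> sinks = new ConcurrentHashMap<>();
    private Configuration configuration;

    // How to derive the route and build the per-route sink is left to the subclass.
    protected abstract String getRoute(String element);

    protected abstract RichSinkFunction<String> buildSinkForRoute(String element);

    @Override
    public void open(Configuration parameters) {
        this.configuration = parameters;
    }

    @Override
    public void invoke(String element, SinkFunction.Context context) throws Exception {
        String route = getRoute(element);
        RichSinkFunction<String> sink = sinks.get(route);
        if (sink == null) {
            sink = buildSinkForRoute(element);
            // Propagate the wrapper's runtime context; without this the child sink
            // fails with "The runtime context has not been initialized".
            sink.setRuntimeContext(getRuntimeContext());
            sink.open(configuration);
            sinks.put(route, sink);
        }
        sink.invoke(element, context);
    }

    @Override
    public void close() throws Exception {
        for (RichSinkFunction<String> sink : sinks.values()) {
            sink.close();
        }
    }
}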


Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Actually, we are using the `FlinkKafkaConsumer` [0] rather than
`KafkaSource`. Is there a way to disable the consumer metrics using
`FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?


[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
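The register.consumer.metrics switch from the linked docs applies to the new
KafkaSource; whether there is an equivalent for the legacy FlinkKafkaConsumer
is exactly the open question above. For reference, a minimal Java sketch of
how it is set on the new source (topic, group id and bootstrap servers are
placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class QuietKafkaSource {
    public static KafkaSource<String> create() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // do not forward the KafkaConsumer metrics into Flink's metric system
                .setProperty("register.consumer.metrics", "false")
                .build();
    }
}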

On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam  wrote:

> Thanks Arvid! I will give this a try and report back.
>
> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise  wrote:
>
>> Hi Kevin,
>>
>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>> have been loaded. [1]
>> If you only see that after a while, it's indicating that there is a
>> classloader leak. I suspect that this is because of Kafka metrics. There
>> have been some reports in the past.
>> You can try to see what happens when you disable the forwarding of the
>> Kafka metrics with register.consumer.metrics: false [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>
>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam  wrote:
>>
>>> Hi all,
>>>
>>> I'm observing an issue sometimes, and it's been hard to reproduce, where
>>> task managers are not able to register with the Flink cluster. We provision
>>> only the number of task managers required to run a given application, and
>>> so the absence of any of the task managers causes the job to enter a crash
>>> loop where it fails to get the required task slots.
>>>
>>> The failure occurs after a job has been running for a while, and when
>>> there have been job and task manager restarts. We run in kubernetes so pod
>>> disruptions occur from time to time, however we're running using the high
>>> availability setup [0]
>>>
>>> Has anyone encountered this before? Any suggestions?
>>>
>>> Below are some error messages pulled from the task managers failing to
>>> re-register.
>>>
>>> ```
>>> ] - Starting DefaultLeaderRetrievalService with
>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>> 2021-08-16 13:15:10,112 INFO
>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>> Starting DefaultLeaderElectionService with
>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-restserver-leader.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-resourcemanager-leader.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-dispatcher-leader.
>>> 2021-08-16 13:15:10,211 INFO
>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>> - Starting DefaultLeaderRetrievalService with
>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>> 2021-08-16 13:16:26,103 WARN
>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>> - Error while retrieving the leader gateway. Retrying to connect to
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>> 2021-08-16 13:16:30,978 WARN
>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>> - Error while retrieving the leader gateway. Retrying to connect to
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>> ```
>>>
>>> ```
>>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>>[] - Uncaught exception in thread
>>> 'kafka-producer-network-thread |
>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>>> at
>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>> ~[?:?]
>>> at
>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>> ~[?:?]
>>> at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>> at
>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>> ~[?:?]
>>> at
>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>> ~[?:?]
>>> at java.lang.Thread.run(Unknown Source) [?:?]
>>> Caused by: java.lang.ClassNotFoundException:

Re: Kafka Metrics

2021-08-23 Thread Arvid Heise
Hi Mason,

We always appreciate contributions but I think that this metric reporter
would need to be beefed up quite a bit to be of general use. In this case,
it works because there is an existing relatively narrow group to deny, but
there are plenty of other cases where this isn't the case (like most
Flink-provided job/task/operator metrics):
- "I don't want the KafkaConsumer metrics, except that exact once" (so also
an include list?), or
- "I only want Kafka metrics from the JM" (rules depending on the scope?),
or
- "I don't care about perSecond IO metrics, just give me the raw counts"
(support for patterns?).

To support those we'd first need some form of categorization (which
MetricGroups could be, but currently aren't); we can't expect users to
set up a deny list for 50 metrics as it just doesn't scale nor is it
maintainable for them, especially if we add more metrics in the future.

If you are still interested: the first step would be a ticket on
https://issues.apache.org/jira/ where we would discuss some options until
we reach an agreement. Then, you would be assigned and can start to
implement.

Alternatively, you could also write a (Flink) blog post where you outline
your solution and provide your code. Then other advanced users could follow
your approach and implement their own version of reporter that fits their
needs.
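For readers who want to try the delegating approach discussed in this thread,
here is a rough Java sketch of a reporter that wraps another reporter and
drops one denied group/metric-name combination (the class name, constructor
wiring and the single hard-coded rule are illustrative only; a production
reporter would be created through a MetricReporterFactory and configured via
the metrics.reporter.* options):

import java.util.Arrays;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

public class DenyListingReporter implements MetricReporter {

    private final MetricReporter delegate;  // the reporter that actually ships metrics
    private final String deniedGroup;       // e.g. "KafkaConsumer"
    private final String deniedMetric;      // e.g. "committed_offsets"

    public DenyListingReporter(MetricReporter delegate, String deniedGroup, String deniedMetric) {
        this.delegate = delegate;
        this.deniedGroup = deniedGroup;
        this.deniedMetric = deniedMetric;
    }

    private boolean isDenied(String metricName, MetricGroup group) {
        return metricName.equals(deniedMetric)
                && Arrays.asList(group.getScopeComponents()).contains(deniedGroup);
    }

    @Override
    public void open(MetricConfig config) {
        delegate.open(config);
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        if (!isDenied(metricName, group)) {
            delegate.notifyOfAddedMetric(metric, metricName, group);
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        delegate.notifyOfRemovedMetric(metric, metricName, group);
    }
}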

On Mon, Aug 23, 2021 at 7:07 PM Mason Chen  wrote:

> Sweet, I suspected it but I thought I might ask anyway.
>
> Consequently, I've implemented a deny list feature for my reporter (based
> on groupNameKey and metricName). The reporter will skip reporting metrics
> if a metric's variables set contains keys that map to the groupNameKey and
> if the metric has a name equal to the specified metricName.
>
> Configurations are specified as follows
> `groupNameKey1:metricName1;groupNameKey2:metricName2`. Thus, I can deny
> list KafkaConsumer (group name key) and committed_offsets (metric name)
> which correspond to the legacy kafka metrics.
>
> Would Flink appreciate this as a contribution? I can see this being used
> generically over all reporters.
>
> Best,
> Mason
>
> On Mon, Aug 23, 2021 at 8:21 AM Arvid Heise  wrote:
>
>> Hi Mason,
>>
>> I'm afraid it's an all-or-nothing. Either you get the proxied metrics
>> with all partitions or none.
>>
>> You could also implement a custom MetricReporter that delegates to your
>> actual reporter and filters the respective metrics.
>>
>> Best,
>>
>> Arvid
>>
>> On Fri, Aug 20, 2021 at 8:16 AM Mason Chen 
>> wrote:
>>
>>> FYI, I'm referring to the legacy offsets metric gauges.
>>>
>>> On Thu, Aug 19, 2021 at 4:53 PM Mason Chen 
>>> wrote:
>>>
 Hi all,

 We have found that the per partition Kafka metrics contributes to a lot
 of metrics being indexed by our metrics system.

 We would still like to have the proxied kafka metrics from the kafka
 clients library. Is there a flag to only exclude Flink's additional Kafka
 metrics?

 Best,
 Mason




Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Thanks Arvid! I will give this a try and report back.

On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise  wrote:

> Hi Kevin,
>
> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
> have been loaded. [1]
> If you only see that after a while, it's indicating that there is a
> classloader leak. I suspect that this is because of Kafka metrics. There
> have been some reports in the past.
> You can try to see what happens when you disable the forwarding of the
> Kafka metrics with register.consumer.metrics: false [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>
> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> I'm observing an issue sometimes, and it's been hard to reproduce, where
>> task managers are not able to register with the Flink cluster. We provision
>> only the number of task managers required to run a given application, and
>> so the absence of any of the task managers causes the job to enter a crash
>> loop where it fails to get the required task slots.
>>
>> The failure occurs after a job has been running for a while, and when
>> there have been job and task manager restarts. We run in kubernetes so pod
>> disruptions occur from time to time, however we're running using the high
>> availability setup [0]
>>
>> Has anyone encountered this before? Any suggestions?
>>
>> Below are some error messages pulled from the task managers failing to
>> re-register.
>>
>> ```
>> ] - Starting DefaultLeaderRetrievalService with
>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>> 2021-08-16 13:15:10,112 INFO
>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>> Starting DefaultLeaderElectionService with
>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-restserver-leader.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-resourcemanager-leader.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-dispatcher-leader.
>> 2021-08-16 13:15:10,211 INFO
>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>> - Starting DefaultLeaderRetrievalService with
>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>> 2021-08-16 13:16:26,103 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>> 2021-08-16 13:16:30,978 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>> ```
>>
>> ```
>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>  [] - Uncaught exception in thread
>> 'kafka-producer-network-thread |
>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>> at
>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>> ~[?:?]
>> at
>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>> ~[?:?]
>> at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>> ~[?:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>> ~[?:?]
>> at java.lang.Thread.run(Unknown Source) [?:?]
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.clients.NetworkClient$1
>> at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>> at
>> 

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

2021-08-23 Thread Zbyszko Papierski
Yes, it turned out that we had only configured TM<->JM communication
correctly; the inter-TM rules were missing, so that traffic was effectively
"reject all". Thanks for the suggestion!

On Mon, Aug 23, 2021 at 5:29 PM Arvid Heise  wrote:

> It rather looks to me as if the task managers cannot communicate with each
> other. Can you check your network policies? Are they allowed to communicate
> on random ports?
>
> On Mon, Aug 23, 2021 at 8:37 AM Zbyszko Papierski <
> zpapier...@wikimedia.org> wrote:
>
>> Hi,
>>
>> No, they don't - only the job is being restarted after that, without any
>> luck. The exception I provided comes from the exceptions list of the job itself.
>>
>> On Mon, Aug 23, 2021 at 4:50 AM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> This might be that some task managers cannot reach out to the job
>>> manager in time. Has any of the task manager instances restarted after this
>>> failure? If yes, what does the log (Flink log and kubernetes log) of the
>>> failed task manager say?
>>>
>>> On Fri, Aug 20, 2021 at 11:07 PM Zbyszko Papierski  wrote:
>>>
 Hi!

 We're trying to successfully deploy our application to our Kubernetes
 cluster and we seem to have hit a snag. Long story short - any kind of
 deployment that involves a cluster of more than 1 TM seem to fail our job
 almost immediately with this exception:

 org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> Sending the partition request to 'null' failed.
> at
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
> at
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
> buffer.memory = 33554432
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.nio.channels.ClosedChannelException
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
> ... 11 more


 I don't have enough experience to judge from the rest of the logs what
 can be the reason for that, but I'm including the debug logs that I can get
 from kubectl, both JM and TM.
 We use a session cluster deployed on Kubernetes (not Kubernetes native,
 there are still some issues why we can't use it) and we deploy our app to
 that cluster. We have confirmed that everything works when there's a single
 Task Manager, but 

Re: Kafka Metrics

2021-08-23 Thread Mason Chen
Sweet, I suspected it but I thought I might ask anyway.

Consequently, I've implemented a deny list feature for my reporter (based
on groupNameKey and metricName). The reporter will skip reporting metrics
if a metric's variables set contains keys that map to the groupNameKey and
if the metric has a name equal to the specified metricName.

Configurations are specified as follows
`groupNameKey1:metricName1;groupNameKey2:metricName2`. Thus, I can deny
list KafkaConsumer (group name key) and committed_offsets (metric name)
which correspond to the legacy kafka metrics.

Would Flink appreciate this as a contribution? I can see this being used
generically over all reporters.

Best,
Mason

On Mon, Aug 23, 2021 at 8:21 AM Arvid Heise  wrote:

> Hi Mason,
>
> I'm afraid it's an all-or-nothing. Either you get the proxied metrics with
> all partitions or none.
>
> You could also implement a custom MetricReporter that delegates to your
> actual reporter and filters the respective metrics.
>
> Best,
>
> Arvid
>
> On Fri, Aug 20, 2021 at 8:16 AM Mason Chen  wrote:
>
>> FYI, I'm referring to the legacy offsets metric gauges.
>>
>> On Thu, Aug 19, 2021 at 4:53 PM Mason Chen 
>> wrote:
>>
>>> Hi all,
>>>
>>> We have found that the per partition Kafka metrics contributes to a lot
>>> of metrics being indexed by our metrics system.
>>>
>>> We would still like to have the proxied kafka metrics from the kafka
>>> clients library. Is there a flag to only exclude Flink's additional Kafka
>>> metrics?
>>>
>>> Best,
>>> Mason
>>>
>>>


Re: DataStream to Table API

2021-08-23 Thread Matthias Broecheler
Perfect, that worked.

Thanks a lot, JING!

On Sun, Aug 22, 2021 at 1:25 AM JING ZHANG  wrote:

> Hi Matthias,
> Before the bug is fixed, you could specify the return type explicitly in
> the second parameter of the map function.
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));   ->
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i), new
> RowTypeInfo(Types.STRING, Types.INT));
>
> Best,
> JING ZHANG
>
>
>
> On Sat, Aug 21, 2021 at 12:40 AM Matthias Broecheler  wrote:
>
>> Thank you, Caizhi, for looking into this and identifying the source of
>> the bug. Is there a way to work around this at the API level until this bug
>> is resolved? Can I somehow "inject" the type?
>>
>> Thanks a lot for your help,
>> Matthias
>>
>> On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng 
>> wrote:
>>
>>> Hi!
>>>
>>> I've created a JIRA ticket[1] for this issue. Please check it out and
>>> track the progress there.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23885
>>>
>>> On Fri, Aug 20, 2021 at 10:47 AM Caizhi Weng  wrote:
>>>
 Hi!

 This is because TypeExtractor#getMapReturnTypes are not dealing with
 row types (see that method and also TypeExtractor#privateGetForClass). You
 might want to open a JIRA ticket for this.

 On Fri, Aug 20, 2021 at 7:01 AM Matthias Broecheler  wrote:

> Hey Flinkers,
>
> I am trying to follow the docs
> 
>  to
> convert a DataStream to a Table. Specifically, I have a DataStream of Row
> and want the columns of the row to become the columns of the resulting
> table.
>
> That works but only if I construct the Rows statically. If I construct
> them dynamically (in a map) then Flink turns the entire Row into one 
> column
> of type "RAW('org.apache.flink.types.Row', '...')".
>
> Does anybody know why this is the case or how to fix it? Take a look
> at the simple Flink program below where I construct the DataStream "rows"
> in two different ways. I would expect those to be identical (and the sink
> does print identical information) but the inferred table schema is
> different.
>
> Thanks a ton,
> Matthias
>
> --
>
> StreamExecutionEnvironment flinkEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>
> //  This alternative way of constructing this data stream produces the 
> expected table schema
> //  DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 
> 12), Row.of("Name5", 5));
>
> StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(flinkEnv);
> Table table = tableEnv.fromDataStream(rows);
> table.printSchema();
>
> rows.addSink(new PrintSinkFunction<>());
>
> flinkEnv.execute();
>
>
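For completeness, another way to get the same result in case the two-argument
map overload shown above is not available in your Flink version: keep the
plain map call and supply the row type explicitly with returns(). A small
Java sketch, assuming org.apache.flink.api.common.typeinfo.Types and the
integers stream from the program quoted above:

DataStream<Row> rows =
        integers
                .map(i -> Row.of("Name" + i, i))
                // bypasses the lambda type extraction that loses the Row field types
                .returns(Types.ROW(Types.STRING, Types.INT));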


Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread David Morávek
Hi Rion,

Sorry for late reply, I've missed your previous message. Thanks Arvid for
the reminder <3.

> something like a MessageWrapper and pass those
> elements to the sink, which would create the tenant-specific Elastic
> connection from the ConfigurationT element and handle caching it and then
> just grab the element and send it on it's way?


Yes, this is exactly what I had in mind. There should be almost no overhead,
as the sink can be easily chained with your join (KeyedCoProcessFunction)
function.
>
> The shape of the elements being evicted from the process function (Is a
> simple wrapper with the configuration for the sink enough here? Do I need
> to explicitly initialize the sink within this function? Etc.)

To write an element you need a configuration for the destination and the
element itself, so a tuple of (ElasticConfiguration, Element) should be
enough (that's basically your MessageWrapper class).
>
> The actual use of the DynamicElasticsearchSink class (Would it just be
> something like an .addSink(DynamicElasticsearchSink<String,
> Configuration>()) or perhaps something else entirely?)

I guess it could look something like the snippet below. It would definitely
be good to play around with the DynamicElasticsearchSink API and make it more
meaningful / user friendly (the gist I've shared was just a very rough
prototype to showcase the idea).

static class Destination {

    private final List<HttpHost> httpHosts;

    Destination(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }
}

final DataStream<Tuple2<Destination, String>> toWrite = ...;
toWrite.addSink(
        new DynamicElasticsearchSink<>(
                new SinkRouter<
                        Tuple2<Destination, String>,
                        String,
                        ElasticsearchSink<Tuple2<Destination, String>>>() {

                    @Override
                    public String getRoute(Tuple2<Destination, String> element) {
                        // Construct a deterministic unique caching key for the
                        // destination... (this could be cheaper if you know the data)
                        return element.f0.httpHosts.stream()
                                .map(HttpHost::toHostString)
                                .collect(Collectors.joining(","));
                    }

                    @Override
                    public ElasticsearchSink<Tuple2<Destination, String>> createSink(
                            String cacheKey, Tuple2<Destination, String> element) {
                        return new ElasticsearchSink.Builder<>(
                                        element.f0.httpHosts,
                                        (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
                                                (el, ctx, indexer) -> {
                                                    // Construct index request.
                                                    final IndexRequest request = ...;
                                                    indexer.add(request);
                                                })
                                .build();
                    }
                }));

I hope this helps ;)

Best,
D.


On Mon, Aug 16, 2021 at 5:18 PM Rion Williams  wrote:

> Thanks for this suggestion David, it's extremely helpful.
>
> Since this will vary depending on the elements retrieved from a separate
> stream, I'm guessing something like the following would be roughly the
> avenue to continue down:
>
> fun main(args: Array) {
> val parameters = mergeParametersFromProperties(args)
> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
> // Get the stream for tenant-specific Elastic configurations
> val connectionStream = stream
> .fromSource(
> KafkaSource.of(parameters, listOf("elastic-configs")),
> WatermarkStrategy.noWatermarks(),
> "elastic-configs"
> )
>
> // Get the stream of incoming messages to be routed to Elastic
> stream
> .fromSource(
> KafkaSource.of(parameters, listOf("messages")),
> WatermarkStrategy.noWatermarks(),
> "messages"
> )
> .keyBy { message ->
> // Key by the tenant in the message
> message.getTenant()
> }
> .connect(
> // Connect the messages stream with the configurations
> connectionStream
> )
> .process(object : KeyedCoProcessFunction String>() {
> // For this key, we need to store all of the previous messages in 
> state
> // in the case where we don't have a given mapping for this 
> tenant yet
> lateinit var messagesAwaitingConfigState: ListState
> lateinit var configState: ValueState
>
> override fun open(parameters: Configuration) {
> super.open(parameters)
> // Initialize the states
> messagesAwaitingConfigState = 
> runtimeContext.getListState(awaitingStateDesc)
> 

Can't start FlinkKafkaProducer using SSL

2021-08-23 Thread Wouter Zorgdrager
Hi all,

I'm trying to deploy a FlinkKafkaProducer in PyFlink on a remote cluster.
Unfortunately, I'm getting the following exception:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.
KafkaException: Failed to construct kafka producer
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.
KafkaProducer.(KafkaProducer.java:432)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.
KafkaProducer.(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internals.
FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.createProducer(FlinkKafkaProducer.java:1230)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.initProducer(FlinkKafkaProducer.java:1346)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.beginTransaction(FlinkKafkaProducer.java:990)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.beginTransaction(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.
TwoPhaseCommitSinkFunction.beginTransactionInternal(
TwoPhaseCommitSinkFunction.java:403)
at org.apache.flink.streaming.api.functions.sink.
TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:
394)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.initializeState(FlinkKafkaProducer.java:1195)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils
.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
.initializeOperatorState(StreamOperatorStateHandler.java:118)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.initializeStateAndOpenOperators(OperatorChain.java:436)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(
StreamTask.java:574)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(
StreamTask.java:554)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.
KafkaException: javax.security.auth.login.LoginException: unable to find
LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.
SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.
ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.
ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils
.createChannelBuilder(ClientUtils.java:99)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.
KafkaProducer.newSender(KafkaProducer.java:450)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.
KafkaProducer.(KafkaProducer.java:421)
... 22 more
Caused by: javax.security.auth.login.LoginException: unable to find
LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:
195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:
680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.
AbstractLogin.login(AbstractLogin.java:60)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.
LoginManager.(LoginManager.java:62)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.
LoginManager.acquireLoginManager(LoginManager.java:105)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.
SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
... 27 more

My Kafka 

Flink 1.13.1 Kafka Producer Error

2021-08-23 Thread Debraj Manna
I am trying to use flink kafka producer like below

public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());

    return new FlinkKafkaProducer<>(
            "FlinkSdmKafkaTopic",
            new SerializationSchema("FlinkSdmKafkaTopic", 8),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

private static class SerializationSchema implements
        KafkaSerializationSchema<SelfDescribingMessageDO> {
    final String topic;
    final int numPartitions;

    public SerializationSchema(final String topic, final int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public ProducerRecord<byte[], byte[]>
            serialize(SelfDescribingMessageDO sdm, @Nullable Long aLong) {
        return new ProducerRecord<>(topic,
                KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
                sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
                sdm.toByteArray());
    }
}

I am getting the below exception when trying to deploy the flink job.
During unit tests I am not getting this error.

2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source:
MetricSource -> Filter -> MetricStoreMapper -> (Filter ->
Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11,
Sink: TSDBSink14) (5/8)#0 transitionState:1069 Source: MetricSource ->
Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map
-> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0
(5764a387ede7d6710bcf3ad4e248) switched from INITIALIZING to
FAILED with failure cause: org.apache.kafka.common.KafkaException:
Failed to construct kafka producer
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of 

Re: Flink's Kafka Offset Management

2021-08-23 Thread Arvid Heise
It's basically as you have said.

If you resume from a checkpoint/savepoint (./bin/flink run -s ...), Flink
will always use the offsets that have been stored inside it.
If you don't resume from a checkpoint, it depends on how you have configured
the consumer: if you have supplied a group.id and left the other settings at
their defaults, it uses the committed offsets of that group.
How recent those offsets are depends on your consumer configuration (synced
on checkpoint or auto-committed after each read).
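To make the consumer-side part of this concrete, here is a small Java sketch
of the settings the behavior above depends on (topic, group id and bootstrap
servers are placeholders); both calls shown are the defaults, so this only
spells out what the text describes:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ConsumerFactory {
    public static FlinkKafkaConsumer<String> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Without a checkpoint/savepoint to restore from, start from the group's
        // committed offsets in Kafka.
        consumer.setStartFromGroupOffsets();
        // With checkpointing enabled, commit offsets back to Kafka when a
        // checkpoint completes.
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }
}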

On Mon, Aug 23, 2021 at 10:30 AM Pranjul Ahuja  wrote:

> I use FlinkKafkaConsumer to consume Kafka and enable checkpoints. Now I'm
> a little confused about the offset management and checkpoint mechanism.
>
> What is the behavior if I stop the application by executing the yarn
> application -kill appId and run the start command like ./bin flink run ...?
> Will flink get the offset from a checkpoint or from the group-id managed by
> Kafka?
>
> Right now, I am doing the checkpoints on HDFS but not executing the flink
> with the checkpoint directory so I am assuming that the Kafka consumer is
> picking the offsets from kafka for the particular group.id. I have also
> validated from the kafka-consumer-groups.sh tool that once the flink is
> restarted, it picks offsets from the last commit in Kafka topic.
>
>
>


Re: Keystore format limitations for TLS

2021-08-23 Thread Alexander Fedulov
Hi Alexis,

the first step would be to verify whether the keystore that you are trying
to use is compatible with the Java version inside of your Docker container
(even before involving any Flink specifics). Try the following:

   - Run your Flink Docker container locally
   - Mount a folder with your certificate into this container at startup
   - Open a shell into this running container, locate the "keytool" utility
     and try to use it to import the certificate

Best,

Alexander Fedulov | Solutions Architect



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner



On Mon, Aug 16, 2021 at 7:52 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
> I am trying to configure TLS communication for a Flink cluster running on
> Kubernetes. I am currently using the BCFKS format and setting that as
> default via javax.net.ssl.keystoretype and javax.net.ssl.truststoretype
> (which are injected in the environment variable FLINK_ENV_JAVA_OPTS). The
> task manager is failing with "Invalid Keystore format", so I'm wondering if
> there are special limitations with regards to supported TLS configurations?
>
> Regards,
> Alexis.
>
>


Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

2021-08-23 Thread Arvid Heise
It rather looks to me as if the task managers cannot communicate with each
other. Can you check your network policies? Are they allowed to communicate
on random ports?

On Mon, Aug 23, 2021 at 8:37 AM Zbyszko Papierski 
wrote:

> Hi,
>
> No, they don't - only the job is being restarted after that, without any
> luck. The exception I provided comes from the exceptions list of the job itself.
>
> On Mon, Aug 23, 2021 at 4:50 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> This might be that some task managers cannot reach out to the job manager
>> in time. Has any of the task manager instances restarted after this failure?
>> If yes, what does the log (Flink log and kubernetes log) of the failed task
>> manager say?
>>
>> On Fri, Aug 20, 2021 at 11:07 PM Zbyszko Papierski  wrote:
>>
>>> Hi!
>>>
>>> We're trying to successfully deploy our application to our Kubernetes
>>> cluster and we seem to have hit a snag. Long story short - any kind of
>>> deployment that involves a cluster of more than 1 TM seem to fail our job
>>> almost immediately with this exception:
>>>
>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
 Sending the partition request to 'null' failed.
 at
 org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
 at
 org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
 buffer.memory = 33554432
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)

 app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)

 app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
 at
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
 at
 org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 at
 org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at java.base/java.lang.Thread.run(Thread.java:834)
 Caused by: java.nio.channels.ClosedChannelException
 at
 org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
 ... 11 more
>>>
>>>
>>> I don't have enough experience to judge from the rest of the logs what
>>> can be the reason for that, but I'm including the debug logs that I can get
>>> from kubectl, both JM and TM.
>>> We use a session cluster deployed on Kubernetes (not Kubernetes native,
>>> there are still some issues why we can't use it) and we deploy our app to
>>> that cluster. We have confirmed that everything works when there's a single
>>> Task Manager, but we'd rather not continue with that limitation. The way we
>>> define the cluster itself on k8s is shown here [1] and the chart for the
>>> deployment itself can be found here [2]. App we're deploying is available
>>> here [3]. We're running Flink 1.21.1 on openjdk-jre 11.
>>>
>>>  Since I overslept k8s revolution a bit and am somewhat 

Re: map concurrent modification exception analysis when checkpoint

2021-08-23 Thread Arvid Heise
I don't see anything suspicious in your code. The stacktrace is also for a
MapSerializer. Do you have another operator where you put Map into a custom
state?

On Fri, Aug 20, 2021 at 6:43 PM yidan zhao  wrote:

> But I do not know why this leads to the job's failure and recovery,
> since I have set the tolerable checkpoint failure number to Integer.MAX_VALUE.
> Due to the failure, tasks timed out during cancellation, and about 80% of
> the task managers went down because of the cancel timeout.
>
> On Sat, Aug 21, 2021 at 12:35 AM yidan zhao  wrote:
> >
> > OK, thanks. I have some results, and you can confirm them. Here is
> > the code causing the issue:
> >
> > It is the async function's implementation. It does an async Redis query
> > and fills some data back into the record.
> > In the line [ currentBatch.get(i).getD().put("ipLabel",
> > objects.getResponses().get(i)); ] the getD() call returns a map attribute
> > of the OriginalUserAccessLog class.
> > The issue occurs while a checkpoint is in progress: the Redis query result
> > comes back concurrently while the async function's input queue is being
> > serialized.
> >
> >
> > @Override
> > public void asyncInvoke0(OriginalUserAccessLog input,
> >                          ResultFuture<OriginalUserAccessLog> resultFuture) {
> >     inputBuffer.add(input);
> >     if (inputBuffer.size() >= 1000) {
> >         List<OriginalUserAccessLog> currentBatch = inputBuffer;
> >         inputBuffer = new ArrayList<>();
> >
> >         RBatch rBatch = redissonClient.createBatch();
> >
> >         for (OriginalUserAccessLog i : currentBatch) {
> >             rBatch.getBucket("ip:" + i.getIp()).getAsync();
> >         }
> >
> >         rBatch.executeAsync().onComplete((objects, throwable) -> {
> >             if (throwable == null) {
> >                 for (int i = 0; i < currentBatch.size(); i++) {
> >                     currentBatch.get(i).getD().put("ipLabel",
> >                             objects.getResponses().get(i));
> >                 }
> >             }
> >             resultFuture.complete(currentBatch);
> >         });
> >
> >     } else {
> >         resultFuture.complete(Collections.emptyList());
> >     }
> > }
> >
> > On Fri, Aug 20, 2021 at 1:56 AM Chesnay Schepler  wrote:
> > >
> > > Essentially this exception means that the state was modified while a
> > > snapshot was being taken.
> > >
> > > We usually see this when users hold on to some state value beyond a
> > > single call to a user-defined function, particularly from different
> threads.
> > >
> > > We may be able to pinpoint the issue if you were to provide us with the
> > > functions.
> > >
> > > On 19/08/2021 16:59, yidan zhao wrote:
> > > > Flink web ui shows the exception as follows.
> > > > In the task (ual_transform_UserLogBlackUidJudger ->
> > > > ual_transform_IpLabel), the first operator is a broadcast process
> > > > function and the second one is an async function. I do not know
> > > > whether the issue is related to that.
> > > >
> > > > The issue did not occur before; it appeared after I upgraded to
> > > > Flink 1.13.2.
> > > >
> > > >
> > > >
> > > > _exception info from flink web ui:_
> > > > java.io.IOException: Could not perform checkpoint 58 for operator
> > > > ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
> > > > (29/60)#0.
> > > >
> > > > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> > > >
> > > > at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> > > >
> > > > at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> > > >
> > > > at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> > > >
> > > > at org.apache.flink.streaming.runtime.io
> .StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
> > > >
> > > > at
> 

Re: Kafka Metrics

2021-08-23 Thread Arvid Heise
Hi Mason,

I'm afraid it's all-or-nothing: either you get the proxied metrics with all
partitions or none of them.

You could also implement a custom MetricReporter that delegates to your
actual reporter and filters the respective metrics.
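
A minimal sketch of such a delegating reporter, assuming the wiring of the
actual delegate (factory or no-arg constructor) is handled elsewhere, and that
the filter condition below matches how the per-partition gauges show up in
your metric scopes:

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

// Forwards everything except the per-partition Kafka gauges to the real reporter.
public class FilteringMetricReporter implements MetricReporter {

    private final MetricReporter delegate;

    public FilteringMetricReporter(MetricReporter delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(MetricConfig config) {
        delegate.open(config);
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        if (!isPerPartitionKafkaMetric(metricName, group)) {
            delegate.notifyOfAddedMetric(metric, metricName, group);
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        delegate.notifyOfRemovedMetric(metric, metricName, group);
    }

    // Assumed filter condition: check the fully scoped identifier for a partition
    // scope. Verify the exact names/scopes of the offset gauges in your setup.
    private boolean isPerPartitionKafkaMetric(String metricName, MetricGroup group) {
        return group.getMetricIdentifier(metricName).contains(".partition.");
    }
}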

Best,

Arvid

On Fri, Aug 20, 2021 at 8:16 AM Mason Chen  wrote:

> FYI, I'm referring to the legacy offsets metric gauges.
>
> On Thu, Aug 19, 2021 at 4:53 PM Mason Chen  wrote:
>
>> Hi all,
>>
>> We have found that the per partition Kafka metrics contributes to a lot
>> of metrics being indexed by our metrics system.
>>
>> We would still like to have the proxied kafka metrics from the kafka
>> clients library. Is there a flag to only exclude Flink's additional Kafka
>> metrics?
>>
>> Best,
>> Mason
>>
>>


Re: Looking for suggestions about multithreaded CEP to be used with flink

2021-08-23 Thread Arvid Heise
I'm afraid Flink CEP is built for lots of data running against a small number
of rules, rather than the other way around.

Using any other library would push the state management and data distribution
down onto you and negate the main idea of using CEP in the first place.

The question is if some of the patterns could be aggregated in a way that
fewer operators overall are needed. I was assuming that this kind of
optimization already happens but maybe it's insufficient.

Could you share your patterns (in an anonymous way)?


On Fri, Aug 20, 2021 at 1:46 AM Tejas B  wrote:

> Hi,
> Here's our use case :
> We are planning to build a rule-based engine on top of Flink with a huge
> number of rules (1000s). The rules could be stateless or stateful.
> Example stateless rule is : A.id = 3 && A.name = 'abc' || A.color = red.
> Example stateful rule is : A is event.id =3, B is event.name = 'abc', C
> is event.color = red and we are looking for pattern AB*C over time window
> of 1 hour.
>
> Now we have tried to use Flink CEP for this purpose and the program crashed
> because of too many threads. The explanation is: every application of
> CEP.pattern creates a new operator in the graph, and Flink can't support
> that many vertices in a graph.
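
To make the quoted explanation concrete: each stateful rule ends up as its own
CEP.pattern call and therefore its own operator. A sketch of the AB*C rule
above, with a hypothetical Event POJO and an assumed existing
DataStream<Event> named events:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> abStarC =
    Pattern.<Event>begin("A")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return e.getId() == 3;
            }
        })
        .followedBy("B")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "abc".equals(e.getName());
            }
        })
        .oneOrMore().optional()   // B*: zero or more occurrences
        .followedBy("C")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "red".equals(e.getColor());
            }
        })
        .within(Time.hours(1));

// Every such call adds another CEP operator (vertex) to the job graph,
// which is why thousands of rules lead to thousands of vertices/threads.
PatternStream<Event> matches = CEP.pattern(events, abStarC);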
>
> Another approach could be to use a ProcessFunction in Flink, but to run the
> rules on the event stream you'd still have to use some sort of CEP or write
> your own.
>
> My question is, does anybody have any other suggestions on how to achieve
> this? Any other CEPs that integrate and work better with Flink (Siddhi,
> Jasper, Drools)? Any experience would be helpful.
>


Re: Task Managers having trouble registering after restart

2021-08-23 Thread Arvid Heise
Hi Kevin,

"java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
have been loaded. [1]
If you only see that after a while, it's indicating that there is a
classloader leak. I suspect that this is because of Kafka metrics. There
have been some reports in the past.
You can try to see what happens when you disable the forwarding of the
Kafka metrics with register.consumer.metrics: false [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
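
For the legacy FlinkKafkaConsumer this is passed through the consumer
properties; a minimal sketch (topic, brokers and group are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
// Do not forward the Kafka client's own metrics into Flink's metric system.
props.setProperty("register.consumer.metrics", "false");

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);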

On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam  wrote:

> Hi all,
>
> I'm observing an issue sometimes, and it's been hard to reproduce, where
> task managers are not able to register with the Flink cluster. We provision
> only the number of task managers required to run a given application, and
> so the absence of any of the task managers causes the job to enter a crash
> loop where it fails to get the required task slots.
>
> The failure occurs after a job has been running for a while, and when
> there have been job and task manager restarts. We run on Kubernetes, so pod
> disruptions occur from time to time; however, we're running with the high
> availability setup [0].
>
> Has anyone encountered this before? Any suggestions?
>
> Below are some error messages pulled from the task managers failing to
> re-register.
>
> ```
> ] - Starting DefaultLeaderRetrievalService with
> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
> 2021-08-16 13:15:10,112 INFO
>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
> 2021-08-16 13:15:10,205 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
> streaming-sales-model-staging-restserver-leader.
> 2021-08-16 13:15:10,205 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
> streaming-sales-model-staging-resourcemanager-leader.
> 2021-08-16 13:15:10,205 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
> streaming-sales-model-staging-dispatcher-leader.
> 2021-08-16 13:15:10,211 INFO
>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
> - Starting DefaultLeaderRetrievalService with
> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
> 2021-08-16 13:16:26,103 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
> 2021-08-16 13:16:30,978 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
> ```
>
> ```
> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>  [] - Uncaught exception in thread
> 'kafka-producer-network-thread |
> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
> at
> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
> ~[?:?]
> at
> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
> ~[?:?]
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
> ~[?:?]
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.clients.NetworkClient$1
> at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> 

Re: Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-23 Thread Kevin Lam
Hi,

I was able to understand what was causing this. We were using the restart
strategy `fixed-delay` with the maximum number of restarts set to 10;
presumably, once those attempts are exhausted the job fails terminally and is
then brought up fresh rather than restored. Switching to exponential-delay
resolved the issue of the job restarting from a fresh state.
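
For reference, a minimal sketch of setting this programmatically (the backoff
values are placeholders, not a recommendation):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Exponential backoff between restarts, with no fixed cap on the number of attempts.
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
        Time.seconds(1),    // initial backoff
        Time.minutes(5),    // maximum backoff
        2.0,                // backoff multiplier
        Time.hours(1),      // reset backoff after this period of stable running
        0.1));              // jitter factor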

On Thu, Aug 19, 2021 at 2:04 PM Chesnay Schepler  wrote:

> How do you deploy Flink on Kubernetes? Do you use the standalone or native
> mode?
>
> Is it really just task managers going down? It seems unlikely that the
> loss of a TM could have such an effect.
>
> Can you provide us with the JobManager logs at the time the TM crash
> occurred? They should contain some hints as to how Flink handled the TM
> failure.
>
> On 19/08/2021 16:06, Kevin Lam wrote:
>
> Hi all,
>
> I've noticed that sometimes, when task managers go down, the job is not
> restored from a checkpoint but instead restarted from a fresh state (when I
> go to the job's checkpoint tab in the UI, I don't see the restore, and the
> numbers in the job overview all get reset). Under what circumstances does
> this happen?
>
> I've been trying to debug and we really want the job to restore from the
> checkpoint at all times for our use case.
>
> We're running Apache Flink 1.13 on Kubernetes in a high availability
> set-up.
>
> Thanks in advance!
>
>
>


Re: Apache Flink matrics are not alligned in the reporter

2021-08-23 Thread Jawad Tahir
Hi Chesnay,

Thank you for your message. My reporter interval is set to 1 second.

On Thu, 19 Aug 2021 at 19:51, Chesnay Schepler  wrote:

> What reporter interval do you have configured?
>
> On 19/08/2021 13:31, Jawad Tahir wrote:
> > Hi,
> >
> > I have defined Graphite as my metrics reporter with my Flink
> > (v1.13.2). My pipeline is pretty simple. It consists of one source,
> > one stateful operator (a simple window aggregation), and one sink
> > (the operations-playground, basically). I have set the parallelism to
> > 2. The graph of the pipeline is as follows:
> >
> > [Flink pipeline][1]
> >
> > The program is running well and producing the correct results.
> > However, when I look at the metrics, I see that the source started
> > sending records way after the system started, even though my sink was
> > producing correct results from the start of the job. Here is the
> > [graph][2] of the uptime of the job and the numRecordsOut of the source.
> > As far as I understand, a Flink source's numRecordsOut should start
> > increasing together with uptime, since my sink was producing correct
> > results from the start.
> >
> >
> >   [1]: https://i.stack.imgur.com/rZm5h.png
> > 
> >   [2]: https://i.stack.imgur.com/nlBoS.png
> > 
>
>
>


Creating a generic ARRAY_AGG aggregate function for Flink SQL

2021-08-23 Thread Yuval Itzchakov
Hi,

I'm trying to implement a generic ARRAY_AGG UDF function (identical to the
one that exists in many data WHs, e.g
https://docs.snowflake.com/en/sql-reference/functions/array_agg.html) to
utilize in Flink SQL.

Taking reference from CollectAggFunction
,
I tried using ArrayData to generate a GenericArrayData as an output type.
The problem with this is that I need a way to convert from the external format
used in the UDF (e.g. String, Integer) to the internal representation required
by Flink (i.e. StringData). I haven't found a straightforward way of going
about that.
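
For the String case specifically, a minimal sketch of that external-to-internal
conversion (not taken from the gist) could look like this:

import java.util.List;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;

public final class StringArrayConversion {
    // Wraps external Strings into Flink's internal StringData and builds an
    // internal array, e.g. for a getValue() that returns ArrayData.
    public static ArrayData toInternal(List<String> values) {
        StringData[] internal = new StringData[values.size()];
        for (int i = 0; i < values.size(); i++) {
            internal[i] = StringData.fromString(values.get(i));
        }
        return new GenericArrayData(internal);
    }
}

The fully generic case would presumably need a per-type converter (e.g.
something like DataStructureConverters.getConverter(dataType)) rather than this
hard-coded String handling.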

Here is a gist of the implementation
.
Would appreciate any help on how to tackle this.

-- 
Best Regards,
Yuval Itzchakov.


Re: Using RMQ connector in pyflink

2021-08-23 Thread Nadia Mostafa
But RMQ is not listed among the supported Table API connectors; should I
define a custom source?

On Mon, Aug 23, 2021 at 4:56 AM Caizhi Weng  wrote:

> Hi!
>
> You can first use the Table & SQL API to create a RMQ source table[1].
>
> Then you can use the to_data_stream method in TableEnvironment to change
> the table to a data stream.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/python/table/python_table_api_connectors/
>
> Nadia Mostafa wrote on Sun, Aug 22, 2021 at 11:57 PM:
>
>> Hello,
>>
>> Is there any way to use RMQ as a data source in the DataStream python api?
>>
>> Thanks in advance
>>
>


Unsubscribe

2021-08-23 Thread 牛成
Unsubscribe




Flink's Kafka Offset Management

2021-08-23 Thread Pranjul Ahuja
I use FlinkKafkaConsumer to consume from Kafka and have checkpoints enabled.
Now I'm a little confused about the offset management and the checkpoint
mechanism.

What is the behavior if I stop the application by executing yarn application
-kill appId and then run the start command like ./bin/flink run ...? Will
Flink get the offsets from a checkpoint or from the group.id managed by Kafka?

Right now I am writing the checkpoints to HDFS, but I am not starting Flink
with a checkpoint path, so I am assuming that the Kafka consumer is picking up
the offsets from Kafka for the particular group.id. I have also validated with
the kafka-consumer-groups.sh tool that once Flink is restarted, it picks up
offsets from the last commit on the Kafka topic.
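
For reference, a restore from a retained checkpoint only happens if the new
submission is pointed at it; a minimal sketch (paths and interval are
placeholders):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// Keep completed checkpoints around after the job is cancelled/killed,
// so they remain available for a manual restore.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// The restore itself has to be requested on resubmission, e.g.:
//   ./bin/flink run -s hdfs:///flink/checkpoints/<job-id>/chk-<n> ...
// Without -s, the FlinkKafkaConsumer falls back to its configured startup mode
// (by default, the committed group offsets in Kafka).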




Re: Looking for suggestions about multithreaded CEP to be used with flink

2021-08-23 Thread Schwalbe Matthias
Hi Tejas,



Your question sat in my mind for a while before I realized I had something to
say about it.





Although not related to CEP, we had had a very similar problem with too many 
threads/tasks in an overwhelming split-join-pattern of about 1600 concurrent 
paths.



A colleague of mine worked on this for his master's thesis [1] ... we came to 
the conclusion to

- radically reduce fine grained parallelism,  i.e.

- use it (almost) only for Flink scale out (partitioning for specific key 
attributes)

- transform our algorithms to run multiple cases sequentially instead of 
parallel

- unify multiple key domains by generalizing the key and duplicating incoming 
events per key domain together with the unified key (ideas taken from e.g. [2])

- try to unify all state primitives that work on the same key into list or map 
state primitives, and iterate on these (this works especially well for RocksDB)

- patch Flink task chaining to create longer chains and allow chaining for 
operators with multiple inputs (only mentioned in [1]) ... (= fewer 
tasks/threads)



In your specific case with the CEP rules it is probably best

- to implement the patterns yourself or integrate some external library, but

- to make the CEP rules 'data' and broadcast them into the respective operators 
(ideally a single operator only) that iterate over the rules for each incoming 
event (see the sketch after this list)

- for the stateless rules, I once integrated Spring Boot SpEL for a similar 
rules system (precompiled when initially loaded, rules are actually quite fast)

- for the stateful rules

  - you could either integrate some proper library (which leaves you with the 
problem of integrating intermediate state into the Flink TypeInformation 
serialization system)

  - or implement it yourself e.g. by means of a regular expressions library 
that exposes its state transition tables generated for specific regular 
expressions
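
A minimal sketch of the broadcast idea for the stateless rules (Event and Rule
are hypothetical types; events and ruleUpdates are assumed existing streams;
stateful patterns would additionally need keyed state, e.g. via
KeyedBroadcastProcessFunction):

import java.util.Map;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Rule> rulesDescriptor =
    new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

BroadcastStream<Rule> broadcastRules = ruleUpdates.broadcast(rulesDescriptor);

DataStream<String> matchedRuleIds = events
    .connect(broadcastRules)
    .process(new BroadcastProcessFunction<Event, Rule, String>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            // One operator iterates over all rules instead of one operator per rule.
            for (Map.Entry<String, Rule> e :
                    ctx.getBroadcastState(rulesDescriptor).immutableEntries()) {
                if (e.getValue().matches(event)) {
                    out.collect(e.getKey());
                }
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx,
                                            Collector<String> out) throws Exception {
            // Rule updates arrive as data and are stored in broadcast state.
            ctx.getBroadcastState(rulesDescriptor).put(rule.getId(), rule);
        }
    });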



This way you could use Flink for what it excels at (low-latency, 
high-throughput stream processing with consistent state across 
restarts/crashes) and handle the parts that don't fit its building blocks in a 
way that is optimal for your use case.





[1] https://www.merlin.uzh.ch/publication/show/21108

[2] https://www.youtube.com/watch?v=tHStmoj8WbQ

[3] 
https://docs.spring.io/spring-integration/docs/5.3.0.RELEASE/reference/html/spel.html



Feel free to get back to me for further discussion (on the user list)



Thias







On 2021/08/19 23:39:18, Tejas B  wrote:
> Hi,
> Here's our use case :
> We are planning to build a rule based engine on top of flink with huge number
> of rules(1000s). the rules could be stateless or stateful.
> Example stateless rule is : A.id = 3 && A.name = 'abc' || A.color = red.
> Example stateful rule is : A is event.id =3, B is event.name = 'abc', C is
> event.color = red and we are looking for pattern AB*C over time window of 1
> hour.
>
> Now we have tried to use flink CEP for this purpose and program crashed
> because of lot of threads. The explanation is : every application of
> CEP.pattern creates a new operator in the graph and flink can't support that
> many vertices in a graph.
>
> Other approach could be to use processFunction in flink, but still to run the
> rules on events stream you'd have to use some sort of CEP or write your own.
>
> My question is, does anybody have any other suggestions on how to achieve
> this ? Any other CEPs that integrate and work better with flink (siddhi,
> jasper, drools) ? Any experience would be helpful.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

2021-08-23 Thread Zbyszko Papierski
Hi,

No, they don't; only the job gets restarted after that, without any luck.
The exception I provided comes from the job's own exceptions list.

On Mon, Aug 23, 2021 at 4:50 AM Caizhi Weng  wrote:

> Hi!
>
> This might be because some task managers cannot reach the job manager in
> time. Have any of the task manager instances restarted after this failure?
> If yes, what do the logs (Flink log and Kubernetes log) of the failed task
> manager say?
>
> Zbyszko Papierski wrote on Fri, Aug 20, 2021 at 11:07 PM:
>
>> Hi!
>>
>> We're trying to deploy our application to our Kubernetes cluster and we
>> seem to have hit a snag. Long story short: any kind of deployment that
>> involves a cluster of more than 1 TM seems to fail our job almost
>> immediately with this exception:
>>
>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> Sending the partition request to 'null' failed.
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>>>
>>> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>>>
>>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>> Caused by: java.nio.channels.ClosedChannelException
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>>> ... 11 more
>>
>>
>> I don't have enough experience to judge from the rest of the logs what
>> can be the reason for that, but I'm including the debug logs that I can get
>> from kubectl, both JM and TM.
>> We use a session cluster deployed on Kubernetes (not Kubernetes native,
>> there are still some issues why we can't use it) and we deploy our app to
>> that cluster. We have confirmed that everything works when there's a single
>> Task Manager, but we rather not continue with that limitation. The way we
>> define the cluster itself on k8s is shown here [1] and the chart for the
>> deployment itself can be found here [2]. App we're deploying is available
>> here [3]. We're running Flink 1.21.1 on openjdk-jre 11.
>>
>> Since I overslept the k8s revolution a bit and am somewhat new to it, I am
>> not sure which information to provide to make the situation clearer, but
>> any help is greatly appreciated!
>>
>>
>> [1]
>> https://github.com/wikimedia/operations-deployment-charts/tree/master/charts/flink-session-cluster
>> [2]
>>