Hi, Tauseef.
This error happens after the job has started running, so at least it shows
there is no problem with the modified code.
Now let's focus on the new error.
The exception suggests that Flink cannot read the metadata from Kafka to get
all of the partitions of this topic.
Can you check whether it works to read the Kafka source and print it directly
using the DataStream API?
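For example, a minimal check could look like the sketch below (a sketch only;
the topic "aiops" and the bootstrap address are taken from your log, so adjust
both for your environment):
```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bootstrap address and topic are copied from the log above; adjust as needed.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com:9092")
                .setTopics("aiops")
                .setGroupId("connectivity-check")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // If this prints records, the brokers and the topic metadata are reachable.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-check").print();
        env.execute("kafka-connectivity-check");
    }
}
```
If this job fails with the same TimeoutException, the problem is on the
network or authentication side rather than in your pipeline code.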
--
Best!
Xuyang
On 2023-11-22 14:45:19, "Tauseef Janvekar" <[email protected]> wrote:
Hi Xuyang,
Taking inspiration from your code, I modified mine and the earlier error is
resolved.
But now I am getting a new error:
2023-11-22 06:20:23,088 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient
clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Connection
to node -1
(a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com/52.70.204.171:9092)
terminated during authentication. This may happen due to any of the following
reasons: (1) Authentication failed due to invalid credentials with brokers
older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (e.g. it may only
allow HTTPS traffic), (3) Transient network issue.
2023-11-22 06:20:23,477 INFO
org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient
clientId=KafkaSource--2346810451100924693-enumerator-admin-client]
Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node
assignment. Call: fetchMetadata
2023-11-22 06:20:23,486 ERROR
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] -
Exception while handling result from async call in SourceCoordinator-Source:
otel_source. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic
partitions due to
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
~[flink-dist-1.17.1.jar:1.17.1]
... 6 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node
assignment. Call: describeTopics
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
~[?:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
~[flink-dist-1.17.1.jar:1.17.1]
... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting
for a node assignment. Call: describeTopics
2023-11-22 06:20:23,499 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered
by OperatorCoordinator for 'Source: otel_source -> Map -> Flat Map -> metric_sink:
Writer -> metric_sink: Committer' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
~[?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
~[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list
subscribed topic partitions due to
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
~[?:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83)
~[flink-dist-1.17.1.jar:1.17.1]
at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
~[flink-dist-1.17.1.jar:1.17.1]
... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
~[?:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
~[?:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
~[?:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
~[flink-dist-1.17.1.jar:1.17.1]
... 6 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node
assignment. Call: describeTopics
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
~[?:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
~[?:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
~[?:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
~[?:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
~[?:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
~[flink-dist-1.17.1.jar:1.17.1]
... 6 more
Thanks,
Tauseef
On Wed, 22 Nov 2023 at 11:08, Tauseef Janvekar <[email protected]>
wrote:
Hi Xuyang,
The modified code does not work for us because there is no String metric; it
is a list within a list within a list, and at every level there is an object
rather than a String.
I just wanted to understand how to pass a POJO type to returns(). I tried
.returns(GenericPipelineDTO.class) and it did not work.
Thanks,
Tauseef
On Wed, 22 Nov 2023 at 09:03, Xuyang <[email protected]> wrote:
Hi, Tauseef.
I modified your code and the following works. Can you try it?
```
import java.util.List;
import java.util.UUID;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;

static class C1 {
    Metrics metrics;

    public C1(Metrics metrics) {
        this.metrics = metrics;
    }

    public Metrics getMetrics() {
        return metrics;
    }
}

static class Metrics {
    List<String> values;

    public Metrics(List<String> values) {
        this.values = values;
    }

    public List<String> getValues() {
        return values;
    }
}

static class C2 {
    String value;

    public C2(String value) {
        this.value = value;
    }
}

public static DataStream<C2> defineWorkflow(DataStream<C1> opentelSource) {
    DataStream<C2> ret =
            opentelSource
                    .map(input -> input.getMetrics().getValues())
                    // The map lambda's output type is erased, so restore it with a TypeHint.
                    .returns(new TypeHint<List<String>>() {}.getTypeInfo())
                    .flatMap(
                            (FlatMapFunction<List<String>, C2>)
                                    (list, collector) -> {
                                        for (String metric : list) {
                                            collector.collect(new C2(
                                                    metric + "_" + UUID.randomUUID().toString()));
                                        }
                                    })
                    // C2 is a plain (non-generic) class, so the class token is enough.
                    .returns(C2.class);
    return ret;
}
```
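Note that two returns(...) calls are needed because Java lambdas lose their
generic type information to erasure: the TypeHint restores the List<String>
type produced by the map, and C2.class suffices for the flatMap output since
C2 is not generic.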
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap
--
Best!
Xuyang
On 2023-11-21 23:34:37, "Tauseef Janvekar" <[email protected]> wrote:
Hi Xuyang,
I read the post you shared, but unfortunately it uses String rather than a
custom POJO.
It also does not use the flatMap(...).returns(...) pattern.
My Flink version is 1.17.1.
My code is as below:
```
public static DataStream<GenericPipelineDTO> defineWorkflow(DataStream<OpenTelDTO> opentelSource) {
    DataStream<GenericPipelineDTO> ret = opentelSource
            .map(input -> input.getResourceMetrics())
            .flatMap((list, collector) -> {
                for (ResourceMetrics metric : list) {
                    String containerName = "";
                    String podName = "";
                    String hostName = "";
                    String namespaceName = "";
                    String cluster = "nyquist";
                    String tenant = "opis";
                    for (Attributes attr : metric.getResource().getAttributes()) {
                        if (attr.getKey().equalsIgnoreCase("k8s.container.name")) {
                            containerName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.pod.name")) {
                            podName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.namespace.name")) {
                            namespaceName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.node.name")) {
                            hostName = attr.getValue().getStringValue();
                        }
                    }
                    String unqKey = containerName + "_" + podName + "_" + hostName
                            + "_" + namespaceName + "_" + cluster + tenant;
                    for (ScopeMetrics smtr : metric.getScopeMetrics()) {
                        for (Metrics mtr : smtr.getMetrics()) {
                            GenericPipelineDTO dto = new GenericPipelineDTO();
                            dto.setUniqueKey(unqKey);
                            try {
                                dto.setGeneratedTime(mtr.getGauge().getDataPoints().get(0).getTimeUnixNano());
                            } catch (Exception e) {
                                dto.setGeneratedTime(mtr.getSum().getDataPoints().get(0).getTimeUnixNano());
                            }
                            dto.setMetricName(mtr.getName());
                            dto.setMetricDescription(mtr.getDescription());
                            dto.setMetricUnit(mtr.getUnit());
                            try {
                                dto.setMetricValue(mtr.getGauge().getDataPoints().get(0).getAsDouble()
                                        + Integer.parseInt(mtr.getGauge().getDataPoints().get(0).getAsInt()));
                            } catch (Exception e) {
                                dto.setMetricValue(mtr.getSum().getDataPoints().get(0).getAsDouble()
                                        + Integer.parseInt(mtr.getSum().getDataPoints().get(0).getAsInt()));
                            }
                            dto.setUuid(UUID.fromString(unqKey + mtr.getName() + dto.getGeneratedTime()));
                            collector.collect(dto);
                        }
                    }
                }
            }).returns(????????); // What to put here?
    return ret;
}
```
On Tue, 21 Nov 2023 at 20:57, Xuyang <[email protected]> wrote:
Hi, Tauseef.
Here is an example of using a custom POJO with flatMap [1]. If possible, can
you post your code and mention the Flink version?
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap
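As a minimal sketch (MyCustomDTO below is a hypothetical POJO with a
single-argument String constructor): if the function is implemented as a named
class instead of a lambda, Flink can extract the output type by reflection, so
no returns(...) hint is needed at all.
```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class DtoSplitter implements FlatMapFunction<String, MyCustomDTO> {
    @Override
    public void flatMap(String line, Collector<MyCustomDTO> out) {
        // Named classes keep their generic signatures, unlike lambdas,
        // so Flink can infer the MyCustomDTO output type here.
        for (String token : line.split(",")) {
            out.collect(new MyCustomDTO(token));
        }
    }
}
```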
--
Best!
Xuyang
At 2023-11-21 22:48:41, "Tauseef Janvekar" <[email protected]> wrote:
Dear Team,
I am getting the following error while using flatMap.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: The return type of function
'defineWorkflow(OtelTransformerJob.java:75)' could not be determined
automatically, due to type erasure. You can give type information hints by
using the returns(...) method on the result of the transformation call, or by
letting your function implement the 'ResultTypeQueryable' interface.
On further investigation, I found that flatMap should end with
.returns(Types.STRING) when the return type is a String.
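That is, something like this sketch works for plain strings (with
org.apache.flink.api.common.typeinfo.Types and org.apache.flink.util.Collector
imported):
```
// The lambda's Collector<String> type parameter is erased at compile time,
// so returns(Types.STRING) supplies the output type explicitly.
DataStream<String> result =
        stream.flatMap((String line, Collector<String> out) -> out.collect(line))
                .returns(Types.STRING);
```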
My question is: if my return type is a custom DTO named mypckg.MyCustomDTO,
what do I pass to the returns(...) method?
Thanks,
Tauseef.