Hi, Tauseef.
This error occurs after the job has started running, so at least it shows that there 
is no problem with the modified code itself.
Now let's focus on the new error.


The exception indicates that Flink cannot read the metadata from Kafka to list all of 
the partitions for this topic.
Can you check whether it works by reading from the Kafka source and printing it 
directly with the DataStream API?
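For example, a minimal sketch like the following (the broker address and topic are 
taken from your log; the group id is just a placeholder):

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Broker and topic copied from your log; adjust if they differ.
        // If your job sets extra Kafka security properties (SASL/TLS), pass the
        // same ones here via setProperty(...).
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers(
                                "a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com:9092")
                        .setTopics("aiops")
                        .setGroupId("connectivity-check") // placeholder group id
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Print whatever comes off the topic; if the enumerator cannot fetch
        // metadata here either, the problem is connectivity/authentication,
        // not your job logic.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "otel_source_check")
                .print();

        env.execute("kafka-connectivity-check");
    }
}
```

If this job fails with the same TimeoutException, the issue is between the client and 
the broker (e.g. the TLS/authentication causes listed in the WARN line), not in your 
Flink code.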



--

    Best!
    Xuyang




On 2023-11-22 14:45:19, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Hi Xuyang,


Taking inspiration from your code, I modified mine and resolved the earlier error.
But now I am getting a new error:


2023-11-22 06:20:23,088 WARN  org.apache.kafka.clients.NetworkClient                       [] - [AdminClient clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Connection to node -1 (a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com/52.70.204.171:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
2023-11-22 06:20:23,477 INFO  org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
2023-11-22 06:20:23,486 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: otel_source. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
2023-11-22 06:20:23,499 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: otel_source -> Map -> Flat Map -> metric_sink: Writer -> metric_sink: Committer' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more


Thanks,
Tauseef


On Wed, 22 Nov 2023 at 11:08, Tauseef Janvekar <tauseefjanve...@gmail.com> 
wrote:

Hi Xuyang,


The modified code does not work for us because there is no String metric. It is a 
list within a list within a list, and at every level there is an object rather than 
a String.


I just wanted to understand how to pass a POJO type to returns. I tried 
.returns(GenericPipelineDTO.class) and it did not work.


Thanks,
Tauseef


On Wed, 22 Nov 2023 at 09:03, Xuyang <xyzhong...@163.com> wrote:


Hi, Tauseef.



I modified your code and the following works. Can you try it?


```
static class C1 {
    Metrics metrics;


    public C1(Metrics metrics) {
        this.metrics = metrics;
    }


    public Metrics getMetrics() {
        return metrics;
    }
}


static class Metrics {
    List<String> values;


    public Metrics(List<String> values) {
        this.values = values;
    }


    public List<String> getValues() {
        return values;
    }
}


static class C2 {
    String value;


    public C2(String value) {
        this.value = value;
    }
}


public static DataStream<C2> defineWorkflow(DataStream<C1> opentelSource) {


    DataStream<C2> ret =
            opentelSource
                    .map(input -> input.getMetrics().getValues())
                    .returns(new TypeHint<List<String>>() {}.getTypeInfo())
                    .flatMap(
                            (FlatMapFunction<List<String>, C2>)
                                    (list, collector) -> {
                                        for (String metric : list) {
                                            collector.collect(
                                                    new C2(metric + "_" + UUID.randomUUID().toString()));
                                        }
                                    })
                    .returns(C2.class);


    return ret;
}
```
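One note on the pattern above: each lambda is followed by an explicit returns(...) 
hint, because a Java lambda's generic types are erased at compile time, so Flink 
cannot infer the output type on its own. A plain Class token works for a non-generic 
result type (as with C2.class above), while generic result types need a TypeHint 
(as with List<String> above). For your own DTO, the hint appended to the 
transformation chain could look like one of these (a sketch, assuming 
GenericPipelineDTO is a plain non-generic class):

```
// a plain Class token works for a non-generic result type:
.returns(GenericPipelineDTO.class)
// a generic result type needs a TypeHint instead, e.g.:
.returns(new TypeHint<List<GenericPipelineDTO>>() {}.getTypeInfo())
```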






[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap



--

    Best!
    Xuyang














On 2023-11-21 23:34:37, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Hi Xuyang,


I read the post you shared, but unfortunately it uses String rather than a custom 
POJO.
It also does not use the flatMap(...).returns(...) method.


My Flink version is 1.17.1.


My code is below:

public static DataStream<GenericPipelineDTO> defineWorkflow(DataStream<OpenTelDTO> opentelSource) {
    DataStream<GenericPipelineDTO> ret = opentelSource
            .map(input -> input.getResourceMetrics())
            .flatMap((list, collector) -> {
                for (ResourceMetrics metric : list) {
                    String containerName = "";
                    String podName = "";
                    String hostName = "";
                    String namespaceName = "";
                    String cluster = "nyquist";
                    String tenant = "opis";
                    for (Attributes attr : metric.getResource().getAttributes()) {
                        if (attr.getKey().equalsIgnoreCase("k8s.container.name")) {
                            containerName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.pod.name")) {
                            podName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.namespace.name")) {
                            namespaceName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.node.name")) {
                            hostName = attr.getValue().getStringValue();
                        }
                    }
                    String unqKey = containerName + "_" + podName + "_" + hostName + "_" + namespaceName + "_" + cluster + tenant;
                    for (ScopeMetrics smtr : metric.getScopeMetrics()) {
                        for (Metrics mtr : smtr.getMetrics()) {
                            GenericPipelineDTO dto = new GenericPipelineDTO();
                            dto.setUniqueKey(unqKey);
                            try {
                                dto.setGeneratedTime(mtr.getGauge().getDataPoints().get(0).getTimeUnixNano());
                            } catch (Exception e) {
                                dto.setGeneratedTime(mtr.getSum().getDataPoints().get(0).getTimeUnixNano());
                            }
                            dto.setMetricName(mtr.getName());
                            dto.setMetricDescription(mtr.getDescription());
                            dto.setMetricUnit(mtr.getUnit());
                            try {
                                dto.setMetricValue(
                                        mtr.getGauge().getDataPoints().get(0).getAsDouble()
                                                + Integer.parseInt(mtr.getGauge().getDataPoints().get(0).getAsInt()));
                            } catch (Exception e) {
                                dto.setMetricValue(
                                        mtr.getSum().getDataPoints().get(0).getAsDouble()
                                                + Integer.parseInt(mtr.getSum().getDataPoints().get(0).getAsInt()));
                            }
                            dto.setUuid(UUID.fromString(unqKey + mtr.getName() + dto.getGeneratedTime()));
                            collector.collect(dto);
                        }
                    }
                }
            }).returns(????????); // What to put here?

    return ret;
}









On Tue, 21 Nov 2023 at 20:57, Xuyang <xyzhong...@163.com> wrote:

Hi, Tauseef.
Here is an example of using a custom POJO with flatMap [1]. If possible, can you 
post your code and your Flink version?


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap
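
A minimal, self-contained sketch of that pattern (MyCustomDTO and the splitting 
logic here are made up for illustration):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FlatMapPojoExample {

    // A Flink POJO: public class, public no-arg constructor, public fields.
    public static class MyCustomDTO {
        public String value;

        public MyCustomDTO() {}

        public MyCustomDTO(String value) {
            this.value = value;
        }
    }

    public static DataStream<MyCustomDTO> defineWorkflow(DataStream<String> source) {
        return source
                .flatMap(
                        (FlatMapFunction<String, MyCustomDTO>)
                                (line, out) -> {
                                    // emit one DTO per whitespace-separated token
                                    for (String token : line.split(" ")) {
                                        out.collect(new MyCustomDTO(token));
                                    }
                                })
                // the lambda's generic types are erased, so hint them explicitly
                .returns(MyCustomDTO.class);
    }
}
```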







--

    Best!
    Xuyang




At 2023-11-21 22:48:41, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Dear Team,



I am getting the following error while using flatMap.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: The return type of function 
'defineWorkflow(OtelTransformerJob.java:75)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.



On further investigation, I found that flatMap should end with .returns(Types.STRING) 
when the return type is a String.
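I.e. something like this (simplified):

```
stream.flatMap((FlatMapFunction<String, String>) (line, out) -> out.collect(line))
      .returns(Types.STRING);
```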


My question is: if my return type is a custom DTO, say mypckg.MyCustomDTO, how do I 
pass that to the returns method?


Thanks,
Tauseef.

Reply via email to