Hi, Tauseef. This error occurs after the job is already running, which at least suggests that the modified code itself is fine. Now let's focus on the new error.
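One way to take the rest of the pipeline out of the picture is a job that only reads the topic and prints the raw records. Below is a minimal sketch of such a job. It assumes the flink-connector-kafka dependency, the topic name aiops (taken from the log in this thread) and a hypothetical group id. `securityProps` is a placeholder for whatever Kafka client settings the brokers actually require.

The "terminated during authentication" warning in the log lists an authentication or TLS mismatch among its possible causes. If this stripped-down job fails the same way, the client security settings are a reasonable first thing to compare against what the brokers expect.

```java
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaPrintCheck {

    // Builds a source that reads one topic as plain strings. securityProps holds
    // any extra Kafka client settings the brokers need; an empty Properties object
    // leaves the Kafka client defaults (PLAINTEXT, no authentication) in place.
    public static KafkaSource<String> buildSource(
            String bootstrapServers, String topic, Properties securityProps) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId("kafka-print-check") // hypothetical group id
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(securityProps)
                .build();
    }

    public static void main(String[] args) throws Exception {
        Properties securityProps = new Properties();
        // Placeholder only: set this if the brokers expect TLS, together with
        // whatever truststore or SASL settings go with it.
        // securityProps.setProperty("security.protocol", "SSL");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(
                        buildSource(args[0], "aiops", securityProps),
                        WatermarkStrategy.noWatermarks(),
                        "kafka-print-check")
                .print();
        env.execute("kafka-print-check");
    }
}
```

Run it with the bootstrap address as the only argument. If it also times out with "Timed out waiting for a node assignment", the problem lies in the connection or client configuration, not in the map/flatMap code.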
The exception suggests that Flink cannot read the metadata it needs from Kafka to list all partitions of this topic. Can you check whether reading works on its own, by consuming the Kafka source and printing the records directly with the DataStream API?

--
Best!
Xuyang

On 2023-11-22 14:45:19, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Hi Xuyang,

Taking inspiration from your code, I modified my code and the earlier error is resolved. But now I am getting a new error:

2023-11-22 06:20:23,088 WARN org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Connection to node -1 (a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com/52.70.204.171:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
2023-11-22 06:20:23,477 INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
2023-11-22 06:20:23,486 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: otel_source. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
2023-11-22 06:20:23,499 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: otel_source -> Map -> Flat Map -> metric_sink: Writer -> metric_sink: Committer' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more

Thanks,
Tauseef

On Wed, 22 Nov 2023 at 11:08, Tauseef Janvekar <tauseefjanve...@gmail.com> wrote:

Hi Xuyang,

The modified code does not work for us because there is no String metric. The data is a list within a list within a list, and every level holds an object rather than a String. I just want to understand how to pass a POJO type to returns.
I tried ".returns(GenericPipelineDTO.class)" and it did not work.

Thanks,
Tauseef

On Wed, 22 Nov 2023 at 09:03, Xuyang <xyzhong...@163.com> wrote:

Hi, Tauseef. I modified your code and the following version works. Can you try it?

```
import java.util.List;
import java.util.UUID;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WorkflowExample { // outer class name is illustrative only

    static class C1 {
        Metrics metrics;

        public C1(Metrics metrics) {
            this.metrics = metrics;
        }

        public Metrics getMetrics() {
            return metrics;
        }
    }

    static class Metrics {
        List<String> values;

        public Metrics(List<String> values) {
            this.values = values;
        }

        public List<String> getValues() {
            return values;
        }
    }

    static class C2 {
        String value;

        public C2(String value) {
            this.value = value;
        }
    }

    public static DataStream<C2> defineWorkflow(DataStream<C1> opentelSource) {
        DataStream<C2> ret =
                opentelSource
                        .map(input -> input.getMetrics().getValues())
                        // Flink cannot see the generic parameter of the lambda's List result,
                        // so the full type List<String> is supplied explicitly.
                        .returns(new TypeHint<List<String>>() {}.getTypeInfo())
                        // The cast fixes the flatMap output type to C2 at compile time.
                        .flatMap(
                                (FlatMapFunction<List<String>, C2>)
                                        (list, collector) -> {
                                            for (String metric : list) {
                                                collector.collect(
                                                        new C2(metric + "_" + UUID.randomUUID().toString()));
                                            }
                                        })
                        .returns(C2.class);
        return ret;
    }
}
```

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap

--
Best!
Xuyang

On 2023-11-21 23:34:37, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Hi Xuyang,

I read the post that you shared, but unfortunately it uses String rather than a custom POJO. It also does not use the flatMap(...).returns(...) method.
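Why a plain class can be passed to returns directly while a nested list needs a TypeHint can be shown without Flink. A Class object such as List.class names only the erased raw type, so it cannot express "a list of strings", let alone a list within a list. An anonymous subclass of a generic abstract class, however, records its type argument in the class file, where reflection can read it back.

The TypeToken class below is a hypothetical minimal sketch of that trick, not Flink's implementation. As far as I know, Flink's TypeHint (used as `new TypeHint<List<String>>() {}` in the code above) relies on the same principle.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical "super type token": anonymous subclasses capture their full
// generic type argument, which plain Class objects lose to erasure.
public abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create as new TypeToken<...>() {} with a concrete type");
        }
        // The anonymous subclass was compiled as "extends TypeToken<List<...>>",
        // so the actual type argument is still available here.
        this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    public Type getType() {
        return type;
    }

    public static void main(String[] args) {
        // A Class object only knows the raw type: prints java.util.List
        System.out.println(List.class.getTypeName());
        // The token keeps the nesting: prints java.util.List<java.util.List<java.lang.String>>
        System.out.println(new TypeToken<List<List<String>>>() {}.getType().getTypeName());
    }
}
```

This is also why the empty braces matter: without `{}` there is no subclass, and no type argument is recorded.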
My Flink version is 1.17.1. My code is below:

public static DataStream<GenericPipelineDTO> defineWorkflow(DataStream<OpenTelDTO> opentelSource) {
    DataStream<GenericPipelineDTO> ret = opentelSource
            .map(input -> input.getResourceMetrics())
            .flatMap((list, collector) -> {
                for (ResourceMetrics metric : list) {
                    String containerName = "";
                    String podName = "";
                    String hostName = "";
                    String namespaceName = "";
                    String cluster = "nyquist";
                    String tenant = "opis";
                    for (Attributes attr : metric.getResource().getAttributes()) {
                        if (attr.getKey().equalsIgnoreCase("k8s.container.name")) {
                            containerName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.pod.name")) {
                            podName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.namespace.name")) {
                            namespaceName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.node.name")) {
                            hostName = attr.getValue().getStringValue();
                        }
                    }
                    String unqKey = containerName + "_" + podName + "_" + hostName + "_" + namespaceName + "_" + cluster + tenant;
                    for (ScopeMetrics smtr : metric.getScopeMetrics()) {
                        for (Metrics mtr : smtr.getMetrics()) {
                            GenericPipelineDTO dto = new GenericPipelineDTO();
                            dto.setUniqueKey(unqKey);
                            try {
                                dto.setGeneratedTime(mtr.getGauge().getDataPoints().get(0).getTimeUnixNano());
                            } catch (Exception e) {
                                dto.setGeneratedTime(mtr.getSum().getDataPoints().get(0).getTimeUnixNano());
                            }
                            dto.setMetricName(mtr.getName());
                            dto.setMetricDescription(mtr.getDescription());
                            dto.setMetricUnit(mtr.getUnit());
                            try {
                                dto.setMetricValue(
                                        mtr.getGauge().getDataPoints().get(0).getAsDouble()
                                                + Integer.parseInt(mtr.getGauge().getDataPoints().get(0).getAsInt()));
                            } catch (Exception e) {
                                dto.setMetricValue(
                                        mtr.getSum().getDataPoints().get(0).getAsDouble()
                                                + Integer.parseInt(mtr.getSum().getDataPoints().get(0).getAsInt()));
                            }
                            dto.setUuid(UUID.fromString(unqKey + mtr.getName() + dto.getGeneratedTime()));
                            collector.collect(dto);
                        }
                    }
                }
            }).returns(????????); // What to put here?
    return ret;
}

On Tue, 21 Nov 2023 at 20:57, Xuyang <xyzhong...@163.com> wrote:

Hi, Tauseef. Here is an example of using a custom POJO with flatMap [1]. If possible, can you post your code and mention your Flink version?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap

--
Best!
Xuyang

At 2023-11-21 22:48:41, "Tauseef Janvekar" <tauseefjanve...@gmail.com> wrote:

Dear Team,

I am getting the following error while using flatMap:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The return type of function 'defineWorkflow(OtelTransformerJob.java:75)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

On further investigation I found that flatMap should end with .returns(Types.STRING) when the return type is a String. My question is: if my return type is a custom DTO named mypckg.MyCustomDTO, what do I pass to the returns method?

Thanks,
Tauseef.
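For a plain (non-generic) class, returns accepts either the Class object itself or a TypeInformation built from it. Below is a minimal sketch that assumes a hypothetical MyCustomDTO standing in for mypckg.MyCustomDTO. It has public fields and a public no-argument constructor, which should let Flink treat it as a POJO.

The cast on the lambda matters. With an implicitly typed lambda, javac cannot infer the flatMap output type from the body and falls back to Object, and then `.returns(MyCustomDTO.class)` does not compile. That may be why `.returns(GenericPipelineDTO.class)` did not work earlier in this thread, though the actual error message was not posted.

```java
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnsPojoSketch {

    // Hypothetical stand-in for mypckg.MyCustomDTO: public class, public
    // no-arg constructor and public fields follow Flink's POJO rules.
    public static class MyCustomDTO {
        public String word;
        public int length;

        public MyCustomDTO() {}

        public MyCustomDTO(String word, int length) {
            this.word = word;
            this.length = length;
        }
    }

    // Splits each line on spaces and emits one DTO per word.
    public static DataStream<MyCustomDTO> toDtos(DataStream<String> lines) {
        return lines
                // The cast pins the output type to MyCustomDTO at compile time.
                .flatMap((FlatMapFunction<String, MyCustomDTO>) (line, out) -> {
                    for (String w : line.split(" ")) {
                        out.collect(new MyCustomDTO(w, w.length()));
                    }
                })
                // .returns(MyCustomDTO.class) would be an equivalent hint here.
                .returns(TypeInformation.of(MyCustomDTO.class));
    }

    // Runs the pipeline in a local environment and collects the results.
    public static List<MyCustomDTO> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        return toDtos(env.fromElements("a bb", "ccc")).executeAndCollect(10);
    }
}
```

For a generic result such as a List of objects, a Class object is not enough. A TypeHint is needed instead, as in the map step of Xuyang's version.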