Re: Re: Re: flatmap returns a custom class object

2023-11-21 Thread Xuyang
Hi, Tauseef.
This error happens after the job is already running, so it at least proves that
there is no problem with the modified code.
Now let's focus on the new error.


The exception suggests that Flink can't read the metadata from Kafka to get all of
the partitions for this topic.
Can you check whether it works by reading the Kafka source and printing it
directly using the DataStream API?
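If it helps, a minimal stand-alone debug job along these lines can separate a broker connectivity problem from the flatMap logic. This is only a sketch: the bootstrap server and topic name are taken from the log in your mail, and any security/SASL properties your real job sets would have to be added to the builder as well.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaDebugJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Broker address and topic are assumptions copied from the error log.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(
                        "a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com:9092")
                .setTopics("aiops")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // If even this job fails with the same TimeoutException, the problem is
        // broker connectivity/authentication, not the flatMap pipeline.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "otel_source")
                .print();

        env.execute("kafka-debug");
    }
}
```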



--

Best!
Xuyang




On 2023-11-22 14:45:19, "Tauseef Janvekar" wrote:

Hi Xuyang,


Taking inspiration from your code, I modified my code and resolved the earlier
error.
But now I am getting a new error:


2023-11-22 06:20:23,088 WARN  org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Connection to node -1 (a231164a0b8914b278e35c2acc056079-2141023414.us-east-1.elb.amazonaws.com/52.70.204.171:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
2023-11-22 06:20:23,477 INFO  org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient clientId=KafkaSource--2346810451100924693-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
2023-11-22 06:20:23,486 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: otel_source. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [aiops].
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[blob_p-a87f710782171c599ad9617579d8e47bf231c4f3-979cc0c7358de7a23be2eb86078ee626:?]
at 

Re: Re: flatmap returns a custom class object

2023-11-21 Thread Xuyang
Hi, Tauseef.



I modified your code and the following works. Can you try it?


```
static class C1 {
    Metrics metrics;

    public C1(Metrics metrics) {
        this.metrics = metrics;
    }

    public Metrics getMetrics() {
        return metrics;
    }
}

static class Metrics {
    List<String> values;

    public Metrics(List<String> values) {
        this.values = values;
    }

    public List<String> getValues() {
        return values;
    }
}

static class C2 {
    String value;

    public C2(String value) {
        this.value = value;
    }
}

public static DataStream<C2> defineWorkflow(DataStream<C1> opentelSource) {
    DataStream<C2> ret =
            opentelSource
                    .map(input -> input.getMetrics().getValues())
                    .returns(new TypeHint<List<String>>() {}.getTypeInfo())
                    .flatMap(
                            (FlatMapFunction<List<String>, C2>)
                                    (list, collector) -> {
                                        for (String metric : list) {
                                            collector.collect(
                                                    new C2(
                                                            metric
                                                                    + "_"
                                                                    + UUID.randomUUID()
                                                                            .toString()));
                                        }
                                    })
                    .returns(C2.class);

    return ret;
}
```
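A quick way to exercise this sketch locally is a small driver that feeds one C1 element through defineWorkflow and prints the result. This fragment is only illustrative: it assumes the classes above are on the classpath and a local Flink StreamExecutionEnvironment.

```java
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical driver for the sketch above: build one C1 with two metric
// names, run it through defineWorkflow, and print the resulting C2 records.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<C1> source = env.fromElements(new C1(new Metrics(Arrays.asList("cpu", "mem"))));
defineWorkflow(source).print();
env.execute("flatmap-pojo-demo");
```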






[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap



--

Best!
Xuyang














On 2023-11-21 23:34:37, "Tauseef Janvekar" wrote:

Hi Xuyang,


I read the post that you shared, but unfortunately it uses String rather than a
custom POJO.
It also does not use the flatMap(...).returns(...) method.


My flink version is 1.17.1


My code is as below

```
public static DataStream defineWorkflow(DataStream opentelSource) {
    DataStream ret = opentelSource
            .map(input -> input.getResourceMetrics())
            .flatMap((list, collector) -> {
                for (ResourceMetrics metric : list) {
                    String containerName = "";
                    String podName = "";
                    String hostName = "";
                    String namespaceName = "";
                    String cluster = "nyquist";
                    String tenant = "opis";
                    for (Attributes attr : metric.getResource().getAttributes()) {
                        if (attr.getKey().equalsIgnoreCase("k8s.container.name")) {
                            containerName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.pod.name")) {
                            podName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.namespace.name")) {
                            namespaceName = attr.getValue().getStringValue();
                        } else if (attr.getKey().equalsIgnoreCase("k8s.node.name")) {
                            hostName = attr.getValue().getStringValue();
                        }
                    }
                    String unqKey = containerName
                            + "_" + podName + "_" + hostName + "_" + namespaceName + "_" + cluster + tenant;
                    for (ScopeMetrics smtr : metric.getScopeMetrics()) {
                        for (Metrics mtr : smtr.getMetrics()) {
                            GenericPipelineDTO dto = new GenericPipelineDTO();
                            dto.setUniqueKey(unqKey);
                            try {
                                dto.setGeneratedTime(mtr.getGauge().getDataPoints().get(0).getTimeUnixNano());
                            } catch (Exception e) {
                                dto.setGeneratedTime(mtr.getSum().getDataPoints().get(0).getTimeUnixNano());
                            }
                            dto.setMetricName(mtr.getName());
                            dto.setMetricDescription(mtr.getDescription());
                            dto.setMetricUnit(mtr.getUnit());
                            try {
                                dto.setMetricValue(
                                        mtr.getGauge().getDataPoints().get(0).getAsDouble() +
                                        Integer.parseInt(mtr.getGauge().getDataPoints().get(0).getAsInt())
                                );
                            } catch (Exception e) {
                                dto.setMetricValue(
                                        mtr.getSum().getDataPoints().get(0).getAsDouble() +
                                        Integer.parseInt(mtr.getSum().getDataPoints().get(0).getAsInt())
                                );
                            }
                            dto.setUuid(UUID.fromString(unqKey + mtr.getName() + dto.getGeneratedTime()));
                            collector.collect(dto);
                        }
                    }
                }
            }).returns(); // What to put here ?
    return ret;
}
```









On Tue, 21 Nov 2023 at 20:57, Xuyang  wrote:

Hi, Tauseef.
Here is an example of using a custom POJO with flatMap [1]. If possible, can you
post your code and tag the Flink version?


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap







--

Best!
Xuyang




At 2023-11-21 22:48:41, "Tauseef Janvekar"  wrote:

Dear Team,



I am getting the following error while using flatMap.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: The return type of function 
'defineWorkflow(OtelTransformerJob.java:75)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.



On further investigation I found that flatMap should end with 
.returns(Types.STRING) when the return type is a STRING.


My question is: if my return type is a custom DTO named mypckg.MyCustomDTO,
how do I pass parameters to the returns method?

flatmap returns a custom class object

2023-11-21 Thread Tauseef Janvekar
Dear Team,

I am getting the following error while using flatMap.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: The return type of function
'defineWorkflow(OtelTransformerJob.java:75)' could not be determined
automatically, due to type erasure. You can give type information hints by
using the returns(...) method on the result of the transformation call, or
by letting your function implement the 'ResultTypeQueryable' interface.
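The root cause is Java type erasure: generic type arguments are discarded at compile time, so Flink cannot inspect a lambda's return type at runtime. A small stdlib-only illustration (nothing Flink-specific, class name is just for the demo):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        // Both lists share the same runtime class: the <String>/<Integer>
        // type arguments were erased at compile time. This is exactly the
        // information Flink asks you to supply via returns(...).
        System.out.println(strings.getClass() == ints.getClass()); // prints true
    }
}
```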

On further investigation I found that flatMap should end with
.returns(Types.STRING) when the return type is a STRING.

My question is: if my return type is a custom DTO named mypckg.MyCustomDTO,
how do I pass parameters to the returns method?
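For a custom non-generic class, the same hint mechanism accepts the class itself or a TypeInformation. A sketch, where MyCustomDTO stands in for mypckg.MyCustomDTO and the flatMap body is only a placeholder:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;

// Non-generic custom class: passing the Class object is enough.
DataStream<MyCustomDTO> out =
        input.flatMap(
                        (FlatMapFunction<String, MyCustomDTO>)
                                (line, collector) -> collector.collect(new MyCustomDTO(line)))
             .returns(MyCustomDTO.class);
// Equivalent explicit form:
//     .returns(TypeInformation.of(MyCustomDTO.class))
```

Passing the Class works because a non-generic type carries no erased type arguments; for generic DTOs such as MyDTO&lt;T&gt;, a TypeHint would be needed instead, as in Xuyang's `new TypeHint<List<String>>() {}.getTypeInfo()` example above.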

Thanks,
Tauseef.