Hello,

We are using the Flink Kinesis connector to process streaming data from Kinesis, and the application runs behind a proxy. After setting the proxy host and proxy port, the connector works with the default publisher type (Polling), but it fails when we switch the publisher type to Enhanced Fan-Out (EFO). We tried several connector versions and the behaviour is the same. I am wondering whether the proxy settings are ignored for the EFO publisher type. I am looking forward to your feedback and recommendations.
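For illustration, a setup like the one described can be sketched roughly as follows. This is a sketch under assumptions, not the exact application code: the property keys are written out as plain strings from memory of the connector's configuration constants and should be checked against the connector version in use, and the proxy host, proxy port, and consumer name are hypothetical placeholders. Only the region is taken from the error log.

```java
import java.util.Properties;

public class EfoProxyConfigSketch {

    // Builds the consumer properties that would be passed to FlinkKinesisConsumer.
    // All keys are assumptions to verify against the connector documentation.
    static Properties buildConsumerConfig(String proxyHost, int proxyPort) {
        Properties config = new Properties();

        // Region taken from the endpoint in the error log.
        config.setProperty("aws.region", "us-east-1");

        // Proxy settings (assumed key prefix "aws.clientconfig.").
        config.setProperty("aws.clientconfig.proxyHost", proxyHost);
        config.setProperty("aws.clientconfig.proxyPort", Integer.toString(proxyPort));

        // Switch the record publisher from the default (POLLING) to EFO.
        config.setProperty("flink.stream.recordpublisher", "EFO");

        // EFO needs a consumer name; "my-efo-consumer" is a hypothetical placeholder.
        config.setProperty("flink.stream.efo.consumername", "my-efo-consumer");
        return config;
    }

    public static void main(String[] args) {
        // Hypothetical proxy endpoint, for illustration only.
        Properties config = buildConsumerConfig("proxy.example.internal", 8080);
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the two EFO lines removed, this corresponds to the Polling setup that works. With them present, it corresponds to the setup that fails with the error below.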
Flink version: 1.3.5
Java version: 11

Here is the error log:

2022-01-17 18:59:20,707 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure cause: org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: a367945-consumer-stream-dit
    at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
    at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
    at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
    Suppressed: java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Network is unreachable: kinesis.us-east-1.amazonaws.com/3.227.250.203:443
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)
    at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)
    ... 9 more

Thanks,
-Saravan