Hello,
We are using Flink kinesis connector for processing the streaming data from 
kinesis. We are running the application behind the proxy. After the proxyhost 
and proxyport settings, the Connector works with default publisher 
type(Polling) but it doesn’t work when we enable the publisher type as Enhanced 
fanout (EFO). We tried with different connector version but it the behaviours 
is same. I am wondering if the proxy settings are ignored for EFO type. I am 
looking forward to your feedback/recommendations.

Flink version: 1.3.5
Java version: 11

Here is the error log:

2022-01-17 18:59:20,707 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 
(fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure 
cause: 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException:
 Error registering stream: a367945-consumer-stream-dit

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)

        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)

        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)

        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)

        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)

        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)

        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

        Suppressed: java.lang.NullPointerException

                at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)

                at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)

                at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)

                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)

                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException:
 Unable to execute HTTP request: Network is unreachable: 
kinesis.us-east-1.amazonaws.com/3.227.250.203:443

        at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)

        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)

        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)

        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)

        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)

        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)

        at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)

        ... 9 more



Thanks
-Saravan

Reply via email to