Hey Jonas,

May I ask what version of Kinesalite you're targeting? With 3.3.3 and
STREAM_INITIAL_POSITION = "LATEST", I received the error "The
timestampInMillis parameter cannot be greater than the
currentTimestampInMillis", which may be a misconfiguration in my setup.
With STREAM_INITIAL_POSITION = "TRIM_HORIZON", however, I was able to
consume events from the stream.

This was with 1.14.0 of the Kinesis Flink connector.
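
For reference, here is a minimal sketch of the consumer properties with
the initial position switched to TRIM_HORIZON. The region, endpoint and
fake credentials are copied from your pipeline; the object and method
names (KinesaliteConsumerConfig, build) are only illustrative, and I am
assuming the Flink Kinesis connector 1.14.0 is on the classpath.

```scala
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

// Hypothetical helper, not part of the connector: collects the settings
// for a local Kinesalite endpoint in one place.
object KinesaliteConsumerConfig {

  def build(endpoint: String = "http://localhost:4567"): Properties = {
    val props = new Properties()

    // Same values as in the original pipeline.
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
    props.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpoint)
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
    props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY")
    props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY")

    // The only change: start from the oldest available record instead of LATEST.
    props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    props
  }
}
```

Passing KinesaliteConsumerConfig.build() as the third argument of the
FlinkKinesisConsumer constructor should then be equivalent to your
consumerConfig, apart from the initial position.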

Kind regards,
Mika


On 02.12.2021 23:05, jonas eyob wrote:
Hi all, I have a really simple pipeline that consumes events from a local
Kinesis (kinesalite) and prints them to stdout, but I am struggling to make
sense of why it fails almost immediately.

The pipeline code:

/* Added this to verify it wasn't a problem with AWS CBOR which needs
to be disabled */
System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
"true")
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
"true")
System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking",
"true")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val consumerConfig = new Properties()

consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
"FAKE_ACCESS_KEY")
consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
"FAKE_SECRET_ACCESS_KEY")
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST")
consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT,
"http://localhost:4567")

env
 .addSource(
   new FlinkKinesisConsumer[String](
     "user-profile-events-local",
     new SimpleStringSchema,
     consumerConfig
   )
 )
 .print()

env.execute("echo stream")

Running this locally fails with the following error:

22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
switched from INITIALIZING to RUNNING.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:23 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient
createSocketFactoryRegistry
WARNING: SSL Certificate checking for endpoints has been explicitly
disabled.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
Dec 02, 2021 10:27:24 PM
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered
part of the profile name in the Java SDK, so you will need to include this
prefix in your profile name when you reference this profile from your Java
code.
22:27:24.328 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
WARN  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source ->
Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264) switched
from RUNNING to FAILED with failure cause:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
null (Service: AmazonKinesis; Status Code: 400; Error Code:
UnknownOperationException; Request ID:
05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Suppressed: java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at
org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
22:27:24.329 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources
for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0
(7e920c6918655278fbd09e7658847264).
22:27:24.341 [flink-akka.actor.default-dispatcher-9] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
and sending final execution state FAILED to JobManager for task Source:
Custom Source -> Sink: Print to Std. Out (1/1)#0
7e920c6918655278fbd09e7658847264.
22:27:24.359 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
switched from RUNNING to FAILED on 948a82f7-e5e2-4e5f-b309-49ee92eb2006 @
localhost (dataPort=-1).
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
null (Service: AmazonKinesis; Status Code: 400; Error Code:
UnknownOperationException; Request ID:
05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
Suppressed: java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
~[flink-core-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
~[flink-runtime-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-runtime-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-runtime-1.14.0.jar:1.14.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

--
*Kind regards*
*Jonas Eyob*

Mika Naylor
https://autophagy.io
