Hey Mika,

Were using kinesalite 1.11.5

Yeah, after bumping it to 3.3.3 the first issue which was related to not
being able to list shards disappeared and instead I'm seeing the same issue
you are mentioning

"The timestampInMillis parameter cannot be greater than the
currentTimestampInMillis"

Found a discussion on the same topic:
https://github.com/awslabs/amazon-kinesis-connector-flink/issues/13
It appears to be an issue with aws sdk and likely due to setting AWS_CBOR
to disabled (required when running locally).

Specifically, when using AT_TIMESTAMP (which LATEST is translated to
https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L310-L331)
it appears the Flink job is requesting the ShardIterator with a timestamp
in ms rather than seconds.

This raises the problem mentioned in kinesalite:
https://github.com/mhart/kinesalite/blob/4019d70a135226f33f1cdec4091f4391e631d2c9/actions/getShardIterator.js#L79-L89

Perhaps using TRIM_HORIZON locally will be the workaround for now - doesn't
appear a fix is in sight


Den fre 3 dec. 2021 kl 12:47 skrev Mika Naylor <m...@autophagy.io>:

> Hey Jonas,
>
> May I ask what version of Kinesalite you're targeting? With 3.3.3 and
> STREAM_INITIAL_POSITION = "LATEST", I received a "The timestampInMillis
> parameter cannot be greater than the currentTimestampInMillis" which may
> be a misconfiguration on my setup, but with STREAM_INITIAL_POSITION =
> "TRIM_HORIZON" I was able to consume events from the stream.
>
> This was with 1.14.0 of the Kinesis Flink connector.
>
> Kind regards,
> Mika
>
>
> On 02.12.2021 23:05, jonas eyob wrote:
> >Hi all, I have a really simple pipeline to consume events from a local
> >kinesis (kinesalite) and print them out to stdout. But struggling to make
> >sense of why it's failing almost immediately
> >
> >The pipeline code:
> >
> >/* Added this to verify it wasn't a problem with AWS CBOR which needs
> >to be disabled */
>
> >System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
> >"true")
>
> >System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
> >"true")
>
> >System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking",
> >"true")
> >
> >val env = StreamExecutionEnvironment.getExecutionEnvironment
> >env.setParallelism(1)
> >
> >val consumerConfig = new Properties()
> >
> >consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
> >"FAKE_ACCESS_KEY")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
> >"FAKE_SECRET_ACCESS_KEY")
>
> >consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> >"LATEST")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT,
> >"http://localhost:4567";)
> >
> >env
> >  .addSource(
> >    new FlinkKinesisConsumer[String](
> >      "user-profile-events-local",
> >      new SimpleStringSchema,
> >      consumerConfig
> >    )
> >  )
> >  .print()
> >
> >env.execute("echo stream")
> >
> >When running this I am getting this:
> >
> >Error I get from running this locally:
> >
> >22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
> >Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
> >switched from INITIALIZING to RUNNING.
> >Dec 02, 2021 10:27:23 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >Dec 02, 2021 10:27:23 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >Dec 02, 2021 10:27:23 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >Dec 02, 2021 10:27:24 PM
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient
> >createSocketFactoryRegistry
> >WARNING: SSL Certificate checking for endpoints has been explicitly
> >disabled.
> >Dec 02, 2021 10:27:24 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >Dec 02, 2021 10:27:24 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >Dec 02, 2021 10:27:24 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >22:27:24.328 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
> >WARN  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source ->
> >Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264)
> switched
> >from RUNNING to FAILED with failure cause:
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
> >null (Service: AmazonKinesis; Status Code: 400; Error Code:
> >UnknownOperationException; Request ID:
> >05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
> >at
>
> >org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
> >at
>
> >org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
> >at
>
> >org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
> >Suppressed: java.lang.NullPointerException
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
> >at
>
> >org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> >at
>
> >org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
> >at
>
> >org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
> >at
>
> >org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
> >at
>
> >org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
> >at
>
> >org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> >at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> >at java.lang.Thread.run(Thread.java:748)
> >22:27:24.329 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0]
> >INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources
> >for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0
> >(7e920c6918655278fbd09e7658847264).
> >22:27:24.341 [flink-akka.actor.default-dispatcher-9] INFO
> > org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task
> >and sending final execution state FAILED to JobManager for task Source:
> >Custom Source -> Sink: Print to Std. Out (1/1)#0
> >7e920c6918655278fbd09e7658847264.
> >22:27:24.359 [flink-akka.actor.default-dispatcher-5] INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
> >Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
> >switched from RUNNING to FAILED on 948a82f7-e5e2-4e5f-b309-49ee92eb2006 @
> >localhost (dataPort=-1).
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
> >null (Service: AmazonKinesis; Status Code: 400; Error Code:
> >UnknownOperationException; Request ID:
> >05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >Suppressed: java.lang.NullPointerException
> >at
>
> >org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
> >~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> >~[flink-core-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
> >~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at
>
> >org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at
> >org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> >~[flink-runtime-1.14.0.jar:1.14.0]
> >at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> >
> >--
> >*Med Vänliga Hälsningar*
> >*Jonas Eyob*
>
> Mika Naylor
> https://autophagy.io
>


-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*

Reply via email to