Zhaobo Yu created SPARK-25721:
---------------------------------

             Summary: maxRate configuration not being used in Kinesis receiver
                 Key: SPARK-25721
                 URL: https://issues.apache.org/jira/browse/SPARK-25721
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.2.0
            Reporter: Zhaobo Yu


In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
    checkpointAppName,
    streamName,
    kinesisProvider,
    dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
    cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
    workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

 

The initialization never calls withMaxRecords(), so
KinesisClientLibConfiguration falls back to its hard-coded default of
10000 records:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure.
Worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when we restart the streaming application.
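
One possible direction for a fix, sketched here under assumptions and not
taken from any actual patch: derive a per-call record cap from the configured
rate and pass it to withMaxRecords(). The helper below is hypothetical (the
object and method names are invented for illustration). It assumes that
"maxRate" refers to spark.streaming.receiver.maxRate in records per second,
and that the KCL issues roughly one GetRecords call per second per shard, so
the rate can be reused directly as the per-call cap.

```scala
// Hypothetical helper, not part of Spark or the Kinesis Client Library.
object KinesisMaxRecords {
  // Mirrors KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS quoted above.
  val DefaultMaxRecords: Int = 10000

  // Returns the value to pass to withMaxRecords().
  // A non-positive maxRate is treated as "no limit configured", in which
  // case the KCL default is kept. A rate above the default is clamped to
  // the default, so the cap never exceeds 10000.
  def cappedMaxRecords(maxRate: Long): Int =
    if (maxRate <= 0L) DefaultMaxRecords
    else math.min(maxRate, DefaultMaxRecords.toLong).toInt
}
```

The result could then be appended to the builder chain shown earlier, for
example .withMaxRecords(KinesisMaxRecords.cappedMaxRecords(maxRate)), where
maxRate is whatever rate limit the receiver has been given. With several
shards per receiver the cap would probably need to be divided further, which
this sketch does not attempt.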

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

