[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2019-09-16 Thread Karthikeyan Ravi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karthikeyan Ravi updated SPARK-25721:
-
Attachment: Screen Shot 2019-09-16 at 12.27.25 PM.png

> maxRate configuration not being used in Kinesis receiver
> 
>
> Key: SPARK-25721
> URL: https://issues.apache.org/jira/browse/SPARK-25721
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Zhaobo Yu
>Priority: Major
> Attachments: Screen Shot 2019-09-16 at 12.27.25 PM.png, 
> rate_violation.png
>
>
> In the onStart() method of the KinesisReceiver class, the
> KinesisClientLibConfiguration object is initialized as follows:
> val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
>  checkpointAppName,
>  streamName,
>  kinesisProvider,
>  dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
>  cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
>  workerId)
>  .withKinesisEndpoint(endpointUrl)
>  .withInitialPositionInStream(initialPositionInStream)
>  .withTaskBackoffTimeMillis(500)
>  .withRegionName(regionName)
>  
> As you can see, withMaxRecords() is never called during initialization, so
> KinesisClientLibConfiguration falls back to its hard-coded default:
> public static final int DEFAULT_MAX_RECORDS = 10000;
> As a result, the receiver does not honor any maxRate setting below 10k;
> worse still, it causes ProvisionedThroughputExceededException from Kinesis,
> especially when the streaming application is restarted.
>  
> As shown in the attached rate_violation.png, we have a Spark Streaming
> application with 40 receivers, each set to consume 1 record per second. Over
> 5 minutes the application should therefore consume no more than 12k records
> (40*60*5 = 12k), but CloudWatch metrics show it consuming more than that:
> almost 22k records per 5 minutes.
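A minimal sketch of one way a receiver could derive a per-call record cap from its rate limit instead of inheriting the KCL default. The helper `maxRecordsFor` and its assumptions (one shard per receiver, one GetRecords call per read interval) are hypothetical and are not taken from Spark or the KCL. The only existing behavior it relies on is the 10,000-record ceiling of the Kinesis GetRecords API and Spark's convention that a non-positive maxRate means "no limit".

```java
// Hypothetical helper (not part of Spark or the KCL): derives a per-call
// record cap for GetRecords from a per-receiver rate limit.
// Assumes one shard per receiver and one GetRecords call per read interval.
final class MaxRecordsSketch {
    // The Kinesis GetRecords API returns at most 10,000 records per call.
    static final int KINESIS_GET_RECORDS_LIMIT = 10_000;

    static int maxRecordsFor(long maxRatePerSecond, long readIntervalMillis) {
        if (readIntervalMillis <= 0) {
            throw new IllegalArgumentException("readIntervalMillis must be positive");
        }
        if (maxRatePerSecond <= 0) {
            // Spark treats a non-positive maxRate as "no limit".
            return KINESIS_GET_RECORDS_LIMIT;
        }
        long recordsPerInterval;
        try {
            long product = Math.multiplyExact(maxRatePerSecond, readIntervalMillis);
            // Ceiling division by 1000 ms/s, so a positive rate never yields a cap of 0.
            recordsPerInterval = product / 1000 + (product % 1000 == 0 ? 0 : 1);
        } catch (ArithmeticException overflow) {
            // Very large rates (e.g. Long.MAX_VALUE) are effectively unbounded.
            recordsPerInterval = KINESIS_GET_RECORDS_LIMIT;
        }
        return (int) Math.min(KINESIS_GET_RECORDS_LIMIT, recordsPerInterval);
    }

    public static void main(String[] args) {
        // The reporter's setting: 1 record/s per receiver, assuming one read per second.
        System.out.println(maxRecordsFor(1, 1000));              // prints 1
        System.out.println(maxRecordsFor(500, 200));             // prints 100
        System.out.println(maxRecordsFor(0, 1000));              // prints 10000
        System.out.println(maxRecordsFor(Long.MAX_VALUE, 1000)); // prints 10000
    }
}
```

Under those assumptions, the result is the kind of value one might pass to `KinesisClientLibConfiguration.withMaxRecords(int)` in the builder chain quoted above. Capping the batch size only bounds how many records arrive per fetch. It does not replace Spark's own receiver rate limiting.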



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Description: 
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting below 10k; worse
still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

As shown in the attached rate_violation.png, we have a Spark Streaming application
with 40 receivers, each set to consume 1 record per second. Over 5 minutes the
application should therefore consume no more than 12k records (40*60*5 = 12k),
but CloudWatch metrics show it consuming more than that: almost 22k records per
5 minutes.
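The budget in the paragraph above can be checked with a few lines of arithmetic. The 22,000 figure below is only the approximate value quoted from the CloudWatch graph ("almost 22k"), not an independent measurement. The class and method names are illustrative.

```java
import java.util.Locale;

// Worked check of the expected ingestion ceiling versus the quoted observation.
final class RateBudgetCheck {
    // Upper bound on records that `receivers` receivers may ingest over a window
    // when each one is limited to `recordsPerSecond`.
    static long expectedCeiling(int receivers, long recordsPerSecond, long windowSeconds) {
        return receivers * recordsPerSecond * windowSeconds;
    }

    public static void main(String[] args) {
        long ceiling = expectedCeiling(40, 1, 5 * 60); // 40 * 1 * 300
        long observed = 22_000; // approximate figure quoted in the report
        System.out.println("ceiling = " + ceiling); // prints ceiling = 12000
        // prints observed / ceiling = 1.83
        System.out.printf(Locale.ROOT, "observed / ceiling = %.2f%n", (double) observed / ceiling);
    }
}
```

By this measure the application ingested roughly 1.8 times its intended budget.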

  was:
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

As shown in the attached rate_violation.png, we have a Spark Streaming application
with 40 receivers, each set to consume 1 record per second. Over 5 minutes the
application should therefore consume no more than 12k records (40*60*5 = 12k),
but CloudWatch metrics show it consuming more than that: almost 22k records per
5 minutes.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Description: 
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

As shown in the attached rate_violation.png, we have a Spark Streaming application
with 40 receivers, each set to consume 1 record per second. Over 5 minutes the
application should therefore consume no more than 12k records (40*60*5 = 12k),
but CloudWatch metrics show it consuming more than that: almost 22k records per
5 minutes.

  was:
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

Attached  rate_violation.png, 





[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Attachment: rate_violation.png




[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Description: 
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

Attached  rate_violation.png, 

  was:
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

Attached  





[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Attachment: Screenshot from 2018-10-12 14-51-29.png




[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Description: 
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 

Attached  

  was:
In the onStart() method of the KinesisReceiver class, the
KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
 checkpointAppName,
 streamName,
 kinesisProvider,
 dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
 cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
 workerId)
 .withKinesisEndpoint(endpointUrl)
 .withInitialPositionInStream(initialPositionInStream)
 .withTaskBackoffTimeMillis(500)
 .withRegionName(regionName)

 

As you can see, withMaxRecords() is never called during initialization, so
KinesisClientLibConfiguration falls back to its hard-coded default:

public static final int DEFAULT_MAX_RECORDS = 10000;

As a result, the receiver does not honor any maxRate setting we configure;
worse still, it causes ProvisionedThroughputExceededException from Kinesis,
especially when the streaming application is restarted.

 





[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

2018-10-12 Thread Zhaobo Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaobo Yu updated SPARK-25721:
--
Attachment: (was: Screenshot from 2018-10-12 14-51-29.png)
