[ https://issues.apache.org/jira/browse/SPARK-22046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
danny mor updated SPARK-22046:
------------------------------
Description:

State cannot be distributed across the cluster.

When the {color:#59afe1}StateStoreRDD{color}'s {color:#59afe1}getPreferredLocations{color} is called, it creates a {color:#59afe1}StateStoreId(checkpointLocation, operatorId, partition.index){color} and sends it to the {color:#59afe1}StateStoreCoordinator{color}, which holds a hashmap from {color:#59afe1}StateStoreId{color} to {color:#59afe1}ExecutorCacheTaskLocation{color} and returns the executorId if the store is cached there.

The operatorId is generated once per batch by the {color:#59afe1}IncrementalExecution{color} instance, but it is almost always 0 because {color:#59afe1}IncrementalExecution{color} is re-instantiated for every batch. The partition index is bounded by the configured value of {color:#14892c}"spark.sql.shuffle.partitions"{color} (in my case the default, 200), so the coordinator's cache is limited to 200 entries and takes no account of the data keys themselves.

Introducing new executors to the cluster or new keys to the streaming data therefore does not affect the distribution of state, because the {color:#59afe1}StateStoreId{color} reflects neither.
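To make the bounded key space concrete, here is a minimal, self-contained sketch of the lookup described above. The {color:#59afe1}StateStoreId{color} and coordinator below are simplified stand-ins modeled on the 2.2 internals described in this report, not the actual Spark API:

{code:scala}
// Minimal, self-contained model of the location lookup described above.
// StateStoreId/Coordinator are simplified stand-ins for Spark internals,
// not the real API; executor ids are plain strings for brevity.
object StateLocationSketch {
  // Mirrors StateStoreId(checkpointLocation, operatorId, partition.index):
  // the key carries no information about data keys or the executor set.
  final case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionIndex: Int)

  // Mirrors the coordinator's hashmap from StateStoreId to a cached location.
  final class Coordinator {
    private val instances = scala.collection.mutable.HashMap.empty[StateStoreId, String]
    def reportActiveInstance(id: StateStoreId, executorId: String): Unit =
      instances(id) = executorId
    def getLocation(id: StateStoreId): Option[String] = instances.get(id)
  }

  def main(args: Array[String]): Unit = {
    val shufflePartitions = 200 // "spark.sql.shuffle.partitions" (default)
    val coordinator = new Coordinator

    // operatorId is effectively always 0 because IncrementalExecution is
    // re-instantiated per batch, so only partitionIndex varies.
    for (p <- 0 until shufflePartitions)
      coordinator.reportActiveInstance(StateStoreId("/checkpoint", 0L, p), s"executor-${p % 8}")

    // The coordinator can hold at most `shufflePartitions` distinct entries,
    // regardless of the data keys or of executors added later.
    println(coordinator.getLocation(StateStoreId("/checkpoint", 0L, 42))) // Some(executor-2)
  }
}
{code}

Since only partitionIndex varies, the coordinator can never track more than "spark.sql.shuffle.partitions" locations per query, no matter how many executors join the cluster or how many distinct keys the stream carries.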
> Streaming State cannot be scalable
> ----------------------------------
>
>                 Key: SPARK-22046
>                 URL: https://issues.apache.org/jira/browse/SPARK-22046
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>        Environment: OS: amazon linux
>                     Streaming Source: kafka 0.10
>                     vm: aws ec2
>                     cluster resources: 16 GB per worker, single executor per worker, 8 cores per executor
>                     storage: hdfs
>            Reporter: danny mor
>            Priority: Minor


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org