[ 
https://issues.apache.org/jira/browse/SPARK-22046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

danny mor updated SPARK-22046:
------------------------------
    Description: 
State cannot be distributed across the cluster.
When the {color:#59afe1}StateStoreRDD{color}'s 
{color:#59afe1}getPreferredLocations{color} is called, it 
creates a {color:#59afe1}StateStoreId(checkpointLocation, operatorId, 
partition.index){color} and 
sends it to the {color:#59afe1}StateStoreCoordinator{color}, which holds a 
hash map of {color:#59afe1}StateStoreId{color} to 
{color:#59afe1}ExecutorCacheTaskLocation{color} and returns the executorId if 
one is cached.
The operatorId is generated once every batch in the 
{color:#59afe1}IncrementalExecution{color} instance, 
but it is almost always 0 because {color:#59afe1}IncrementalExecution{color} is 
instantiated anew each batch.
The partition index is bounded by the configured value of 
{color:#14892c}"spark.sql.shuffle.partitions"{color} (in my case the default, 
200), 
so the cache is limited to 200 entries, regardless of the state keys themselves.
When new executors are added to the cluster and new keys appear in the streaming data, 
the distribution of state is not affected, because the 
{color:#59afe1}StateStoreId{color} does not take either of those into account.
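
To make the bounded key space concrete, here is a minimal toy model of the lookup described above. It is not Spark code: the ToyCoordinator class, the simulate helper, the "/checkpoint" path and the executor names are all hypothetical, and the modulo partitioning is only an assumed stand-in for the real shuffle partitioner. It only illustrates that the number of cached entries depends on the partition count, not on the number of keys.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class StateStoreId:
    """Toy stand-in for the identifier described above (not Spark's class)."""
    checkpoint_location: str
    operator_id: int
    partition_index: int


class ToyCoordinator:
    """Hypothetical model of a StateStoreId -> executor map."""

    def __init__(self) -> None:
        self._locations: Dict[StateStoreId, str] = {}

    def report_active(self, store_id: StateStoreId, executor_id: str) -> None:
        # Remember which executor has loaded this store.
        self._locations[store_id] = executor_id

    def get_location(self, store_id: StateStoreId) -> Optional[str]:
        # Return the cached executor, or None if no executor is recorded.
        return self._locations.get(store_id)

    def size(self) -> int:
        return len(self._locations)


def simulate(num_partitions: int, num_keys: int, executors: List[str]) -> ToyCoordinator:
    coordinator = ToyCoordinator()
    for key in range(num_keys):
        # Assumed partitioning: every key folds into one of num_partitions buckets.
        partition = key % num_partitions
        # operator_id is fixed at 0, mirroring the "almost always 0" observation.
        store_id = StateStoreId("/checkpoint", 0, partition)
        if coordinator.get_location(store_id) is None:
            # First sighting of this partition: record an executor for it.
            coordinator.report_active(store_id, executors[partition % len(executors)])
    return coordinator


few_keys = simulate(num_partitions=200, num_keys=1_000, executors=["exec-1", "exec-2"])
many_keys = simulate(num_partitions=200, num_keys=100_000, executors=["exec-1", "exec-2"])
print(few_keys.size(), many_keys.size())  # prints "200 200"
```

In this sketch, a hundredfold increase in keys leaves the map at 200 entries, which is the behaviour the description attributes to the real cache.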




> Streaming State cannot be scalable
> ----------------------------------
>
>                 Key: SPARK-22046
>                 URL: https://issues.apache.org/jira/browse/SPARK-22046
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: OS: Amazon Linux, 
> Streaming Source: Kafka 0.10
> VM: AWS EC2
> cluster resources: 16 GB per worker, single executor per worker, 8 cores per 
> executor
> storage: HDFS
>            Reporter: danny mor
>            Priority: Minor


