danny mor created SPARK-22046:
---------------------------------

             Summary: Streaming State cannot be scalable
                 Key: SPARK-22046
                 URL: https://issues.apache.org/jira/browse/SPARK-22046
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0
         Environment: OS: amazon linux, 
Streaming Source: kafka 0.10
vm: aws ec2
cluster resources: 16Gb per worker, single executor per worker, 8 cores per 
executor
storage: hdfs
            Reporter: danny mor
            Priority: Minor


State cannot be distributed across the cluster.
When {color:#59afe1}StateStoreRDD{color}'s 
{color:#59afe1}getPreferredLocations{color} is called, it creates a 
{color:#59afe1}StateStoreId(checkpointLocation, operatorId, 
partition.index){color} 
and sends it to the {color:#59afe1}StateStoreCoordinator{color}, which holds a 
hash map of {color:#59afe1}StateStoreId{color} to 
{color:#59afe1}ExecutorCacheTaskLocation{color} and returns the executorId if 
one is cached.
The operatorId is generated once per batch in the 
{color:#59afe1}IncrementalExecution{color} instance, 
but it is almost always 0 because {color:#59afe1}IncrementalExecution{color} is 
instantiated anew for each batch.
The partition index is bounded by the configured value of 
"spark.sql.shuffle.partitions" (in my case the default, 200).
This limits the cache to 200 entries, with no regard to the state keys themselves.
Introducing new executors to the cluster or new keys to the streaming data 
does not affect the distribution of state, because the 
{color:#59afe1}StateStoreId{color} takes neither of those variables into account.
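
To make the behaviour described above concrete, here is a minimal Python sketch 
of the lookup. It is a toy model, not Spark code: the Coordinator class, 
run_batch, the modulo partitioner and the first-placement rule are all 
assumptions made for illustration. Only the shape of the key (checkpoint 
location, operator id, partition index) is taken from the description above.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class StateStoreId:
    """Mirrors the key described above; field names are illustrative."""
    checkpoint_location: str
    operator_id: int
    partition_index: int


class Coordinator:
    """Hypothetical stand-in for the StateStoreId -> executor map."""

    def __init__(self) -> None:
        self._locations: Dict[StateStoreId, str] = {}

    def get_location(self, store_id: StateStoreId) -> Optional[str]:
        return self._locations.get(store_id)

    def report_active(self, store_id: StateStoreId, executor_id: str) -> None:
        self._locations[store_id] = executor_id

    def entry_count(self) -> int:
        return len(self._locations)

    def snapshot(self) -> Dict[StateStoreId, str]:
        return dict(self._locations)


def run_batch(
    coordinator: Coordinator,
    keys: Iterable[int],
    executors: List[str],
    num_partitions: int = 200,   # assumed spark.sql.shuffle.partitions value
    operator_id: int = 0,        # "almost always 0" per the report
    checkpoint: str = "hdfs:///checkpoint",  # placeholder path
) -> None:
    """Process one batch: each key maps to a partition, each partition to a store."""
    for key in keys:
        partition = key % num_partitions  # assumed stand-in for hash partitioning
        store_id = StateStoreId(checkpoint, operator_id, partition)
        if coordinator.get_location(store_id) is None:
            # Assumed placement rule for a store that has no cached location yet.
            executor = executors[partition % len(executors)]
            coordinator.report_active(store_id, executor)
        # Otherwise the cached executor is preferred; the key plays no part.


if __name__ == "__main__":
    coord = Coordinator()
    run_batch(coord, range(10_000), ["exec-1", "exec-2"])
    first = coord.snapshot()
    # More keys and more executors in a later batch.
    run_batch(coord, range(10_000, 50_000), ["exec-1", "exec-2", "exec-3", "exec-4"])
    print(coord.entry_count())       # 200
    print(coord.snapshot() == first)  # True
```

In this model the map never grows beyond num_partitions entries, and the 
executors added for the second batch never receive a store, because every 
StateStoreId already has a cached location.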





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
