[ 
https://issues.apache.org/jira/browse/SAMZA-374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102558#comment-14102558
 ] 

Chris Riccomini commented on SAMZA-374:
---------------------------------------

bq. All of these solutions seem pretty hacky since the main problem is that 
Kafka does not have a reliable, safe way to delete existing topics. 

Yeah, I agree that they're hacky. I ended up fiddling with the Kafka topic's 
retention policy (drop it to 2 seconds, wait, then raise it back up) to delete 
everything.
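
For anyone following along, here is a toy sketch of why the drop/wait/raise 
trick empties the topic. It is a simplified model, not Kafka code: real Kafka 
applies retention per log segment on the broker, while this sketch pretends 
retention is checked per message. The names `apply_retention` and `ONE_WEEK_S` 
and all the timestamps are made up for illustration.

```python
def apply_retention(log, now_s, retention_s):
    """Return the (offset, appended_at_s) entries younger than the retention window."""
    return [(off, ts) for (off, ts) in log if now_s - ts < retention_s]


ONE_WEEK_S = 7 * 24 * 3600  # hypothetical "normal" retention for the topic

# Five old checkpoint messages, written at t = 0..4 seconds.
log = [(i, float(i)) for i in range(5)]

# With the normal retention, a cleanup pass at t = 60 purges nothing.
log = apply_retention(log, now_s=60.0, retention_s=ONE_WEEK_S)
remaining_before = len(log)

# Steps 1 and 2: drop retention to 2 seconds and wait for a cleanup pass.
log = apply_retention(log, now_s=60.0, retention_s=2)
remaining_after = len(log)

# Step 3: raise retention back up; messages written afterwards are kept as usual.
log.append((5, 61.0))
log = apply_retention(log, now_s=62.0, retention_s=ONE_WEEK_S)
```

In this model every old message is older than 2 seconds by the time the 
cleanup runs, so all of them are dropped, and only messages written after the 
retention is restored survive.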

bq. The general solution at the moment would be to rename the job. 
Alternatively, we could allow the users to set a prefix in the config for the 
checkpoint topic name and, when they need to discard an existing checkpoint, 
they increment or change this prefix and then restart the job.

This would work, though it also feels a little bit hacky. I feel like waiting 
for Kafka's delete topic feature might just be the best thing to do. It's just 
going to be painful in the short term. I do think the Kafka folks have a 
partially working delete going, and are working on fixing it up.
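
To make the prefix idea quoted above concrete, here is a rough sketch of how a 
configurable prefix could feed into the checkpoint topic name. The base 
pattern is only inferred from the topic name in the logs on this ticket 
(`__samza_checkpoint_ver_1_for_my-job_i001`); the function name, the `prefix` 
parameter, and putting the prefix at the front of the name are all 
assumptions, not existing Samza behavior.

```python
def checkpoint_topic(job_name: str, job_id: str, prefix: str = "") -> str:
    """Build a checkpoint topic name, optionally with a user-supplied prefix.

    The base pattern is inferred from the logs in this ticket; the prefix
    (and where it is placed) is hypothetical.
    """
    base = f"__samza_checkpoint_ver_1_for_{job_name}_{job_id}"
    return f"{prefix}{base}"


# No prefix: same name the job uses today.
current = checkpoint_topic("my-job", "i001")

# Changing the prefix yields a fresh, empty topic, which discards the old checkpoint.
fresh = checkpoint_topic("my-job", "i001", prefix="v2-")
```

With a scheme like this, the old topic is simply left behind rather than 
deleted, so nothing depends on Kafka being able to remove topics.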

> Need to be able to change SSP Grouper
> -------------------------------------
>
>                 Key: SAMZA-374
>                 URL: https://issues.apache.org/jira/browse/SAMZA-374
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>             Fix For: 0.8.0
>
>
> I recently ran a job with checkpointing enabled. The default grouper was used 
> (group by partition). I then decided that I wanted to increase parallelism, 
> so I set the grouper to group by SSP. This caused the container to get wedged 
> in this loop:
> {noformat}
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Verifying properties
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Property client.id is 
> overridden to 
> samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Property metadata.broker.list 
> is overridden to kafka-vip-e:10251
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Property request.timeout.ms 
> is overridden to 60000
> 2014-08-08 18:56:06 ClientUtils$ [INFO] Fetching metadata from broker 
> id:0,host:kafka-vip-e,port:10251 with correlation id 7 for 1 topic(s) 
> Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 
> (requested 102400).
> 2014-08-08 18:56:06 SyncProducer [INFO] Connected to kafka-vip-e:10251 for 
> producing
> 2014-08-08 18:56:06 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
> 2014-08-08 18:56:06 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 
> topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Connecting to leader 
> app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch 
> all checkpoint messages.
> 2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Got offset 13 for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch 
> messages for changelog partition mapping.
> 2014-08-08 18:56:06 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 
> 8192 (requested -1).
> 2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Get latest offset 80626 for 
> topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
> 2014-08-08 18:56:07 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:07 KafkaCheckpointManager [WARN] While trying to read last 
> changelog partition mapping entry for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: 
> org.apache.samza.SamzaException: Exception while deserializing checkpoint 
> key. Retrying.
> 2014-08-08 18:56:07 KafkaCheckpointManager [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Exception while deserializing checkpoint key
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
>       at 
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
>       at 
> org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
>       at 
> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>       at 
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
>       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> Caused by: 
> org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues:
>  Checkpoint key's SystemStreamPartition Grouper factory 
> (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not 
> match value from current configuration 
> (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).
>   This likely means the SystemStreamPartitionGrouper was changed between job 
> runs, which is not supported.
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
>       ... 15 more
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Verifying properties
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Property client.id is 
> overridden to 
> samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Property metadata.broker.list 
> is overridden to kafka-vip-e:10251
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Property request.timeout.ms 
> is overridden to 60000
> 2014-08-08 18:56:17 ClientUtils$ [INFO] Fetching metadata from broker 
> id:0,host:kafka-vip-e,port:10251 with correlation id 8 for 1 topic(s) 
> Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 
> (requested 102400).
> 2014-08-08 18:56:17 SyncProducer [INFO] Connected to kafka-vip-e:10251 for 
> producing
> 2014-08-08 18:56:17 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
> 2014-08-08 18:56:17 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 
> topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Connecting to leader 
> app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch 
> all checkpoint messages.
> 2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Got offset 14 for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch 
> messages for changelog partition mapping.
> 2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 
> 8192 (requested -1).
> 2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Get latest offset 80626 for 
> topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
> 2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:17 KafkaCheckpointManager [WARN] While trying to read last 
> changelog partition mapping entry for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: 
> org.apache.samza.SamzaException: Exception while deserializing checkpoint 
> key. Retrying.
> 2014-08-08 18:56:17 KafkaCheckpointManager [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Exception while deserializing checkpoint key
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
>       at 
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
>       at 
> org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
>       at 
> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>       at 
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
>       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> Caused by: 
> org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues:
>  Checkpoint key's SystemStreamPartition Grouper factory 
> (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not 
> match value from current configuration 
> (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).
>   This likely means the SystemStreamPartitionGrouper was changed between job 
> runs, which is not supported.
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
>       ... 15 more
> {noformat}
> I'm knowingly breaking grouping semantics because my job doesn't need it. As 
> I recall, this was discussed in SAMZA-123, and we were all worried about 
> people accidentally breaking their state/grouping, so we hard fail when the 
> grouper is changed. The problem is, I can't change the checkpoint topic name, 
> nor is it easy for me to delete the checkpoint messages in the topic, so I'm 
> kind of stuck.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
