[ https://issues.apache.org/jira/browse/SAMZA-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14011682#comment-14011682 ]
Chris Riccomini commented on SAMZA-267:
---------------------------------------
bq. Leaving them in would give very fine-grained control over a job, i.e., run with
topica for a while, stop, run with topicb for a while, stop, switch back to
topica. I'm just not clear on how useful that would be. I'm fine with stripping
the outdated ones.
Yeah, I definitely see the use case for that. I also see it being very
confusing. If you run with topic b for months, completely forget that you ever
ran with topic a, and then switch back, you'd resume from an arbitrary offset
that is months old (if the underlying system still has those messages). That
seems surprising. On the other hand, stripping streams has its own odd behavior:
if you stop the job before a new checkpoint has been written and switch back to
the old input stream, you STILL pick up where you left off, because the old
checkpoint hasn't been overwritten yet.
Neither option is ideal; I went with the one that seemed a bit safer.
Either way, SAMZA-180 gets us out of any serious problems, since we can force
offsets if we need to.
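A minimal sketch of the "strip the outdated ones" policy, assuming a checkpoint is a map from partition to offset string. `SystemStream` and `SystemStreamPartition` below are simplified stand-ins defined in the block, and `strip` is a hypothetical helper. This is not Samza's API or the code in SAMZA-267-0.patch.

```scala
// Hypothetical, simplified stand-ins for Samza's SystemStream and
// SystemStreamPartition; these are not the real Samza classes.
final case class SystemStream(system: String, stream: String)
final case class SystemStreamPartition(systemStream: SystemStream, partition: Int)

object StripStaleCheckpoints {
  /**
   * Splits a checkpoint into offsets for streams still listed in task.inputs
   * (kept) and the set of streams that are no longer inputs (stale).
   */
  def strip(
      checkpoint: Map[SystemStreamPartition, String],
      inputs: Set[SystemStream]): (Map[SystemStreamPartition, String], Set[SystemStream]) = {
    val (kept, stale) = checkpoint.partition { case (ssp, _) => inputs.contains(ssp.systemStream) }
    (kept, stale.keySet.map(_.systemStream))
  }

  def main(args: Array[String]): Unit = {
    val foo = SystemStream("kafka", "foo")
    val bar = SystemStream("kafka", "bar")
    // The job previously ran with task.inputs=kafka.foo and checkpointed offset 42.
    // It is now restarted with task.inputs=kafka.bar.
    val checkpoint = Map(SystemStreamPartition(foo, 0) -> "42")
    val (kept, stale) = strip(checkpoint, Set(bar))
    stale.foreach(ss => println(s"Dropping checkpointed offsets for $ss"))
    println(s"Offsets to restore: $kept")
  }
}
```

Stripping here only affects what the restarted job restores. The stored checkpoint still contains foo until the next checkpoint is written. That is why switching back before then resumes from the old offset.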
> OffsetManager fails if a checkpointed topic isn't in task.inputs
> ----------------------------------------------------------------
>
> Key: SAMZA-267
> URL: https://issues.apache.org/jira/browse/SAMZA-267
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-267-0.patch
>
>
> If you run a job with task.inputs=foo, let the job checkpoint, then restart
> it with task.inputs=bar, the last checkpoint will have foo in it. This will
> cause the OffsetManager to fail with:
> {noformat}
> 2014-05-15 12:16:28 SamzaContainer [ERROR] Caught exception in process loop.
> org.apache.samza.SamzaException: Attempting to reset a stream that doesn't have offset settings SystemStream [system=kafka, stream=foo].
>   at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>   at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1$$anonfun$apply$5.apply(OffsetManager.scala:305)
>   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>   at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>   at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:305)
>   at org.apache.samza.checkpoint.OffsetManager$$anonfun$getSystemStreamPartitionsToReset$1.apply(OffsetManager.scala:302)
>   at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>   at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
>   at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
>   at org.apache.samza.checkpoint.OffsetManager.getSystemStreamPartitionsToReset(OffsetManager.scala:302)
>   at org.apache.samza.checkpoint.OffsetManager.stripResetStreams(OffsetManager.scala:287)
>   at org.apache.samza.checkpoint.OffsetManager.start(OffsetManager.scala:165)
>   at org.apache.samza.container.SamzaContainer.startOffsetManager(SamzaContainer.scala:558)
>   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:492)
>   at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82)
>   at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {noformat}
> We should just warn in this case, rather than fail the container.
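The trace shows where the failure comes from. `getSystemStreamPartitionsToReset` runs a `filter` over the checkpointed streams, and a `getOrElse` on the offset settings throws when a stream has none. A minimal sketch of the proposed "warn rather than fail" behavior follows. `SystemStream`, `OffsetSetting`, and `streamsToReset` are hypothetical simplified stand-ins, not Samza's actual classes or the patch's code. The warning sink is passed in as a function so the sketch needs no logging library.

```scala
// Hypothetical, simplified stand-ins; not Samza's real SystemStream or
// OffsetSetting classes.
final case class SystemStream(system: String, stream: String)
final case class OffsetSetting(resetOffset: Boolean)

object ResetFilter {
  /**
   * Returns the checkpointed streams whose offsets should be reset. A stream
   * with no offset settings (e.g. it was removed from task.inputs) is reported
   * through `warn` and skipped, instead of failing the container.
   */
  def streamsToReset(
      checkpointed: Set[SystemStream],
      settings: Map[SystemStream, OffsetSetting],
      warn: String => Unit): Set[SystemStream] =
    checkpointed.filter { ss =>
      settings.get(ss) match {
        case Some(setting) => setting.resetOffset
        case None =>
          warn(s"Ignoring checkpointed stream $ss: it has no offset settings, so it is probably no longer in task.inputs.")
          false
      }
    }

  def main(args: Array[String]): Unit = {
    val foo = SystemStream("kafka", "foo")
    val bar = SystemStream("kafka", "bar")
    // The last checkpoint still mentions foo, but the job now runs with bar only.
    val settings = Map(bar -> OffsetSetting(resetOffset = true))
    val toReset = streamsToReset(Set(foo, bar), settings, msg => println(s"WARN $msg"))
    println(s"Streams to reset: $toReset")
  }
}
```

A real implementation would route the message through the container's logger rather than `println`.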
--
This message was sent by Atlassian JIRA
(v6.2#6252)