----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/17869/#review34108 -----------------------------------------------------------
1. Clean up misc white space tabs (highlighted in red) in RB. 2. Add a test. 3. Update docs to describe new configuration (docs/learn/documentation/0.7.0/jobs/configuration-table.html) samza-core/src/main/scala/org/apache/samza/config/SystemConsumersConfig.scala <https://reviews.apache.org/r/17869/#comment64080> Thought about this more. Rather than create a SystemConsumersConfig, let's just move this setting into the existing TaskConfig class. For the name, let's keep it consistent with other config names: task.drop.deserilization.errors samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala <https://reviews.apache.org/r/17869/#comment64074> Remove extra tab. samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala <https://reviews.apache.org/r/17869/#comment64076> Could you move this to right above where it's used (above the new SystemConsumers call in SamazContainer)? Using config.getDropIncomingMessagesOnSerdeFailure should work. I wonder if it didn't work because SystemConsumers expects a boolean, and this is returning an Option[Boolean]. You might try: config.getDropIncomingMessagesOnSerdeFailure.getOrElse(false) samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala <https://reviews.apache.org/r/17869/#comment64081> No need to define this here. Just use serdeErrorDrop directly below. samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala <https://reviews.apache.org/r/17869/#comment64082> Move this to SystemConsumersMetrics. Rename to deserialization error, since we're deserializing, not serializing here. Use standard metrics style ("deserialization-errors"). samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala <https://reviews.apache.org/r/17869/#comment64083> We can make this a little more compact and idiomatic-Scala with: val messageEnvelope = try { Some(serdeManager.fromBytes(envelope)) } catch { ... None } if(messageEnvelope.isDefined) { messageEnvelope.get } - Chris Riccomini On Feb. 10, 2014, 8:37 p.m., Daniel Antonetti wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/17869/ > ----------------------------------------------------------- > > (Updated Feb. 10, 2014, 8:37 p.m.) > > > Review request for samza. > > > Bugs: SAMZA-59 > https://issues.apache.org/jira/browse/SAMZA-59 > > > Repository: samza > > > Description > ------- > > I have added a boolean flag (serdeErrorDrop) to SystemConsumers.scala, Which > will allow a configuration to drop bad incoming messages. > > > I moved the message deserialization up before updateFetchMap(). > > > I tried to setup the configuration, but it was not working. I am getting a > compiler error, and I could not find where the other configurations were > setup. > > > NOTE : I tried to make my change consistent with the rest of the code, but I > am not too familiar with scala. > > > Diffs > ----- > > > samza-core/src/main/scala/org/apache/samza/config/SystemConsumersConfig.scala > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala > 13421d2 > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala > cdba7fe > > Diff: https://reviews.apache.org/r/17869/diff/ > > > Testing > ------- > > I tested both setting dropOnSerdeError to true and to false, and they both > seemed to work. > But the configuration was not working, and I could not figure out how to get > it working. > > > Thanks, > > Daniel Antonetti > >
