-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17869/#review34108
-----------------------------------------------------------


1. Clean up the stray whitespace and tabs (highlighted in red in RB).
2. Add a test.
3. Update docs to describe new configuration 
(docs/learn/documentation/0.7.0/jobs/configuration-table.html)


samza-core/src/main/scala/org/apache/samza/config/SystemConsumersConfig.scala
<https://reviews.apache.org/r/17869/#comment64080>

    Thought about this more. Rather than create a SystemConsumersConfig, let's 
just move this setting into the existing TaskConfig class.
    
    For the name, let's keep it consistent with other config names:
    
    task.drop.deserialization.errors
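
As a rough illustration of the kind of accessor this implies, here is a minimal sketch. SketchTaskConfig and its Map-backed constructor are hypothetical stand-ins, not Samza's actual TaskConfig, and the accessor name is an assumption.

```scala
// Hypothetical stand-in for TaskConfig; the real class wraps Samza's Config.
object SketchTaskConfig {
  // Key from the suggestion above, with "deserialization" spelled out in full.
  val DropDeserializationErrors = "task.drop.deserialization.errors"
}

class SketchTaskConfig(config: Map[String, String]) {
  // Returns None when the key is absent, so the caller picks the default.
  // Note: toBoolean throws IllegalArgumentException on values other than true/false.
  def getDropDeserializationErrors: Option[Boolean] =
    config.get(SketchTaskConfig.DropDeserializationErrors).map(_.toBoolean)
}
```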



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/17869/#comment64074>

    Remove extra tab.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/17869/#comment64076>

    Could you move this to right above where it's used (above the new 
SystemConsumers call in SamzaContainer)?
    
    Using config.getDropIncomingMessagesOnSerdeFailure should work. I wonder if 
it didn't work because SystemConsumers expects a Boolean, and this is returning 
an Option[Boolean]. You might try:
    
    config.getDropIncomingMessagesOnSerdeFailure.getOrElse(false)
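
To make the suspected type mismatch concrete, here is a small sketch. SketchConsumers is a hypothetical stand-in for a class whose constructor takes a plain Boolean; passing an Option[Boolean] to it directly would not compile.

```scala
// Hypothetical stand-in: the constructor takes a plain Boolean.
class SketchConsumers(val dropDeserializationError: Boolean)

object OptionDefaultSketch {
  // Unwrap the optional setting, defaulting to false (do not drop) when unset.
  def build(setting: Option[Boolean]): SketchConsumers =
    new SketchConsumers(setting.getOrElse(false))
}
```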



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/17869/#comment64081>

    No need to define this here. Just use serdeErrorDrop directly below.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/17869/#comment64082>

    Move this to SystemConsumersMetrics. Rename to deserialization error, since 
we're deserializing, not serializing here. Use standard metrics style 
("deserialization-errors").



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/17869/#comment64083>

    We can make this a little more compact and idiomatic-Scala with:
    
    val messageEnvelope = try {
      Some(serdeManager.fromBytes(envelope))
    } catch {
      ...
      None
    }
    
    if(messageEnvelope.isDefined) {
      messageEnvelope.get
    }
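
A self-contained sketch of the same pattern follows, under stated assumptions: fromBytes here is a hypothetical stand-in for serdeManager.fromBytes that just parses an integer, and the dropOnError guard is one possible way to fill in the elided catch body, not necessarily what the patch should do.

```scala
import scala.util.control.NonFatal

object DeserializeSketch {
  // Hypothetical stand-in for serdeManager.fromBytes: parses UTF-8 text as an Int.
  def fromBytes(bytes: Array[Byte]): Int = new String(bytes, "UTF-8").trim.toInt

  // Some(message) on success. On failure, None if dropping is enabled;
  // otherwise the guard does not match and the exception propagates.
  def deserialize(bytes: Array[Byte], dropOnError: Boolean): Option[Int] =
    try {
      Some(fromBytes(bytes))
    } catch {
      case NonFatal(e) if dropOnError =>
        // A real implementation would likely count the error in a metric here.
        None
    }
}
```

Calling foreach on the returned Option is one way to handle the message without the isDefined/get pair.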


- Chris Riccomini


On Feb. 10, 2014, 8:37 p.m., Daniel Antonetti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17869/
> -----------------------------------------------------------
> 
> (Updated Feb. 10, 2014, 8:37 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-59
>     https://issues.apache.org/jira/browse/SAMZA-59
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> I have added a boolean flag (serdeErrorDrop) to SystemConsumers.scala, which 
> allows a configuration to drop bad incoming messages.
> 
> 
> I moved the message deserialization up before updateFetchMap().
> 
> 
> I tried to set up the configuration, but it was not working. I am getting a 
> compiler error, and I could not find where the other configurations were 
> set up.
> 
> 
> NOTE: I tried to make my change consistent with the rest of the code, but I 
> am not too familiar with Scala.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/config/SystemConsumersConfig.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 13421d2 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala cdba7fe 
> 
> Diff: https://reviews.apache.org/r/17869/diff/
> 
> 
> Testing
> -------
> 
> I tested both setting dropOnSerdeError to true and to false, and they both 
> seemed to work.
> But the configuration was not working, and I could not figure out how to get 
> it working.
> 
> 
> Thanks,
> 
> Daniel Antonetti
> 
>
