> On Feb. 10, 2014, 10:11 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 246
> > <https://reviews.apache.org/r/17869/diff/1/?file=480625#file480625line246>
> >
> >     Could you move this to right above where it's used (above the new 
> > SystemConsumers call in SamzaContainer)?
> >     
> >     Using config.getDropIncomingMessagesOnSerdeFailure should work. I 
> > wonder if it didn't work because SystemConsumers expects a boolean, and 
> > this is returning an Option[Boolean]. You might try:
> >     
> >     config.getDropIncomingMessagesOnSerdeFailure.getOrElse(false)
> 
> Daniel Antonetti wrote:
>     I am getting a compiler error with this change.
>     I am not familiar with this syntax, or with Scala, so I need to do some 
> research on what I am doing wrong here.
>     
>     
>     [ant:scalac] /Users/dantonetti/work/samza/incubator-samza/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:279: error: type mismatch;
>     [ant:scalac]  found   : Any
>     [ant:scalac]  required: Boolean
>     [ant:scalac] Error occurred in an application involving default arguments.
>     [ant:scalac]       serdeErrorDrop = dropOnSerdeError)
>     [ant:scalac]                        ^
>     [ant:scalac] one error found
>

Hey Daniel,

The problem is that getDropIncomingMessagesOnSerdeFailure returns an 
Option[String], not an Option[Boolean]. If you then call getOrElse(false), as 
I suggested, Scala has to find a common supertype of String and Boolean 
(false), and the only one is Any (kind of like Object in Java). The whole 
expression is therefore typed as Any, which doesn't work with serdeErrorDrop, 
since that parameter is defined as Boolean.
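
To make the type issue concrete, here is a small standalone sketch. The object 
and value names are made up for illustration and are not part of Samza; only 
Option.getOrElse from the Scala standard library is used.

```scala
object WideningDemo {
  // Hypothetical stand-ins for a config getter that returns the raw string value.
  val raw: Option[String] = Some("true")
  val missing: Option[String] = None

  // getOrElse must return a supertype of both String and Boolean,
  // so the results can only be assigned to something typed as Any.
  val widened: Any = raw.getOrElse(false)
  val fallback: Any = missing.getOrElse(false)

  // The line below would not compile, which is the same mismatch as in the
  // error output quoted above:
  // val broken: Boolean = raw.getOrElse(false)
}
```

Note that when the option is defined, the value that comes back is the String 
"true", not the Boolean true.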

The fix is to do something like StreamConfig.getResetOffset:

  def getResetOffset(systemStream: SystemStream) =
    getOption(StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream)) match {
      case Some("true") => true
      case Some("false") => false
      case Some(resetOffset) =>
        warn("Got a configuration for %s that is not true, or false (was %s). Defaulting to false." format (StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream), resetOffset))
        false
      case _ => false
    }

This should then always return a boolean, which should make the Scala compiler 
happy. Sorry about the faulty suggestion.
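
As a rough sketch of how the same pattern might look for the new flag: the 
ExampleConfig class, its Map-backed getOption, and the key name below are all 
assumptions made up for this example, not Samza's actual config API, and 
println stands in for the warn call.

```scala
// Hypothetical stand-in for a Samza config wrapper; only the shape of the
// pattern match is borrowed from getResetOffset.
class ExampleConfig(values: Map[String, String]) {
  // Hypothetical key name, chosen only for this sketch.
  val DropKey = "example.drop.serde.errors"

  def getOption(key: String): Option[String] = values.get(key)

  // Every branch yields a Boolean, so the result type is Boolean and it can be
  // passed directly to a Boolean parameter such as serdeErrorDrop.
  def getDropIncomingMessagesOnSerdeFailure: Boolean =
    getOption(DropKey) match {
      case Some("true") => true
      case Some("false") => false
      case Some(other) =>
        // println is used here in place of a logging call.
        println("Got a configuration for %s that is not true or false (was %s). Defaulting to false.".format(DropKey, other))
        false
      case None => false
    }
}
```

With something like this in place, the call site would not need getOrElse at 
all, since the getter already falls back to false when the key is missing.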


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17869/#review34108
-----------------------------------------------------------


On Feb. 17, 2014, 6:48 a.m., Daniel Antonetti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17869/
> -----------------------------------------------------------
> 
> (Updated Feb. 17, 2014, 6:48 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-59
>     https://issues.apache.org/jira/browse/SAMZA-59
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> I have added a boolean flag (serdeErrorDrop) to SystemConsumers.scala, which 
> allows a configuration to drop bad incoming messages.
> 
> 
> I moved the message deserialization up before updateFetchMap().
> 
> 
> I tried to set up the configuration, but it was not working. I am getting a 
> compiler error, and I could not find where the other configurations were 
> set up.
> 
> 
> NOTE: I tried to make my change consistent with the rest of the code, but I 
> am not too familiar with Scala.
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/0.7.0/jobs/configuration-table.html 8b42b3a 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 3510f1f 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 13421d2 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala cdba7fe 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala d632314 
> 
> Diff: https://reviews.apache.org/r/17869/diff/
> 
> 
> Testing
> -------
> 
> I tested both setting dropOnSerdeError to true and to false, and they both 
> seemed to work.
> But the configuration was not working, and I could not figure out how to get 
> it working.
> 
> 
> Thanks,
> 
> Daniel Antonetti
> 
>
