Assaf, thanks for the feedback.

The InternalRow issue is one we know about. If it helps, I wrote up some docs
for InternalRow
<https://github.com/apache/spark/blob/64da2971a1f083926df35fe1366bcba84d97c7b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/data/package.scala>
as part of SPARK-23657 <https://issues.apache.org/jira/browse/SPARK-23657>. It
may be a good idea to make it easier for people to produce InternalRow, but
we want to be careful not to mislead people down paths that have bad
performance. InternalRow is what Spark will use directly for filters and we
don't want to do too much conversion. We shouldn't make it deceptively easy
to work with Row instead of InternalRow because Row is going to be slower.
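
As a rough illustration of building an InternalRow directly, here is a minimal
sketch for a hypothetical schema (name STRING, ts TIMESTAMP, tags
ARRAY<STRING>). The object and method names are made up for the example. The
catalyst classes it uses are internal APIs that can change between releases,
so check them against the Spark version you build with.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper, not part of Spark: builds one InternalRow for the
// assumed schema (name STRING, ts TIMESTAMP, tags ARRAY<STRING>).
object InternalRowSketch {
  def toInternalRow(name: String, ts: Timestamp, tags: Array[String]): InternalRow = {
    // Strings are stored as UTF8String, including the elements of an array.
    val catalystTags = new GenericArrayData(tags.map(t => UTF8String.fromString(t): Any))
    InternalRow(
      UTF8String.fromString(name),         // STRING    -> UTF8String
      DateTimeUtils.fromJavaTimestamp(ts), // TIMESTAMP -> Long (microseconds since epoch)
      catalystTags                         // ARRAY     -> ArrayData
    )
  }
}
```

Values are read back with the typed getters, for example
row.getUTF8String(0) or row.getArray(2).getUTF8String(0).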

For bad rows, I would suggest using a filtered iterator
<https://google.github.io/guava/releases/21.0/api/docs/com/google/common/collect/Iterables.html#filter-java.lang.Iterable-com.google.common.base.Predicate->
to solve your problem. Deciding how to handle invalid rows isn't really
something Spark should be responsible for. A filtered iterator would give you
the hasNext/next methods you need to implement Spark's API.
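
A minimal sketch of that idea in plain Scala, using Iterator.flatMap in place
of Guava's Iterables.filter. SkippingReader, parseInt and readAll are
hypothetical names for the example. The class only mirrors the next()/get()
shape discussed in this thread and does not extend any Spark interface.

```scala
import scala.util.Try

// Hypothetical reader: wraps a raw record iterator and skips records that
// fail to parse, so next() only returns true when get() has a row to return.
final class SkippingReader[A, B](raw: Iterator[A], parse: A => Option[B]) {
  // Lazily drops records for which parse returns None.
  private val good: Iterator[B] = raw.flatMap(r => parse(r).iterator)
  private var current: Option[B] = None

  def next(): Boolean = {
    if (good.hasNext) {
      current = Some(good.next())
      true
    } else {
      current = None
      false
    }
  }

  def get(): B =
    current.getOrElse(throw new NoSuchElementException("call next() first"))
}

object SkippingReaderExample {
  // Treat anything that is not an integer as a bad record.
  def parseInt(s: String): Option[Int] = Try(s.trim.toInt).toOption

  def readAll(lines: Seq[String]): List[Int] = {
    val reader = new SkippingReader[String, Int](lines.iterator, parseInt)
    val out = List.newBuilder[Int]
    while (reader.next()) out += reader.get()
    out.result()
  }
}
```

The read-ahead still happens, but it lives inside the iterator's hasNext
rather than in the reader's own bookkeeping.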

Metrics are something that we still need to add. I think that Spark should
handle record count and FS bytes read metrics like it does for other
sources (I've been meaning to contribute an implementation for DSv2). Bad
records may be a good candidate for requesting accumulators in the v2 API.

rb

On Thu, Oct 4, 2018 at 11:32 AM assaf.mendelson <assaf.mendel...@rsa.com>
wrote:

> Thanks for the info.
>
> I have been converting an internal data source to V2 and am now preparing
> it for 2.4.0.
>
> I have a couple of suggestions from my experience so far.
>
> First, I believe we are missing documentation on this. I am currently
> writing an internal tutorial based on what I am learning; I would be happy
> to share it once it gets a little better (not sure where it should go
> though).
>
>
>
> The change from using Row to using InternalRow is a little confusing.
> For generic row we can do Row.fromSeq(values) where values are regular java
> types (matching the schema). This even includes more complex types like
> Array[String] and everything just works.
>
> For InternalRow, this doesn't work for non-trivial types. I figured out how
> to convert strings and timestamps (hopefully I even did it correctly), but
> I couldn't figure out Array[String].
>
> Beyond the fact that I would love to learn how to do the conversion
> correctly for various types (such as array), I would suggest we should add
> some method to create the internal row from base types. In the 2.3.0
> version, the row we got from Get would be encoded via an encoder which was
> provided. I managed to get it to work by doing:
>
> val encoder = RowEncoder.apply(schema).resolveAndBind() in the constructor
> and then encoder.toRow(Row.fromSeq(values))
>
> this simply feels a little weird to me.
>
>
> Another issue that I encountered is handling bad data. In our legacy source
> we have cases where a specific row is bad. What we would do in non-Spark
> code is simply skip it.
>
> The problem is that in Spark, if next returns true we must have some
> row for the get function. This means we always need to read records ahead
> to figure out if we actually have something or not.
>
> Might we instead be allowed to return null from get in which case the line
> would just be skipped?
>
>
> Lastly, I would be happy to have a means to return metrics from the read (how
> many records we read, how many bad records we have), perhaps by allowing the
> use of accumulators in the data source?
>
> Sorry for the long-winded message, I will probably have more as I continue
> to explore this.
>
> Thanks,
>    Assaf.

-- 
Ryan Blue
Software Engineer
Netflix
