Chris,

I think the schema repository acts as the stand-in for the database catalog
or HCatalog. People who don't have that will have to supply their schema with
the query. I think that can be some kind of plugin that provides the schema,
so it is automatically inferred for the Avro people and manually provided by
others.
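
For the people providing it manually, that could be as simple as declaring
the schema alongside the query. A rough sketch (hypothetical DDL, names made
up):

  CREATE STREAM StockTrades (
    rowtime TIMESTAMP,
    ticker  VARCHAR(10),
    amount  INTEGER
  );

Either way, the plugin just has to answer "what is the schema of stream X".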

-Jay

On Fri, Jan 30, 2015 at 12:06 PM, Chris Riccomini <criccom...@apache.org>
wrote:

> Hey all,
>
> I have a few more comments on the metadata issue that I brought up. The
> three things that we lack right now are:
>
> 1. Partition count.
> 2. Partition key.
> 3. Stream (or message) schema.
>
> These have to be evaluated both at the ingest (the first consumers) and the
> egress (the final producers) of a query.
>
> On the consumer side, (1) can be determined simply by asking Kafka. On the
> producer side, we currently have no way to determine how many partitions an
> output stream should have, short of having the user specify it in some
> way.
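>
> As a strawman, that user-specified count could live in a hypothetical DDL
> clause on the output stream (made-up syntax, just to make the idea
> concrete):
>
>   CREATE STREAM OutputTrades (ticker VARCHAR(10), amount INTEGER)
>     PARTITIONS 8;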
>
> On the producer side, (2) must be defined in SQL, or via a hint, for final
> output to a stream. Intermediate streams can be keyed based on the consumers
> within the session. On the consumer side, the consumer can always
> re-partition an input stream based on the partition key that it needs, so it
> doesn't need to know a priori what the key was.
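>
> For example (hypothetical syntax, purely to make the idea concrete):
>
>   INSERT INTO OutputTrades
>   PARTITION BY (ticker)
>   SELECT STREAM ticker, amount FROM StockTrades;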
>
> Message schemas (3) can be evaluated on the consumer side at runtime by
> poking at the incoming messages, which should be self-describing. Output
> message schemas can be derived on the producer side from the transformations
> applied to incoming messages.
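>
> For instance, given a query like (a sketch, assuming the usual SQL type
> derivation rules):
>
>   SELECT STREAM ticker, amount * 2 AS doubled
>   FROM StockTrades;
>
> the output schema (ticker VARCHAR, doubled INTEGER) falls out of the input
> schema plus the expressions in the SELECT list.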
>
> If we're OK with these strategies, then it seems we only really need to
> worry about:
>
> 1. Producer-side partition count.
> 2. Producer-side partition key.
>
> One other annoying property of (3) is that compile-time validation of a
> query won't be able to check that a field for a given stream actually
> exists--it'll fail at runtime. The failure could perhaps be surfaced earlier
> by sampling some messages before deploying the job, to check that the
> messages have the appropriate fields, but it's still a runtime check.
>
> Cheers,
> Chris
>
> On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
> > Hey all,
> >
> > Just catching up on this thread. The Calcite + Samza approach seems
> > pretty compelling to me. I think most of what Julian is arguing for makes
> > sense. My main concern is with practicalities.
> >
> > One specific case of this is the discussion about the partitioning
> > model. In an ideal world, I agree, developers wouldn't need to define
> > partitions in SQL. Practically speaking, though, Samza (Kafka, really)
> > currently doesn't have a metadata repository. Without a metadata
> > repository, we have no way of knowing 1) which key a topic is partitioned
> > by, and 2) what the schema of the topic is (without looking at records).
> > We know this *within* a query (or session), but not *between* queries (or
> > sessions) from disjoint users.
> >
> > One could argue that we should spend time defining such a metadata
> > repository, which is a reasonable argument. But that's also a fairly
> > large effort. I wonder if we might be able to cut some corners in a
> > clean-ish, forwards-compatible way, so that developers don't have to wait
> > for us to fully implement (or integrate with) something like a Hive
> > metadata store.
> >
> > In person, one thing you mentioned, Julian, was using hints, rather than
> > stuff baked into the syntax. If that's stomach-able, we could support
> > partitioning through hints until we have a full-blown metadata store.
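> >
> > Something like this, say (hint name invented purely for illustration):
> >
> >   SELECT STREAM /*+ PARTITION_BY(department) */ empno, name, department
> >   FROM Emp;
> >
> > The hint could then be dropped once a metadata store can answer the
> > question itself.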
> >
> > Thoughts?
> >
> > Cheers,
> > Chris
> >
> > On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net>
> > wrote:
> >
> >>
> >> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >> >
> >> > I am wondering if I can get per-30-minute-window averages? I.e. the
> >> > following are the input events in a stream:
> >> >  {10:01, ORCL, 10, 10}
> >> >  {10:02, MSFT, 30, 30}
> >> >  {10:03, ORCL, 100, 110}
> >> >  {10:17, MSFT, 45, 75}
> >> >  {10:59, ORCL, 20, 130}
> >> >  {11:02, ORCL, 50, 50}
> >> > Can I get the non-overlapping window averages for 10:00-10:29,
> >> > 10:30-10:59, ... ? Could you give an example of how to define that
> >> > window operation in your model? Note that in this use case, although I
> >> > may have 1 trading record per minute from the stream, I only generate
> >> > 2 average records from 10:00-11:00.
> >>
> >> That takes a bit of messy date arithmetic, made even more messy by the
> >> fact that you can't regard a SQL timestamp as a "milliseconds" value as
> >> one would in C or Java, nor can you divide it. Hence the trick of
> >> subtracting a constant timestamp, which yields an interval value, and
> >> then doing integer division on that interval to find the number of
> >> 30-minute periods since the epoch.
> >>
> >> select stream rowtime, ticker, amount,
> >>   sum(amount) over (
> >>     -- partition on ticker plus the 30-minute bucket since the epoch
> >>     partition by ticker,
> >>       (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
> >>     order by rowtime)
> >> from StockTrades;
> >>
> >> If I were doing this kind of calculation often I'd define a UDF, or even
> >> that user-defined window SPI I mentioned earlier.
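> >>
> >> And if what you want is exactly one output row per 30-minute window,
> >> rather than a running total, the same trick should work with a streaming
> >> GROUP BY (a sketch, assuming the aggregate is emitted once the window
> >> closes):
> >>
> >> select stream ticker,
> >>   (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30 as window_id,
> >>   avg(amount) as avg_amount
> >> from StockTrades
> >> group by ticker,
> >>   (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30;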
> >>
> >> > {quote}
> >> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20),
> >> >   department VARCHAR(20))
> >> >   PARTITION BY HASHCODE (department);
> >> > {quote}
> >> > That's good! At least it resolves my question on "which field is the
> >> > partition key". However, I still have a question on the number of
> >> > partitions. As I stated, since the system currently does not have an
> >> > "auto-scaling" feature, the number of partitions for a stream has to
> >> > be explicitly specified. Where do you suggest putting this information
> >> > w/o breaking the SQL syntax?
> >>
> >> I imagine you could start the system on Tuesday with 10 partitions per
> >> stream and restart it on Wednesday with 8 or 12? You wouldn't want to
> >> change the SQL, because that's in the application. But you could change
> >> the definition of the stream, either in the DDL or in some other system
> >> configuration. The partitioning function (applied by the system to route
> >> the record) could then, say, take the value of p modulo the current
> >> number of partitions.
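> >>
> >> In other words, something like (hypothetical DDL, for illustration):
> >>
> >>   ALTER STREAM StockTrades SET PARTITIONS 12;
> >>
> >> with the application SQL left untouched.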
> >>
> >> Julian
> >>
> >>
> >
>
