Hey all,

Trying to respond in order.

> If one wants finer-grained control over what the output topic would be
like, wouldn't it make sense to use a CREATE TABLE AS <query> statement?

Yes. CREATE TABLE AS could be used to define all three things that I
mentioned (partition key, partition count, and schema). I think the
discussion is about whether we *should* use it, and if so, for which ones.
See Jay's comments about wanting to explicitly define partitions in the
language. I agree with you, though. On the producer-side, between SEELCT
... clasuses, and CREATE TABLE AS, we have enough to define outgoing
schemas.

The point of my comments was just to explore what we could do with 1)
neither a schema registry 2) nor any SQL-level stream-specific clauses.
Once we've established that spectrum, I think we can argue where along it
we'd like to be.

> How would you know that even if you did look at the records? I'm pretty
sure that's impossible with avro.... Isn't it mandatory to have some sort
of metadata repository for Samza SQL to work?

It really depends on the serde. JSON, BSON, and MessagePack can all be
decoded (and the schema inferred) just by reading the message. The three
you mention (Avro, Thrift, Protobuf) might require a schema registry,
though all three could be encoded with per-message schemas as well. It
depends on the serde. I'm not saying that this is a good idea, but there
are certainly a lot of ways to make SQL work (for some definition of work)
without a schema registry.

> Given the solutions that already exist though, would it really be that
much effort to leverage one of them?

I might be overly-pessimistic, but adding a new piece of software to the
stack almost always has a non-trivial amount of overhead to
it. Operational, mental-model, code, etc. Samza is already so heavy-weight
as it is (ZK, YARN, Kafka). I agree with your statement that half the power
of SQL comes from the DDL. What I'm trying to do is figure out what the MVP
is. It would totally be useful to have a schema registry that defines
partition keys, partition counts, schemas, etc. What I was suggesting is
that we might be able to forego it initially, and try and implement the SQL
in a future-proof way, such that adding it later isn't a big problem.

Basically, I want to limit scope, so we could have something to show for
ourselves fast, rather than trying to do everything up front.

> I think the schema repository acts as the stand in for the database
catalog or hcat. People who don't have that will have to give their schema
with the query. I think that can be some kind of plugin to provide the
schema so it is automatically inferred for the avro people and manually
provided for others.

I agree. If you have a schema repo, we can use it. If you don't, you can
either 1) define the schema in your query or 2) you'll fail at runtime.
That seems OK to me. The json model that Julian and Milinda mention also
seems like a viable solution for those without a schema registry, though
it's a bit of a pain. Basically, to get the best experience, you need a
schema repo.

But this still leaves us with the partition key/partition count discussion.
It sounds like there's some consensus on the partition key being defined in
SQL (CREATE TABLE foo PARTITION BY member_id AS SELECT ... or something).

Is it fair to say that the last remaining point of my original comment is
on partition count being defined in the SQL or not? Personally, I tend to
agree with Jay's comments, but I'm not picky about whether the partition
count is defined directly in the query, or simply as a hint:

  CREATE TABLE foo PARTITION BY member_id WITH 200 PARTITIONS SELECT ...

Or:

  -- partition_count=200
  CREATE TABLE foo PARTITION BY member_id SELECT ...

I'm mostly just concerned that we have no way to define this now, so it
seems mandatory that we have *some* way of defining it (unless we expect
users to manually run a create-topic command before executing every query).

I guess I have two remaining questions, then:

1. What should we do about the partition count definition? (Jay's
discussion)
2. We had been building the operator layer (Yi's stuff) to support
heterogeneous schema types in a single stream. The schema discussion we've
been having seems to suggest that we only want to support one schema per
stream. Since Yi's stuff is more general, it could be made to work with
just one schema per-stream, but I just wanted to call it out as a conscious
decision. This seems as though it would prevent us from being able to
select over streams that want a strong time-order amongst messages of
different types (e.g. a change log from a DB shard that had more than on
table in it).

Cheers,
Chris

On Fri, Jan 30, 2015 at 3:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Chris,
>
> I think the schema repository acts as the stand in for the database catalog
> or hcat. People who don't have that will have to give their schema with the
> query. I think that can be some kind of plugin to provide the schema so it
> is automatically inferred for the avro people and manually provided for
> others.
>
> -Jay
>
> On Fri, Jan 30, 2015 at 12:06 PM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
> > Hey all,
> >
> > I have a few more comments on the metadata issue that I brought up. The
> > three things that we lack right now are:
> >
> > 1. Partition count.
> > 2. Partition key.
> > 3. Stream (or message) schema.
> >
> > These have to be evaluated both on the ingest (first consumers) and
> egress
> > (final producers) of a query.
> >
> > On the consumer-side, (1) can be determined simply by asking Kafka. On
> the
> > producer-side, we have no way to determine how many partitions an output
> > stream should be right now, short of having the user specify it in some
> > way.
> >
> > On the producer-side (2) must defined in SQL, or via a hint, for final
> > output to a stream. Intermediate streams can be keyed based off of the
> > consumers within the session. On the consumer-side, the consumer can
> always
> > re-partition an input stream based on the partition key that it needs, so
> > it doesn't need to know a priori what the key was.
> >
> > Message schemas (3) can be evaluated on the consumer-side at runtime by
> > poking at the incoming messages, which should be self-describing. Output
> > message schemas can be derived based off of the transformations of
> incoming
> > messages on the producer-side.
> >
> > If we're OK with these strategies, then it seems we only really need to
> > worry about:
> >
> > 1. Producer-side partition count.
> > 2. Producer-side partition key.
> >
> > One other annoying property of (3) is that compile-time validation of a
> > query won't be able to check that a field for a given stream actually
> > exists--it'll fail at runtime. This could perhaps be accelerated by
> > sampling some messages before deploying the job, to check that the
> messages
> > have the appropriate fields, but it's still a runtime check.
> >
> > Cheers,
> > Chris
> >
> > On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org
> >
> > wrote:
> >
> > > Hey all,
> > >
> > > Just catching up on this thread. The Calcite + Samza approach seems
> > pretty
> > > compelling to me. I think most of what Julian is arguing for makes
> sense.
> > > My main concern is with practicalities.
> > >
> > > One specific case of this is the discussion about the partitioning
> model.
> > > In an ideal world, I agree, developers wouldn't need to define
> partitions
> > > in SQL. Practically speaking, though, Samza (Kafka, really) currently
> > > doesn't have a metadata repository. Without a metadata repository, we
> > have
> > > no way of knowing 1) which key a topic is partitioned by, and 2) what
> the
> > > schema of the topic is (without looking at records). We know this
> > *within*
> > > a query (or session), but not *between* queries (or sessions) from
> > disjoint
> > > users.
> > >
> > > One could argue that we should spend time defining such a metadata
> > > repository, which is a reasonable argument. But that's also a fairly
> > large
> > > effort. I wonder if we might be able to cut some corners in a clean-ish
> > > forwards-compatible way, so that developers don't have to wait for us
> to
> > > fully implement (or integrate with) something like a Hive metadata
> store.
> > >
> > > In person, one thing you mentioned, Julian, was using hints, rather
> than
> > > stuff baked into the syntax. If that's stomach-able, we could support
> > > partitioning through hints, until we have a full blown metadata store.
> > >
> > > Thoughts?
> > >
> > > Cheers,
> > > Chris
> > >
> > > On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net>
> > > wrote:
> > >
> > >>
> > >> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > >> >
> > >> > I am wondering if I can get an average that's per 30 min window
> > >> averages?
> > >> > I.e. the following is the input events in a stream:
> > >> >  {10:01, ORCL, 10, 10}
> > >> >  {10:02, MSFT, 30, 30}
> > >> >  {10:03, ORCL, 100, 110}
> > >> >  {10:17, MSFT, 45, 75}
> > >> >  {10:59, ORCL, 20, 130}
> > >> >  {11:02, ORCL, 50, 50}
> > >> > Can I get the non-overlapping window average from 10:00-10:29, and
> > >> > 10:30-10:59, ... ? Could you give an example how to define that
> window
> > >> > operation in your model? Note that in this use case, although I may
> > >> have 1
> > >> > trading record per minute from the stream, I only generate 2 average
> > >> > records from 10:00-11:00.
> > >>
> > >> That takes a bit of messy date arithmetic, made even more messy by the
> > >> fact that you can't regard a SQL timestamp as a "milliseconds" value
> as
> > one
> > >> would in C or Java, nor can you divide it. Hence the trick of
> > subtracting a
> > >> constant timestamp, which yields an interval value, and then doing
> > integer
> > >> division on that interval to find the number of 30-minute periods
> since
> > the
> > >> epoch.
> > >>
> > >> select stream rowtime, ticker, amount,
> > >>   sum(amount) over (
> > >>     order by rowtime
> > >>     partition by ticker,
> > >>       (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30)
> > >> from StockTrades;
> > >>
> > >> If I were doing this kind of calculation often I'd define a UDF, or
> even
> > >> that user-defined window SPI I mentioned earlier.
> > >>
> > >> > {quote}
> > >> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department
> > >> VARCHAR(20))
> > >> >  PARTITION BY HASHCODE (department);
> > >> > {quote}
> > >> > That's good! At least it resolves my question on: "which field is
> the
> > >> > partition key". However, I still have the question on the number of
> > >> > partitions. As I stated that when the system currently does not
> have a
> > >> > "auto-scaling" feature, the number of partitions for a stream has to
> > be
> > >> > explicitly specified. Where do you suggest to put this information
> in
> > >> w/o
> > >> > breaking SQL syntax?
> > >>
> > >> I imagine you could start the system on Tuesday with 10 partitions per
> > >> stream and restart it on Wednesday with 8 or 12? You wouldn't want to
> > >> change the SQL, because that's in the application. But you could
> change
> > the
> > >> definition of the stream, either the DDL or by changing some other
> > system
> > >> configuration. Then partitioning function (applied by the system to
> > route
> > >> the record) could, say, take the value of p modulo of the current
> > number of
> > >> streams.
> > >>
> > >> Julian
> > >>
> > >>
> > >
> >
>

Reply via email to