Hey all,

I have a few more comments on the metadata issue that I brought up. The
three things that we lack right now are:

1. Partition count.
2. Partition key.
3. Stream (or message) schema.

These have to be evaluated both on the ingest (first consumers) and egress
(final producers) of a query.

On the consumer-side, (1) can be determined simply by asking Kafka. On the
producer-side, we currently have no way to determine how many partitions an
output stream should have, short of having the user specify it in some way.

On the producer-side, (2) must be defined in SQL, or via a hint, for final
output to a stream. Intermediate streams can be keyed based on the
consumers within the session. On the consumer-side, the consumer can always
re-partition an input stream based on the partition key that it needs, so
it doesn't need to know a priori what the key was.
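
To make the consumer-side re-partitioning concrete, here's a rough sketch in
Python. It's purely illustrative: partition_for and repartition are
hypothetical names, not Samza or Kafka APIs, and crc32 is just a stand-in for
whatever hash the real system would use. The partition count is passed in
explicitly, which is exactly the piece the user would have to supply on the
producer-side.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Map a partition key to a partition index with a stable hash."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # crc32 is an assumed stand-in hash; it is stable across processes,
    # unlike Python's built-in hash() for strings.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def repartition(messages, key_field, num_partitions):
    """Group messages by the partition that their key_field value maps to."""
    partitions = defaultdict(list)
    for message in messages:
        index = partition_for(message[key_field], num_partitions)
        partitions[index].append(message)
    return dict(partitions)
```

The consumer only needs the key it wants and a partition count; it never has
to know how the input was keyed upstream.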

Message schemas (3) can be evaluated on the consumer-side at runtime by
poking at the incoming messages, which should be self-describing. Output
message schemas can be derived based off of the transformations of incoming
messages on the producer-side.
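
As a rough illustration of "poking at the incoming messages", here's a small
Python sketch. It assumes each message has already been decoded into a dict
(that decoding step, and the infer_schema name, are my assumptions, not
anything that exists in Samza today).

```python
def infer_schema(messages):
    """Return {field name: sorted list of type names seen} for dict messages."""
    seen = {}
    for message in messages:
        for field, value in message.items():
            # Record every type observed for this field across the sample.
            seen.setdefault(field, set()).add(type(value).__name__)
    return {field: sorted(types) for field, types in seen.items()}
```

A field that shows up with more than one type name in the result is a hint
that the stream isn't as uniform as the query assumes.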

If we're OK with these strategies, then it seems we only really need to
worry about:

1. Producer-side partition count.
2. Producer-side partition key.

One other annoying property of (3) is that compile-time validation of a
query won't be able to check that a field for a given stream actually
exists--it'll fail at runtime. This could perhaps be accelerated by
sampling some messages before deploying the job, to check that the messages
have the appropriate fields, but it's still a runtime check.
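
The pre-deploy sampling check could look something like this Python sketch.
Again, this is only an illustration under assumptions: messages are taken to
be decoded dicts, and missing_fields is a hypothetical helper, not an existing
API. How the sample gets pulled from the stream is left out.

```python
def missing_fields(sampled_messages, required_fields):
    """Return {field: number of sampled messages that lack it}."""
    missing = {}
    for message in sampled_messages:
        for field in required_fields:
            if field not in message:
                missing[field] = missing.get(field, 0) + 1
    return missing
```

An empty result only says the sample looked fine; messages outside the sample
can still fail at runtime.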

Cheers,
Chris

On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org>
wrote:

> Hey all,
>
> Just catching up on this thread. The Calcite + Samza approach seems pretty
> compelling to me. I think most of what Julian is arguing for makes sense.
> My main concern is with practicalities.
>
> One specific case of this is the discussion about the partitioning model.
> In an ideal world, I agree, developers wouldn't need to define partitions
> in SQL. Practically speaking, though, Samza (Kafka, really) currently
> doesn't have a metadata repository. Without a metadata repository, we have
> no way of knowing 1) which key a topic is partitioned by, and 2) what the
> schema of the topic is (without looking at records). We know this *within*
> a query (or session), but not *between* queries (or sessions) from disjoint
> users.
>
> One could argue that we should spend time defining such a metadata
> repository, which is a reasonable argument. But that's also a fairly large
> effort. I wonder if we might be able to cut some corners in a clean-ish
> forwards-compatible way, so that developers don't have to wait for us to
> fully implement (or integrate with) something like a Hive metadata store.
>
> In person, one thing you mentioned, Julian, was using hints, rather than
> stuff baked into the syntax. If that's stomach-able, we could support
> partitioning through hints, until we have a full blown metadata store.
>
> Thoughts?
>
> Cheers,
> Chris
>
> On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net>
> wrote:
>
>>
>> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> > I am wondering if I can get an average that's per 30 min window
>> > averages?
>> > I.e. the following is the input events in a stream:
>> >  {10:01, ORCL, 10, 10}
>> >  {10:02, MSFT, 30, 30}
>> >  {10:03, ORCL, 100, 110}
>> >  {10:17, MSFT, 45, 75}
>> >  {10:59, ORCL, 20, 130}
>> >  {11:02, ORCL, 50, 50}
>> > Can I get the non-overlapping window average from 10:00-10:29, and
>> > 10:30-10:59, ... ? Could you give an example how to define that window
>> > operation in your model? Note that in this use case, although I may
>> > have 1 trading record per minute from the stream, I only generate 2
>> > average records from 10:00-11:00.
>>
>> That takes a bit of messy date arithmetic, made even more messy by the
>> fact that you can't regard a SQL timestamp as a "milliseconds" value as one
>> would in C or Java, nor can you divide it. Hence the trick of subtracting a
>> constant timestamp, which yields an interval value, and then doing integer
>> division on that interval to find the number of 30-minute periods since the
>> epoch.
>>
>> select stream rowtime, ticker, amount,
>>   sum(amount) over (
>>     partition by ticker,
>>       (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
>>     order by rowtime)
>> from StockTrades;
>>
>> If I were doing this kind of calculation often I'd define a UDF, or even
>> that user-defined window SPI I mentioned earlier.
>>
>> > {quote}
>> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department VARCHAR(20))
>> >  PARTITION BY HASHCODE (department);
>> > {quote}
>> > That's good! At least it resolves my question on: "which field is the
>> > partition key". However, I still have the question on the number of
>> > partitions. As I stated that when the system currently does not have a
>> > "auto-scaling" feature, the number of partitions for a stream has to be
>> > explicitly specified. Where do you suggest to put this information in
>> > w/o breaking SQL syntax?
>>
>> I imagine you could start the system on Tuesday with 10 partitions per
>> stream and restart it on Wednesday with 8 or 12? You wouldn't want to
>> change the SQL, because that's in the application. But you could change the
>> definition of the stream, either the DDL or by changing some other system
>> configuration. Then the partitioning function (applied by the system to route
>> the record) could, say, take the value of p modulo the current number of
>> partitions.
>>
>> Julian
>>
>>
>
