Hi,

It has taken me quite a bit of time to figure this out.
This is the solution I have now (works on my machine).

Please tell me where I can improve this.

It turns out that the schema you provide to registerDataStream only needs
the top-level fields of the Avro data structure.
With only the top-level fields listed you can still access nested fields
with something like "topfield.x.y.z" in the SQL statement.

What I found is that the easiest way to make this all work is to ensure the
rowtime field in the structure is at the top level (which makes sense in
general) and to generate the fields string from the type information, so
that the only thing I need to know is the name of the "rowtime" field.

So I have

    DataStream<Measurement> inputStream = ...


then I register the stream with


    // Derive the top-level field names from the Avro-generated class.
    TypeInformation<Measurement> typeInformation = TypeInformation.of(Measurement.class);
    String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);

    // rowtimeFieldName holds the name of the top-level event time field.
    List<String> rootSchema = new ArrayList<>();
    for (String fieldName : fieldNames) {
        if (rowtimeFieldName.equals(fieldName)) {
            rootSchema.add(fieldName + ".rowtime");
        } else {
            rootSchema.add(fieldName);
        }
    }

    tableEnv.registerDataStream("MeasurementStream", inputStream, String.join(",", rootSchema));
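
The loop above can also be pulled into a small helper that is easy to test
without a running Flink job. This is only a sketch: the class name
RowtimeFields and the method toFieldExpression are made up for illustration
and are not part of Flink, and the check for a missing rowtime field is an
addition that the snippet above does not have.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Flink API. */
final class RowtimeFields {
    private RowtimeFields() {}

    /**
     * Joins the top-level field names into a field expression string,
     * appending ".rowtime" to the one field that carries the event time.
     */
    static String toFieldExpression(String[] fieldNames, String rowtimeFieldName) {
        List<String> fields = new ArrayList<>();
        boolean found = false;
        for (String fieldName : fieldNames) {
            if (rowtimeFieldName.equals(fieldName)) {
                fields.add(fieldName + ".rowtime");
                found = true;
            } else {
                fields.add(fieldName);
            }
        }
        // Fail early instead of registering a table without a rowtime attribute.
        if (!found) {
            throw new IllegalArgumentException(
                "No top-level field named '" + rowtimeFieldName + "'");
        }
        return String.join(",", fields);
    }
}
```

For the Measurement record quoted further down (timestamp, letter,
pageviews), calling toFieldExpression(fieldNames, "timestamp") yields
"timestamp.rowtime,letter,pageviews", which can be passed as the third
argument of registerDataStream.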


After the actual SQL has been executed I have a

    Table resultTable = ...

Simply converting this into a DataStream with something like this fails
badly:

    TypeInformation<Row> tupleType = new RowTypeInfo(resultTable.getSchema().getFieldTypes());
    DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

This results in

    org.apache.flink.table.api.TableException: The time indicator type is an internal type only.
       at org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)

It turns out that the schema of the output contains a field that was
created by TUMBLE_START, which is of type TimeIndicatorTypeInfo.

So I have to do it this way (NASTY!):

    // Replace the internal time indicator types with plain SQL timestamps.
    // SQL_TIMESTAMP is assumed to be a static import from
    // org.apache.flink.api.common.typeinfo.Types.
    final TypeInformation<?>[] fieldTypes = resultTable.getSchema().getFieldTypes();
    for (int index = 0; index < fieldTypes.length; index++) {
        if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
            fieldTypes[index] = SQL_TIMESTAMP;
        }
    }
    TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
    DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

This gives me the desired DataStream.


Niels Basjes





On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <twal...@apache.org> wrote:

> Hi Niels,
>
> if you are coming from DataStream API, all you need to do is to write a
> timestamp extractor.
>
> When you call:
>
> tableEnv.registerDataStream("TestStream", letterStream,
> "EventTime.rowtime, letter, counter");
>
> The ".rowtime" means that the framework will extract the rowtime from the
> stream record timestamp. You don't need to name all fields again but could
> simply construct a string from letterStream.getTypeInfo().getFieldNames().
> I hope we can improve this further in the future as part of FLIP-37.
>
> Regards,
> Timo
>
> On 14.08.19 at 17:00, Niels Basjes wrote:
>
> Hi,
>
> Experimenting with the StreamTableEnvironment I built something like this:
>
> DataStream<Tuple3<Long, String, Long>> letterStream = ...
> tableEnv.registerDataStream("TestStream", letterStream,
> "EventTime.rowtime, letter, counter");
>
>
> Because the "EventTime" was tagged with ".rowtime" it is now being used as
> the rowtime and has the DATETIME type, so I can do this
>
> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>
>
> So far so good.
>
> Working towards a more realistic scenario I have a source that produces a
> stream of records that have been defined using Apache Avro.
>
> So I have a Measurement.avdl that (among other things) contains something
> like this:
>
> record Measurement {
>     /** The time (epoch in milliseconds since 1970-01-01 UTC) when the event occurred */
>     long                        timestamp;
>     string                      letter;
>     long                        pageviews;
> }
>
>
> Now because the registerDataStream call can also derive the schema from
> the provided data I can do this:
>
> DataStream<Measurement> inputStream = ...
> tableEnv.registerDataStream("DataStream", inputStream);
>
>
> This is very nice because any real schema is big (few hundred columns) and
> changes over time.
>
> Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a
> consequence I get this error:
>
> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL
> MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>
>
> So far I have not yet figured out how to make the system understand that
> the timestamp column should be treated as the rowtime.
> How do I do that?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
