Thanks for the quick response, Fabian.

I have a DataStream of Avro objects. I am not sure how to add a TIMESTAMP attribute, or how to convert the event_timestamp field to a timestamp attribute, for my SQL use cases. Most docs only cover the Table API with a static schema. P.S. My Avro schema has 100+ fields.

Can you guide me on how to prepare my query to aggregate by nd_key and event_timestamp per hour?
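To make the intended result concrete, here is a minimal plain-Scala sketch (no Flink involved) of what a one-hour tumbling aggregation by nd_key would compute. It assumes event_timestamp holds epoch seconds, which the sample value 1512172415 suggests but the thread does not confirm. The `Event` case class and the `HourlyRankSketch` object are hypothetical stand-ins for the Avro-generated UnifiedEvent and are not part of any library. It only illustrates the bucketing arithmetic and the seconds-to-milliseconds conversion that a TIMESTAMP value needs; it does not solve the Flink time-attribute registration itself.

```scala
import java.sql.Timestamp

// Hypothetical stand-in for the Avro-generated UnifiedEvent,
// reduced to the three fields used in this thread.
final case class Event(ndKey: String, conceptRank: Int, eventTimestamp: Long)

object HourlyRankSketch {
  private val SecondsPerHour = 3600L

  // Assumption: event_timestamp is epoch seconds, so it must be scaled
  // to milliseconds before it can be used as a java.sql.Timestamp.
  def toSqlTimestamp(epochSeconds: Long): Timestamp =
    new Timestamp(epochSeconds * 1000L)

  // Start (in epoch seconds) of the one-hour tumbling window
  // that contains the given instant.
  def hourStart(epochSeconds: Long): Long =
    epochSeconds - Math.floorMod(epochSeconds, SecondsPerHour)

  // Sum concept_rank per (nd_key, window start).
  def sumPerKeyAndHour(events: Seq[Event]): Map[(String, Long), Int] =
    events
      .groupBy(e => (e.ndKey, hourStart(e.eventTimestamp)))
      .map { case (key, group) => key -> group.map(_.conceptRank).sum }

  def main(args: Array[String]): Unit = {
    // Same values as the test data in this message.
    val testData = Seq(
      Event("nd101", 10, 1512172415L),
      Event("nd101", 15, 1512172415L),
      Event("nd102", 20, 1512172415L),
      Event("nd102", 25, 1512172415L)
    )
    sumPerKeyAndHour(testData).toSeq.sortBy(_._1).foreach {
      case ((ndKey, windowStart), total) =>
        // Timestamp.toString renders in the JVM's default time zone.
        println(s"$ndKey ${toSqlTimestamp(windowStart)} $total")
    }
  }
}
```

For the four sample rows, all events fall into the window starting at epoch second 1512169200, with a total of 25 for nd101 and 45 for nd102.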
val testData = List(
  UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(10).setEventTimestamp(1512172415.longValue()).build(),
  UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(15).setEventTimestamp(1512172415.longValue()).build(),
  UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(20).setEventTimestamp(1512172415.longValue()).build(),
  UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(25).setEventTimestamp(1512172415.longValue()).build()
)
val kinesisStream = env.fromCollection(testData)
tableEnv.registerDataStream(streamName, avroStream);
val query = "SELECT nd_key, sum(concept_rank) FROM "+streamName + " GROUP BY nd_key"

Thanks,
Tao

On Mon, Dec 4, 2017 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> yes, Apache Calcite's group window functions are supported.
>
> The error message tells you that the attribute event_timestamp should be
> of type DATETIME (or TIMESTAMP) and not BIGINT.
> Please check the documentation for details [1].
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>
> 2017-12-04 22:17 GMT+01:00 Tao Xia <t...@udacity.com>:
>
>> Hi All,
>> Do you know if window function supported on SQL yet?
>> I got the error message when trying to use group function in SQL.
>>
>> My query below:
>>
>> val query = "SELECT nd_key, concept_rank, event_timestamp FROM "+streamName
>> + " GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key"
>>
>>
>> Error Message:
>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>> SQL validation failed. From line 1, column 74 to line 1, column 115: Cannot
>> apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>   at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
>>   at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
>>   at com.udacity.data.pipeline.AggregationJob$.main(AggregationJob.scala:43)
>>   at com.udacity.data.pipeline.AggregationJob.main(AggregationJob.scala)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
>> 1, column 74 to line 1, column 115: Cannot apply 'TUMBLE' to arguments of
>> type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'. Supported form(s):
>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>   at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
>>   at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:284)
>>   at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:92)
>>   at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:109)
>>   at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:243)
>>   at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:659)
>>   at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
>>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:287)
>>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:223)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5374)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5361)
>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:138)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1595)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1580)
>>   at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:225)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3824)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3210)
>>   at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>   at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926)
>>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611)
>>   at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
>>   ... 3 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot
>> apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>   at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>   at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>>   ... 30 more
>>
>
>