Hi Simon, This is a temporary workaround for 1.9 release. We will fix the behavior in 1.10, see FLINK-13461.
Regards, Jark On Tue, 13 Aug 2019 at 13:57, Simon Su <barley...@163.com> wrote: > Hi Jark > > Thanks for your reply. > > It’s weird that In this case the tableEnv provide the api called > “registerCatalog”, but it does not work in some cases ( like my cases ). > Do you think it’s feasible to unify this behaviors ? I think the document > is necessary, but a unify way to use tableEnv is also important. > > Thanks, > SImon > > On 08/13/2019 12:27,Jark Wu<imj...@gmail.com> <imj...@gmail.com> wrote: > > I think we might need to improve the javadoc of > tableEnv.registerTableSource/registerTableSink. > Currently, the comment says > > "Registers an external TableSink with already configured field names and > field types in this TableEnvironment's catalog." > > But, what catalog? The current one or default in-memory one? > I think, it would be better to improve the description and add a NOTE on > it. > > Regards, > Jark > > On Tue, 13 Aug 2019 at 10:52, Xuefu Z <usxu...@gmail.com> wrote: > >> Yes, tableEnv.registerTable(_) etc always registers in the default >> catalog. >> To create table in your custom catalog, you could use >> tableEnv.sqlUpdate("create table ...."). >> >> Thanks, >> Xuefu >> >> On Mon, Aug 12, 2019 at 6:17 PM Simon Su <barley...@163.com> wrote: >> >> > Hi Xuefu >> > >> > Thanks for you reply. >> > >> > Actually I have tried it as your advises. I have tried to call >> > tableEnv.useCatalog and useDatabase. Also I have tried to use >> > “catalogname.databasename.tableName” in SQL. I think the root cause is >> > that when I call tableEnv.registerTableSource, it’s always use a >> “build-in” >> > Catalog and Database rather than the custom one. So if I want to use a >> > custom one, I have to write code like this: >> > >> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, >> > EnvironmentSettings.newInstance() >> > .useBlinkPlanner() >> > .inStreamingMode() >> > .withBuiltInCatalogName("ca1") >> > .withBuiltInDatabaseName("db1") >> > .build()); >> > >> > >> > As Dawid said, if I want to store in my custom catalog, I can call >> > catalog.createTable or using DDL. >> > >> > Thanks, >> > SImon >> > >> > On 08/13/2019 02:55,Xuefu Z<usxu...@gmail.com> <usxu...@gmail.com> >> wrote: >> > >> > Hi Simon, >> > >> > Thanks for reporting the problem. There is some rough edges around >> catalog >> > API and table environments, and we are improving post 1.9 release. >> > >> > Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in >> > Flink's CatalogManager, It doens't change the default catalog/database >> as >> > you expected. To switch to your newly registered catalog, you could call >> > tableEnv.useCatalog() and .useDatabase(). >> > >> > As an alternative, you could fully qualify your table name with a >> > "catalog.db.table" syntax without switching current catalog/database. >> > >> > Please try those and let me know if you find new problems. >> > >> > Thanks, >> > Xuefu >> > >> > >> > >> > On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley...@163.com> wrote: >> > >> >> Hi All >> >> I want to use a custom catalog by setting the name “ca1” and >> create a >> >> database under this catalog. When I submit the >> >> SQL, and it raises the error like : >> >> >> >> >> >> Exception in thread "main" >> >> org.apache.flink.table.api.ValidationException: SQL validation failed. >> From >> >> line 1, column 98 to line 1, column 116: Object 'orderstream' not found >> >> within 'ca1.db1' >> >> at >> >> >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125) >> >> at >> >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82) >> >> at >> >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154) >> >> at >> >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130) >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335) >> >> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126) >> >> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137) >> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From >> line >> >> 1, column 98 to line 1, column 116: Object 'orderstream' not found >> within >> >> 'ca1.db1' >> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> >> at >> >> >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> >> at >> >> >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> >> at >> >> >> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) >> >> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) >> >> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805) >> >> at >> >> >> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166) >> >> at >> >> >> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) >> >> at >> >> >> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363) >> >> at >> >> >> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) >> >> at >> >> >> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955) >> >> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930) >> >> at >> >> >> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637) >> >> at >> >> >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122) >> >> ... 7 more >> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: >> Object >> >> 'orderstream' not found within 'ca1.db1' >> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> >> at >> >> >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> >> at >> >> >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> >> at >> >> >> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) >> >> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) >> >> ... 26 more >> >> >> >> It seems that Calcite cannot find the source object as expected, After >> I >> >> debug the code I found that when using tableEnv.registerTableSource or >> >> registerTableSink, It will use a build-in catalog with a hard-code >> catalog >> >> name ( default-catalog ) and database name ( default_database ) while >> >> tableEnv.registerCatalog here cannot change this behaviros, So is this >> a >> >> reasonable behaviors ? If I don’t want to use default build-in catalog >> and >> >> database, is there any other ways to do this ? >> >> >> >> >> >> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1"); >> >> tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to >> >> change build-in catalog !! >> >> tableEnv.useCatalog(catalog.getName()); >> >> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), >> >> "comment"), true); >> >> tableEnv.useDatabase("db1"); >> >> >> >> tableEnv.connect(sourceKafka) >> >> .withFormat(csv) >> >> .withSchema(schema2) >> >> .inAppendMode() >> >> .registerTableSource("orderstream"); >> >> >> >> tableEnv.connect(sinkKafka) >> >> .withFormat(csv) >> >> .withSchema(schema2) >> >> .inAppendMode() >> >> .registerTableSink("sinkstream");; >> >> >> >> String sql = "insert into ca1.db1.sinkstream " + >> >> "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from >> >> ca1.db1.orderstream " + >> >> "group by tumble(ts, INTERVAL '5' SECOND), data"; >> >> >> >> tableEnv.sqlUpdate(sql); >> >> tableEnv.execute("test"); >> >> >> >> >> >> Thanks, >> >> SImon >> >> >> >> >> > >> > -- >> > Xuefu Zhang >> > >> > "In Honey We Trust!" >> > >> > >> >> -- >> Xuefu Zhang >> >> "In Honey We Trust!" >> >