[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537025#comment-16537025 ]
ASF GitHub Bot commented on FLINK-8558: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201026527 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { + val props = properties() + assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { + val props = properties() + props.put("format.property-version", "2") + // the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { + val props = properties() + props.remove("format.important") + props.put("format.special_path", "/what/ever") + assertTrue( + TableFormatFactoryService + .find(classOf[TableFormatFactory[_]], props) + .isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousClassBasedSelection(): Unit = { + val props = properties() + props.remove("format.important") + assertTrue( + TableFormatFactoryService + .find(classOf[TestAmbiguousTableFormatFactory], props) + .isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousSchemaBasedSelection(): Unit = { + val props = properties() + props.remove("format.important") + props.put("schema.weird_field", "unknown") // this is unknown to the schema derivation factory + assertTrue( + TableFormatFactoryService + .find(classOf[TableFormatFactory[_]], props) + .isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testMissingClass(): Unit = { + val props = properties() + // this class is not a valid factory + TableFormatFactoryService.find(classOf[TableFormatFactoryServiceTest], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testInvalidContext(): Unit = { + val props = properties() + props.put("format.type", "FAIL") // no context specifies this + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) 
+ def testUnsupportedProperty(): Unit = { + val props = properties() + props.put("format.path_new", "/new/path") // no factory has this --- End diff -- `s/path_new/property_not_defined_by_any_factory/g` and remove comment > Add unified format interfaces and format discovery > -------------------------------------------------- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors > Reporter: Timo Walther > Assignee: Timo Walther > Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)