My DDL is:

CREATE TABLE csv (
       id BIGINT,
       name STRING
) WITH (
       'connector' = 'filesystem',
       'path' = '.....',
       'format' = 'csv'
);
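
In case it helps with the Java reproducer quoted below, here is a minimal
sketch of how the same kind of DDL string could be built with the new-style
keys. The class and method names (CsvDdlSketch, createCsvTableDdl) are made up
for illustration, and the 'csv.field-delimiter' and 'csv.quote-character' keys
are my reading of the 1.12 CSV format options, so please check them against
the docs for your version. It only builds the string; handing it to
tableEnv.executeSql(...) is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: builds a CREATE TABLE statement
// that uses the new-style option keys instead of the legacy ones.
final class CsvDdlSketch {

    static String createCsvTableDdl(String tableName, String filePath) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");    // legacy: 'connector.type'
        options.put("path", filePath);              // legacy: 'connector.path'
        options.put("format", "csv");               // legacy: 'format.type'
        options.put("csv.field-delimiter", ",");    // legacy: 'format.field-delimiter'
        options.put("csv.quote-character", "\"");   // legacy: 'format.quote-character'

        // Note: values are inserted as-is; a single quote inside a value
        // would need escaping, which this sketch does not attempt.
        String withClause = options.entrySet().stream()
            .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
            .collect(Collectors.joining(",\n"));

        return "CREATE TABLE " + tableName + " (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + withClause + "\n"
            + ")";
    }

    public static void main(String[] args) {
        // Prints the generated DDL for the sink table used in the reproducer.
        System.out.println(createCsvTableDdl("testTableOut", "/tmp/test-out.csv"));
    }
}
```

The legacy 'connector.property-version' and 'format.property-version' entries
are simply dropped here, since the DDL above works without them.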

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:

> Hi Flavio,
>
> We recommend using the new table source & sink interfaces, which use
> different property keys from the old ones, e.g. 'connector' vs.
> 'connector.type'.
>
> You can follow the 1.12 docs [1] to define your CSV table; everything
> should work just fine.
>
> Flink SQL> set table.dml-sync=true;
> [INFO] Session property has been set.
>
> Flink SQL> select * from csv;
> +----------------------+----------------------+
> |                   id |                 name |
> +----------------------+----------------------+
> |                    3 |                    c |
> +----------------------+----------------------+
> Received a total of 1 row
>
> Flink SQL> insert overwrite csv values(4, 'd');
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] Execute statement in sync mode. Please wait for the execution finish...
> [INFO] Complete execution of the SQL update statement.
>
> Flink SQL> select * from csv;
> +----------------------+----------------------+
> |                   id |                 name |
> +----------------------+----------------------+
> |                    4 |                    d |
> +----------------------+----------------------+
> Received a total of 1 row
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi Till,
>> since I was using the same WITH clause for both reading and writing, I
>> discovered that overwrite is actually supported in the sinks, while in the
>> sources an exception is thrown (I had assumed those properties were
>> simply ignored).
>> However, the quote character is not supported in the sinks: is this a bug
>> or is it the intended behaviour?
>> Here is a minimal example that reproduces the problem (put something like
>> '1,hello' or '2,hi' in /tmp/test.csv).
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class FlinkCsvTest {
>>   public static void main(String[] args) throws Exception {
>>     final EnvironmentSettings envSettings =
>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>     final String tableInName = "testTableIn";
>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>
>>     final String tableOutName = "testTableOut";
>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>     tableEnv.executeSql(createInTableDdl);
>>     tableEnv.executeSql(createOutTableDdl);
>>
>>     Table tableIn = tableEnv.from(tableInName);
>>     Table tableOut = tableEnv.from(tableOutName);
>>     tableIn.insertInto(tableOutName);
>>     // tableEnv.toDataSet(table, Row.class).print();
>>     tableEnv.execute("TEST read/write");
>>
>>   }
>>
>>   private static String getSourceDdl(String tableName, String filePath) {
>>     return "CREATE TABLE " + tableName + " (\n" + //
>>         " `id` BIGINT,\n" + //
>>         " `name` STRING) WITH (\n" + //
>>         " 'connector.type' = 'filesystem',\n" + //
>>         " 'connector.property-version' = '1',\n" + //
>>         " 'connector.path' = '" + filePath + "',\n" + //
>>         " 'format.type' = 'csv',\n" + //
>>         " 'format.field-delimiter' = ',',\n" + //
>>  //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>         " 'format.property-version' = '1',\n" + //
>>         " 'format.quote-character' = '\"',\n" + //
>>         " 'format.ignore-first-line' = 'false'" + //
>>         ")";
>>   }
>>
>>   private static String getSinkDdl(String tableName, String filePath) {
>>     return "CREATE TABLE " + tableName + " (\n" + //
>>         " `id` BIGINT,\n" + //
>>         " `name` STRING) WITH (\n" + //
>>         " 'connector.type' = 'filesystem',\n" + //
>>         " 'connector.property-version' = '1',\n" + //
>>         " 'connector.path' = '" + filePath + "',\n" + //
>>         " 'format.type' = 'csv',\n" + //
>>         " 'format.field-delimiter' = ',',\n" + //
>>         " 'format.num-files' = '1',\n" + //
>>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
>>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>         " 'format.property-version' = '1'\n" + //
>>         ")";
>>   }
>> }
>>
>> Thanks for the support,
>> Flavio
>>
>>
>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I tried to execute the code snippet you have provided and I could not
>>> reproduce the problem.
>>>
>>> Concretely I am running this code:
>>>
>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>     .useBlinkPlanner()
>>>     .inStreamingMode()
>>>     .build();
>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>
>>> tableEnv.fromValues("foobar").execute().await();
>>>
>>> Am I missing something? Maybe you can share a minimal but fully working
>>> example where the problem occurs. Thanks a lot.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Any help here? Moreover, if I use the DataStream API there's no
>>>> left/right outer join yet. Are those meant to be added in Flink 1.13 or
>>>> 1.14?
>>>>
>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>>>> error:
>>>>>
>>>>> The matching candidates:
>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>> Unsupported property keys:
>>>>> format.quote-character
>>>>>
>>>>> I create the table env using this:
>>>>>
>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
>>>>>         .useBlinkPlanner()//
>>>>>         // .inBatchMode()//
>>>>>         .inStreamingMode()//
>>>>>         .build();
>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>
>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>> Is this really not supported or am I using the wrong API?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
