Hi Till,
since I was using the same WITH clause for both reading and writing, I
discovered that overwrite is actually supported in the sinks, while the
sources throw an exception (I had assumed those properties were simply
ignored).
However, the quote-character is not supported in the sinks: is this a bug or
is it the intended behaviour?
Here is a minimal example that reproduces the problem (put something like
'1,hello' or '2,hi' in /tmp/test.csv).

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    final Table tableIn = tableEnv.from(tableInName);
    // executeInsert submits the job and replaces the deprecated
    // Table#insertInto + TableEnvironment#execute pair
    tableIn.executeInsert(tableOutName).await();
  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n"
        + "  `id` BIGINT,\n"
        + "  `name` STRING) WITH (\n"
        + "  'connector.type' = 'filesystem',\n"
        + "  'connector.property-version' = '1',\n"
        + "  'connector.path' = '" + filePath + "',\n"
        + "  'format.type' = 'csv',\n"
        + "  'format.field-delimiter' = ',',\n"
        // + "  'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED (throws)
        + "  'format.property-version' = '1',\n"
        + "  'format.quote-character' = '\"',\n"
        + "  'format.ignore-first-line' = 'false'\n"
        + ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n"
        + "  `id` BIGINT,\n"
        + "  `name` STRING) WITH (\n"
        + "  'connector.type' = 'filesystem',\n"
        + "  'connector.property-version' = '1',\n"
        + "  'connector.path' = '" + filePath + "',\n"
        + "  'format.type' = 'csv',\n"
        + "  'format.field-delimiter' = ',',\n"
        + "  'format.num-files' = '1',\n"
        + "  'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
        + "  'format.quote-character' = '\"',\n" // NOT SUPPORTED in sinks
        + "  'format.property-version' = '1'\n"
        + ")";
  }
}
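As a side note, in case it helps anyone hitting the same "Unsupported property keys" error: with the newer connector option style introduced by FLIP-122 ('connector' = 'filesystem' instead of 'connector.type', no property-version keys), the CSV format documents a 'csv.quote-character' option. Below is a minimal sketch of how the sink DDL could look under that assumption — the class and method names are made up for illustration, and I have not re-run this against 1.13:

```java
// Sketch only: builds a filesystem/CSV sink DDL using the FLIP-122 option
// style, where the quote character is set via 'csv.quote-character'.
// Class name, method name, and paths are illustrative placeholders.
public class CsvDdlSketch {

  public static String sinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n"
        + "  `id` BIGINT,\n"
        + "  `name` STRING\n"
        + ") WITH (\n"
        + "  'connector' = 'filesystem',\n"
        + "  'path' = '" + filePath + "',\n"
        + "  'format' = 'csv',\n"
        + "  'csv.field-delimiter' = ',',\n"
        + "  'csv.quote-character' = '\"'\n"
        + ")";
  }

  public static void main(String[] args) {
    // Print the generated DDL so it can be pasted into executeSql(...)
    System.out.println(sinkDdl("testTableOut", "/tmp/test-out.csv"));
  }
}
```

Whether the legacy 'format.quote-character' key in the sinks is meant to reject the option or this is an oversight is exactly my question above.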

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Flavio,
>
> I tried to execute the code snippet you have provided and I could not
> reproduce the problem.
>
> Concretely I am running this code:
>
> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>     .useBlinkPlanner()
>     .inStreamingMode()
>     .build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>
> tableEnv.fromValues("foobar").execute().await();
>
> Am I missing something? Maybe you can share a minimal but fully working
> example where the problem occurs. Thanks a lot.
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Any help here? Moreover, if I use the DataStream API there's no
>> left/right outer join yet... are those meant to be added in Flink 1.13 or
>> 1.14?
>>
>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi to all,
>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>> error:
>>>
>>> The matching candidates:
>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>> Unsupported property keys:
>>> format.quote-character
>>>
>>> I create the table env using this:
>>>
>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>     .useBlinkPlanner()
>>>     // .inBatchMode()
>>>     .inStreamingMode()
>>>     .build();
>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>
>>> The error is the same both with inBatchMode and inStreamingMode.
>>> Is this really not supported or am I using the wrong API?
>>>
>>> Best,
>>> Flavio
>>>
>>
