Hi,

In addition to what Rong said:

- The types look OK.
- You can also use Types.STRING and Types.LONG instead of BasicTypeInfo.xxx.
- Beware that in the failure case you might end up with multiple entries in
the database table. Some databases support an upsert syntax which (together
with key or uniqueness constraints) can ensure that each result is added
just once, even if the query recovers from a failure. See the sketch below.
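
For example, on MySQL (Derby, which your job uses, does not support this
syntax; there you would need a MERGE statement instead), an upsert with a
UNIQUE constraint over (window_start, window_end, username) could look like
the untested sketch below. Table and column names are taken from your
program; the driver, URL, and credentials are placeholders:

    JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
            .setDrivername("com.mysql.jdbc.Driver")       // assuming a MySQL sink
            .setDBUrl("jdbc:mysql://localhost:3306/test") // placeholder URL
            .setUsername("user")
            .setPassword("secret")
            .setBatchSize(1)
            .setQuery(
                "INSERT INTO pageclicks "
                + "(window_start, window_end, username, viewcount) "
                + "VALUES (?, ?, ?, ?) "
                // MySQL upsert; relies on a UNIQUE key over
                // (window_start, window_end, username)
                + "ON DUPLICATE KEY UPDATE viewcount = VALUES(viewcount)")
            .setParameterTypes(
                Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
                Types.STRING, Types.LONG)
            .build();

With this in place, a replayed window result overwrites the previous row
instead of adding a duplicate.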

Best, Fabian

2018-07-01 17:25 GMT+02:00 Rong Rong <walter...@gmail.com>:

> Hi Chris,
>
> Looking at the code, it seems like JDBCTypeUtil [1] is used for converting
> Flink TypeInformation into JDBC types (java.sql.Types), and SQL_TIMESTAMP
> and SQL_TIME are both listed in the conversion mapping. However, the two
> map to different JDBC types.
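>
> To illustrate the difference (paraphrasing the mapping in [1], so please
> double-check against the source):
>
>     // Flink TypeInformation  ->  java.sql.Types code used by the sink
>     // Types.SQL_TIMESTAMP    ->  java.sql.Types.TIMESTAMP
>     // Types.SQL_TIME         ->  java.sql.Types.TIME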
>
> Regarding the question of whether your insert is correctly configured: it
> directly relates to how your DB executes the JDBC insert command.
> 1. Regarding type settings: looking at the JDBCOutputFormat [2], it seems
> you can execute your command even when no type array is given or a type
> mapping cannot be found; in that case the PreparedStatement is populated
> with plain Object values. I tried it on MySQL and it actually works pretty
> well.
> 2. Another question is whether your underlying DB can handle an "implicit
> type cast": for example, inserting an INTEGER value into a BIGINT column.
> AFAIK JDBCAppendTableSink does not check type compatibility before
> writeRecord, so it might be a good idea to include a sanity check
> beforehand, as sketched below.
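>
> A minimal sketch of such a check (this helper is hypothetical, not part of
> the Flink API; the expected java.sql.Types codes are maintained by hand):
>
>     import java.util.HashMap;
>     import java.util.Map;
>     import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>     import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>     import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>
>     // Fail fast if a field's Flink type does not match the java.sql.Types
>     // code we expect for the target column, instead of failing later in
>     // writeRecord with a driver-specific error.
>     static void checkColumnTypes(TupleTypeInfo<?> rowType, int[] expectedSqlTypes) {
>         Map<TypeInformation<?>, Integer> toSqlType = new HashMap<>();
>         toSqlType.put(BasicTypeInfo.STRING_TYPE_INFO, java.sql.Types.VARCHAR);
>         toSqlType.put(BasicTypeInfo.LONG_TYPE_INFO, java.sql.Types.BIGINT);
>         toSqlType.put(SqlTimeTypeInfo.TIMESTAMP, java.sql.Types.TIMESTAMP);
>         for (int i = 0; i < rowType.getArity(); i++) {
>             Integer sqlType = toSqlType.get(rowType.getTypeAt(i));
>             if (sqlType == null || sqlType != expectedSqlTypes[i]) {
>                 throw new IllegalArgumentException("Field " + i + " ("
>                         + rowType.getTypeAt(i) + ") does not match expected"
>                         + " java.sql.Types code " + expectedSqlTypes[i]);
>             }
>         }
>     }
>
> Calling this once at setup time with your tupleTypeInfo and the column
> types of the target table catches mismatches before any record is written.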
>
> Thanks,
> Rong
>
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java#L109
>
> On Sun, Jul 1, 2018 at 5:22 AM chrisr123 <chris.rueg...@gmail.com> wrote:
>
>>
>> Full Source except for mapper and timestamp assigner.
>>
>> Sample Input Stream record:
>> 1530447316589,Mary,./home
>>
>>
>> What are the correct parameters to pass for data types in the
>> JDBCAppendTableSink?
>> Am I doing this correctly?
>>
>>
>>     // Get Execution Environment
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);
>>
>>     // Get and Set execution parameters.
>>     ParameterTool parms = ParameterTool.fromArgs(args);
>>     env.getConfig().setGlobalJobParameters(parms);
>>
>>     // Configure Checkpoint and Restart
>>     // configureCheckpoint(env);
>>     // configureRestart(env);
>>
>>     // Get Our Data Stream
>>     DataStream<Tuple3<Long,String,String>> eventStream = env
>>             .socketTextStream(parms.get("host"), parms.getInt("port"))
>>             .map(new TableStreamMapper())
>>             .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());
>>
>>     // Register Table
>>     // Dynamic Table From Stream
>>     tableEnvironment.registerDataStream("pageViews", eventStream,
>>             "pageViewTime.rowtime, username, url");
>>
>>     // Continuous Query
>>     String continuousQuery =
>>             "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " +
>>             "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>>             "username, COUNT(url) as viewcount FROM pageViews " +
>>             "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>>
>>     // Dynamic Table from Continuous Query
>>     Table windowedTable = tableEnvironment.sqlQuery(continuousQuery);
>>     windowedTable.printSchema();
>>
>>     // Convert Results to DataStream
>>     Table resultTable = windowedTable
>>             .select("wstart, wend, username, viewcount");
>>
>>     TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo =
>>             new TupleTypeInfo<>(
>>                     Types.SQL_TIMESTAMP,
>>                     Types.SQL_TIMESTAMP,
>>                     Types.STRING,
>>                     Types.LONG);
>>     DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream =
>>             tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
>>     resultDataStream.print();
>>
>>     // Write Result Table to Sink
>>     // Configure Sink
>>     JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
>>             .setDrivername("org.apache.derby.jdbc.ClientDriver")
>>             .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
>>             .setUsername("chris")
>>             .setPassword("xxxx")
>>             .setBatchSize(1)
>>             .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
>>             .setParameterTypes(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
>>                     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>>             .build();
>>
>>     // Write Result Table to Sink
>>     resultTable.writeToSink(pageViewSink);
>>     System.out.println("WRITE TO SINK");
>>
>>     // Execute
>>     env.execute("PageViewsTumble");
>> }
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
