Full Source except for mapper and timestamp assigner.

Sample Input Stream record:
1530447316589,Mary,./home


What are the correct parameters to pass for data types in the
JDBCAppendTableSink?
Am I doing this correctly? 


                // Get Execution Environment
                StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                StreamTableEnvironment tableEnvironment =
TableEnvironment.getTableEnvironment(env); 
                
                // Get and Set execution parameters.
                ParameterTool parms = ParameterTool.fromArgs(args);
                env.getConfig().setGlobalJobParameters(parms);
                
                // Configure Checkpoint and Restart
                // configureCheckpoint(env);
                // configureRestart(env);
                
                // Get Our Data Stream
                DataStream<Tuple3&lt;Long,String,String>> eventStream = env
                                .socketTextStream(parms.get("host"), 
parms.getInt("port"))
                                .map(new TableStreamMapper())
                                .assignTimestampsAndWatermarks(new 
MyEventTimestampAssigner());
                

                // Register Table
                // Dynamic Table From Stream
                tableEnvironment.registerDataStream("pageViews", eventStream,
"pageViewTime.rowtime, username, url");
                
            // Continuous Query
                String continuousQuery =
                                "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' 
MINUTE) as wstart, " +
                                "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) 
as wend, " +
                                "username, COUNT(url) as viewcount FROM 
pageViews " +
                                "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' 
MINUTE), username";
                
                // Dynamic Table from Continuous Query
                Table windowedTable = 
tableEnvironment.sqlQuery(continuousQuery);
                windowedTable.printSchema();    
                
                // Convert Results to DataStream
                Table resultTable = windowedTable
                        .select("wstart, wend, username,viewcount");
                
        
                TupleTypeInfo<Tuple4&lt;Timestamp,Timestamp,String,Long>> 
tupleTypeInfo =
new TupleTypeInfo<>(
                                Types.SQL_TIMESTAMP,
                                Types.SQL_TIMESTAMP,
                                Types.STRING,
                                Types.LONG);
                DataStream<Tuple4&lt;Timestamp,Timestamp,String,Long>> 
resultDataStream =
                tableEnvironment.toAppendStream(resultTable,tupleTypeInfo);
                resultDataStream.print();                       
                
                
                // Write Result Table to Sink
                // Configure Sink
                JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder()
                        .setDrivername("org.apache.derby.jdbc.ClientDriver")
                        .setDBUrl("jdbc:derby://captain:1527/rueggerllc")
                        .setUsername("chris")
                        .setPassword("xxxx")
                        .setBatchSize(1)
                        .setQuery("INSERT INTO chris.pageclicks
(window_start,window_end,username,viewcount) VALUES (?,?,?,?)")
                
.setParameterTypes(Types.SQL_TIMESTAMP,Types.SQL_TIMESTAMP,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO)
                        .build();
                
                
                // Write Result Table to Sink
                resultTable.writeToSink(pageViewSink);
                System.out.println("WRITE TO SINK");            
                

                // Execute
                env.execute("PageViewsTumble");
        }



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to