Full Source except for mapper and timestamp assigner.
Sample Input Stream record: 1530447316589,Mary,./home What are the correct parameters to pass for data types in the JDBCAppendTableSink? Am I doing this correctly? // Get Execution Environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env); // Get and Set execution parameters. ParameterTool parms = ParameterTool.fromArgs(args); env.getConfig().setGlobalJobParameters(parms); // Configure Checkpoint and Restart // configureCheckpoint(env); // configureRestart(env); // Get Our Data Stream DataStream<Tuple3<Long,String,String>> eventStream = env .socketTextStream(parms.get("host"), parms.getInt("port")) .map(new TableStreamMapper()) .assignTimestampsAndWatermarks(new MyEventTimestampAssigner()); // Register Table // Dynamic Table From Stream tableEnvironment.registerDataStream("pageViews", eventStream, "pageViewTime.rowtime, username, url"); // Continuous Query String continuousQuery = "SELECT TUMBLE_START(pageViewTime, INTERVAL '1' MINUTE) as wstart, " + "TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " + "username, COUNT(url) as viewcount FROM pageViews " + "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username"; // Dynamic Table from Continuous Query Table windowedTable = tableEnvironment.sqlQuery(continuousQuery); windowedTable.printSchema(); // Convert Results to DataStream Table resultTable = windowedTable .select("wstart, wend, username,viewcount"); TupleTypeInfo<Tuple4<Timestamp,Timestamp,String,Long>> tupleTypeInfo = new TupleTypeInfo<>( Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.STRING, Types.LONG); DataStream<Tuple4<Timestamp,Timestamp,String,Long>> resultDataStream = tableEnvironment.toAppendStream(resultTable,tupleTypeInfo); resultDataStream.print(); // Write Result Table to Sink // Configure Sink JDBCAppendTableSink pageViewSink = JDBCAppendTableSink.builder() .setDrivername("org.apache.derby.jdbc.ClientDriver") .setDBUrl("jdbc:derby://captain:1527/rueggerllc") .setUsername("chris") .setPassword("xxxx") .setBatchSize(1) .setQuery("INSERT INTO chris.pageclicks (window_start,window_end,username,viewcount) VALUES (?,?,?,?)") .setParameterTypes(Types.SQL_TIMESTAMP,Types.SQL_TIMESTAMP,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO) .build(); // Write Result Table to Sink resultTable.writeToSink(pageViewSink); System.out.println("WRITE TO SINK"); // Execute env.execute("PageViewsTumble"); } -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/