Hello, I would like to run some queries and data transformations using SQL. For this, I am trying to use StreamEndpoint from the SQL API to run a simple query, as shown below:

    @Override
    public void populateDAG(DAG dag, Configuration conf) {
      SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();

      Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
          "RowTime", Date.class,
          "id", Integer.class,
          "Product", String.class,
          "units", Integer.class);

      LineByLineFileInputOperator input = dag.addOperator("input", new LineByLineFileInputOperator());

      env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(input.output, fieldMapping));
      env.executeSQL(dag, conf.get("sql"));
    }

The query in the properties file is

    SELECT STREAM * FROM table1

and "sqlSchemaInputName" is set to "table1".

However, when I run the program, I get this error: "java.lang.RuntimeException: Unexpected tuple received. Received class: class java.lang.String. Expected class: class java.lang.Class".

I have also tried replacing LineByLineFileInputOperator with custom classes whose output port is of type Object, and then I get this error: "java.lang.RuntimeException: Unexpected tuple received. Received class: class java.lang.Class. Expected class: class java.lang.Class".

I don't get it... The documentation says: "StreamEndpoint: This allows us to connect existing operator output or input ports to the SQL query as a data source or sink respectively." So it should work for different sources as long as I pass the output port as the StreamEndpoint argument, right?

I do not intend to use the Kafka or File endpoints; I need to use StreamEndpoint for my application. Any idea why this is not working?

Thanks in advance!