Dear all,
    I have installed a Flink standalone cluster with 3 nodes, and I want
to select some data from a database. For better performance, I split the
one SQL query into many smaller ones, and I want these queries to run
distributed across the 3 nodes. But actually all of them run on the same
node. Here is my code:
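    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.types.Row;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    // Split the id space into 100 ranges of 1000 ids, one query per range
    for (int i = 0; i < 100; i++) {
        String from = String.valueOf(i * 1000);
        String to = String.valueOf((i + 1) * 1000);
        // Half-open range [from, to) so boundary ids are not read twice
        String sql = "select * from table a where a.id >= " + from
                + " and a.id < " + to;
        DataSet<Row> source = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://****:5432/test")
                .setUsername("root")
                .setPassword("****")
                .setQuery(sql)
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.DATE_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.BIG_DEC_TYPE_INFO))
                .finish()).rebalance();
        // Sum per key, sort each partition by the sum, then take 10 records
        DataSet<Tuple2<String, Float>> top = source.flatMap(new MyFlot())
                .groupBy(0).sum(1)
                .sortPartition(1, Order.DESCENDING)
                .first(10);
        // In the DataSet API, print() triggers execution immediately
        top.print();
    }
    env.execute();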
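For reference, the range splitting done by the loop above can be checked on its own. This is a minimal plain-Java sketch with no Flink dependency; the class `IdRanges` and its methods are hypothetical names, and it assumes ids start at 0 and that half-open `[from, to)` ranges are wanted so that no id is read twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): splits the id interval [0, total)
// into half-open ranges of `step` ids each, one query per range.
class IdRanges {

    /** Returns {from, to} pairs, where each range covers from <= id < to. */
    static List<long[]> split(long total, long step) {
        List<long[]> ranges = new ArrayList<>();
        for (long from = 0; from < total; from += step) {
            // The last range is clipped so it never goes past `total`
            long to = Math.min(from + step, total);
            ranges.add(new long[] {from, to});
        }
        return ranges;
    }

    /** Builds one SQL string per range, using an exclusive upper bound. */
    static List<String> queries(long total, long step) {
        List<String> sqls = new ArrayList<>();
        for (long[] r : split(total, step)) {
            sqls.add("select * from table a where a.id >= " + r[0]
                    + " and a.id < " + r[1]);
        }
        return sqls;
    }

    public static void main(String[] args) {
        List<String> sqls = queries(100_000, 1_000);
        System.out.println(sqls.size());
        System.out.println(sqls.get(0));
    }
}
```

With `total = 100_000` and `step = 1_000` this yields 100 queries; because each upper bound is exclusive, adjacent ranges never share a boundary id.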

    So how can I run these queries distributed, so that they make the best
use of the Flink cluster? If the database read runs on only one node, it
fails with an OOM 😢. I hope someone can help me, thanks!
