*Code in Flink 1.9.3:*
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings mySetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
mySetting);

        env.setParallelism(1);

        DataStream<String> oriStream = env.fromElements("test", "union
all");

        Table testTable = tableEnv.fromDataStream(oriStream, "text");
        tableEnv.registerTable("test_table", testTable);

        Table regularJoin = tableEnv.sqlQuery(
                "SELECT\n" +
                        "    text AS text,\n" +
                        "    '中文' AS another_text\n" +
                        "FROM\n" +
                        "    test_table\n" +
                        "UNION ALL\n" +
                        "SELECT\n" +
                        "    'another_text' AS text,\n" +
                        "    '中文' AS another_text\n" +
                        "FROM\n" +
                        "    test_table");
        DataStream<Row> appendStream = tableEnv.toAppendStream(regularJoin,
Row.class);
        appendStream.print();

        env.execute("test-union-all");
    }
}

*Error Message:*
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at scala.StringContext.standardInterpolator(StringContext.scala:126)
        at scala.StringContext.s(StringContext.scala:95)
        at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableStringConstants(CodeGeneratorContext.scala:716)
        at org.apache.flink.table.planner.codegen.GenerateUtils$.generateLiteral(GenerateUtils.scala:357)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:392)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:51)
        at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1137)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:448)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:439)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:439)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
        at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
        at org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
        at org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:84)

*Code in Flink 1.11.2 with some new syntax:*
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class StreamingJob {

        public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings mySetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,
mySetting);

                env.setParallelism(1);

                DataStream<String> oriStream = env.fromElements("test", "union 
all");

                Table testTable = tableEnv.fromDataStream(oriStream, $("text"));
                tableEnv.createTemporaryView("test_table", testTable);

                Table regularJoin = tableEnv.sqlQuery(
                                "SELECT\n" +
                                                "    text AS text,\n" +
                                                "    '中文' AS another_text\n" +
                                                "FROM\n" +
                                                "    test_table\n" +
                                                "UNION ALL\n" +
                                                "SELECT\n" +
                                                "    'another_text' AS text,\n" 
+
                                                "    '中文' AS another_text\n" +
                                                "FROM\n" +
                                                "    test_table");
                DataStream<Row> appendStream = 
tableEnv.toAppendStream(regularJoin,
Row.class);
                appendStream.print();

                env.execute("test-union-all");
        }
}



-----
Best wishes.
Smile
--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Reply via email to