*Code in Flink 1.9.3:*
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class StreamingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings mySetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
mySetting);
env.setParallelism(1);
DataStream<String> oriStream = env.fromElements("test", "union
all");
Table testTable = tableEnv.fromDataStream(oriStream, "text");
tableEnv.registerTable("test_table", testTable);
Table regularJoin = tableEnv.sqlQuery(
"SELECT\n" +
" text AS text,\n" +
" '中文' AS another_text\n" +
"FROM\n" +
" test_table\n" +
"UNION ALL\n" +
"SELECT\n" +
" 'another_text' AS text,\n" +
" '中文' AS another_text\n" +
"FROM\n" +
" test_table");
DataStream<Row> appendStream = tableEnv.toAppendStream(regularJoin,
Row.class);
appendStream.print();
env.execute("test-union-all");
}
}
*Error Message:*
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at scala.StringContext.standardInterpolator(StringContext.scala:126)
at scala.StringContext.s(StringContext.scala:95)
at
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableStringConstants(CodeGeneratorContext.scala:716)
at
org.apache.flink.table.planner.codegen.GenerateUtils$.generateLiteral(GenerateUtils.scala:357)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:392)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:51)
at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1137)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:448)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:439)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:439)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
at
org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
at
org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:84)
*The same code ported to Flink 1.11.2, using the updated Table API (the `bridge` package, `createTemporaryView`, and the `$("...")` expression syntax):*
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class StreamingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings mySetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env,
mySetting);
env.setParallelism(1);
DataStream<String> oriStream = env.fromElements("test", "union
all");
Table testTable = tableEnv.fromDataStream(oriStream, $("text"));
tableEnv.createTemporaryView("test_table", testTable);
Table regularJoin = tableEnv.sqlQuery(
"SELECT\n" +
" text AS text,\n" +
" '中文' AS another_text\n" +
"FROM\n" +
" test_table\n" +
"UNION ALL\n" +
"SELECT\n" +
" 'another_text' AS text,\n"
+
" '中文' AS another_text\n" +
"FROM\n" +
" test_table");
DataStream<Row> appendStream =
tableEnv.toAppendStream(regularJoin,
Row.class);
appendStream.print();
env.execute("test-union-all");
}
}
-----
Best wishes.
Smile
--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/