zhangjun0x01 commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r561461670
##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should be streaming mode", isStreamingJob);
Review comment:
I think these are two different concepts.
In Flink, we can read batch data whether we use batch mode or streaming mode: Flink treats a batch job as a bounded streaming job, so there should be no problem reading batch data in either mode. In addition, Flink is unifying batch and stream processing on `StreamExecutionEnvironment` (DataStream) ([the doc link](https://flink.apache.org/news/2020/12/10/release-1.12.0.html#batch-execution-mode-in-the-datastream-api)). The dedicated batch mode may be phased out over time, so I think we should also use `StreamExecutionEnvironment` (DataStream) for batch tasks as much as possible.
When we use `StreamExecutionEnvironment`, both the `if` and the `else` block in the `FlinkSource.Builder#build` method produce streaming jobs: the `if` block builds a bounded streaming job (batch), while the `else` block builds a long-running streaming job. Maybe we can rename the `ScanContext#isStreaming` field to `ScanContext#isStreamingRead`, which would be easier to understand.
```
if (!context.isStreaming()) {
  // Bounded streaming job (batch read): infer the source parallelism up front.
  int parallelism = inferParallelism(format, context);
  return env.createInput(format, typeInfo).setParallelism(parallelism);
} else {
  // Long-running streaming job: monitor the table and read new splits continuously.
  StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
  String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
  String readerOperatorName = String.format("Iceberg table (%s) reader", table);
  return env.addSource(function, monitorFunctionName)
      .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
}
```
##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should be streaming mode", isStreamingJob);
Review comment:
> In my mind, Providing unit tests to check whether the `inferParallelism()` is returning the expected parallelism value is enough for this changes. Seems like The ITCase is validating the behavior of DataStreamSource#setParallelism , we could think it's always correct because it's a basic API in flink.

I think it is better not to use the `inferParallelism` method for the assertion, because `inferParallelism` is private and is an internal method of Iceberg. Just as you [commented](https://github.com/apache/iceberg/pull/1936#discussion_r554983424) that it is best not to rely on Flink's internal code, I think we should try to use public APIs to get this information.
The current `TestFlinkTableSource` class uses batch mode for its unit tests. In order not to modify too much code, we can move the `testInferedParallelism` method to another test class, such as `TestFlinkScan.java`.
So I think we can use `DataStream.getTransformation().getParallelism()` to get the parallelism of the Flink operator, as in the sketch below. This is a public API of Flink, so it should not need to change even if Flink is upgraded in the future. What do you think?
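A minimal sketch of the assertion I have in mind, assuming a `StreamTableEnvironment` named `tEnv`; the query and the expected value are just illustrative:
```
Table table = tEnv.sqlQuery("SELECT * FROM t");
// toAppendStream converts the Table into a DataStream; getTransformation() is
// public Flink API, so the assertion does not touch any Iceberg internals.
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
Assert.assertEquals("Should infer the expected source parallelism",
    2, stream.getTransformation().getParallelism());
```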
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,6 +73,7 @@ public static Builder forRowData() {
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
+    private ReadableConfig flinkConf;
Review comment:
When we construct the `IcebergTableSource`, we use the `TableSourceFactory.Context#getConfiguration` method to get the configuration. That method returns a `ReadableConfig`, so we use `ReadableConfig` instead of `Configuration`. In addition, `Configuration` is the implementation class of the `ReadableConfig` interface, so I think `ReadableConfig` should not be a problem here.
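A small sketch of why the interface is sufficient (the option key here is illustrative, not necessarily the one this PR adds):
```
ConfigOption<Boolean> inferParallelism = ConfigOptions
    .key("table.exec.iceberg.infer-source-parallelism")  // illustrative key
    .booleanType()
    .defaultValue(true);

// Configuration implements ReadableConfig, so a Configuration instance can be
// passed wherever the builder expects the read-only interface.
ReadableConfig readable = new Configuration();
boolean enabled = readable.get(inferParallelism);
```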
##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should be streaming mode", isStreamingJob);
Review comment:
Yes, it should be a streaming source, like Kafka. If necessary, we can open a separate PR to discuss this.
##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should be streaming mode", isStreamingJob);
Review comment:
I updated the PR: I moved the `testInferedParallelism` method to `TestFlinkScanSql` and used the `FlinkSource.Builder#inferParallelism` method to do the assertion, roughly as sketched below.
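Roughly this shape, assuming a package-visible `inferParallelism(FlinkInputFormat, ScanContext)` on the builder; the exact signatures may differ from the final PR code:
```
// Build the input format via the public builder, then assert on the
// parallelism the builder would infer (signatures assumed, not verbatim).
FlinkSource.Builder builder = FlinkSource.forRowData()
    .tableLoader(tableLoader)
    .flinkConf(new Configuration());
FlinkInputFormat format = builder.buildFormat();
int parallelism = builder.inferParallelism(format, ScanContext.builder().build());
Assert.assertEquals("Should infer the expected parallelism", 1, parallelism);
```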
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]