zhangjun0x01 commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r561461670



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should  be streaming mode", isStreamingJob);

Review comment:
   I think these are two different concepts.
   In flink, we can read batch data whether we use batch mode or streaming mode. Flink treats batch jobs as bounded streaming jobs, so there should be no problem reading batch data in either mode. In addition, flink will use `StreamExecutionEnvironment` (`DataStream`) to run batch tasks and stream tasks uniformly ([the doc link](https://flink.apache.org/news/2020/12/10/release-1.12.0.html#batch-execution-mode-in-the-datastream-api)). The dedicated batch mode may eventually go away, so I think we should also use `StreamExecutionEnvironment` (`DataStream`) for batch tasks as much as possible.
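   For example, since Flink 1.12 a bounded pipeline can be executed with batch semantics directly on the DataStream API (a minimal sketch, not code from this PR):
   ```
   import org.apache.flink.api.common.RuntimeExecutionMode;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   // BATCH runs a bounded pipeline with batch semantics; STREAMING and
   // AUTOMATIC are the other options introduced by FLIP-140 in Flink 1.12
   env.setRuntimeMode(RuntimeExecutionMode.BATCH);
   ```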
   
   When we use `StreamExecutionEnvironment`, the `if` and `else` blocks in the `FlinkSource.Builder#build` method both produce streaming jobs: the `if` block is a bounded streaming job (batch), and the `else` block is a long-running streaming job. Maybe we can rename the `ScanContext#isStreaming` field to `ScanContext#isStreamingRead`, which would be easier to understand.
   ```
   if (!context.isStreaming()) {
     int parallelism = inferParallelism(format, context);
     return env.createInput(format, typeInfo).setParallelism(parallelism);
   } else {
     StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);

     String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
     String readerOperatorName = String.format("Iceberg table (%s) reader", table);

     return env.addSource(function, monitorFunctionName)
         .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
   }
   ```
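   With the rename, the two branches could be annotated like this (a sketch of the suggestion only; `isStreamingRead` does not exist yet):
   ```
   // proposed rename: ScanContext#isStreaming -> ScanContext#isStreamingRead
   if (!context.isStreamingRead()) {
     // bounded streaming read: the batch case, with inferred parallelism
   } else {
     // unbounded streaming read: long-running monitor + reader job
   }
   ```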

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should  be streaming mode", isStreamingJob);

Review comment:
   > In my mind, Providing unit tests to check whether the `inferParallelism()` is returning the expected parallelism value is enough for this changes. Seems like The ITCase is validating the behavior of DataStreamSource#setParallelism , we could think it's always correct because it's a basic API in flink.

   I think it's better not to use the `inferParallelism` method to get the parallelism for the assertion, because `inferParallelism` is private and is an internal method of iceberg. Just as you [commented](https://github.com/apache/iceberg/pull/1936#discussion_r554983424) that it is best not to use the internal code of flink, I think we should try to use public APIs to get information.

   The current `TestFlinkTableSource` class uses batch mode for its unit tests. In order not to modify too much code, we can move the `testInferedParallelism` method to another test class, such as `TestFlinkScan.java`.

   So I think we can use `DataStream.getTransformation().getParallelism()` to get the parallelism of the flink operator. This is a public API of flink, so even if flink is upgraded in the future, it should not need to be modified. What do you think?
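   For example, the assertion could look like this (a sketch only; the table name and `expectedParallelism` are placeholders):
   ```
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
   import org.apache.flink.types.Row;
   import org.junit.Assert;

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

   Table table = tEnv.sqlQuery("SELECT * FROM iceberg_table");
   DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);

   // read the planned parallelism through flink's public API, without
   // touching iceberg's private inferParallelism method
   int parallelism = stream.getTransformation().getParallelism();
   Assert.assertEquals(expectedParallelism, parallelism);
   ```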
   

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,6 +73,7 @@ public static Builder forRowData() {
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
+    private ReadableConfig flinkConf;

Review comment:
   When we construct the `IcebergTableSource`, we use the `TableSourceFactory.Context#getConfiguration` method to get the configuration. This method returns a `ReadableConfig`, so we use `ReadableConfig` instead of `Configuration`. In addition, `Configuration` is an implementation class of the `ReadableConfig` interface, so I think `ReadableConfig` should not be a problem here.
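   For reference, a `Configuration` can be handed to anything that accepts a `ReadableConfig` (a minimal sketch):
   ```
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.ReadableConfig;

   Configuration conf = new Configuration();
   // Configuration implements ReadableConfig, so the assignment is legal and
   // callers that only need read access can take the narrower interface
   ReadableConfig readable = conf;
   ```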

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should  be streaming mode", isStreamingJob);

Review comment:
   Yes, it should be a streaming source, like Kafka. If necessary, we can open a separate PR to discuss this.

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should  be streaming mode", isStreamingJob);

Review comment:
   I updated the PR: moved the `testInferedParallelism` method to `TestFlinkScanSql` and used the `FlinkSource.Builder#inferParallelism` method to do the assertion.



