dianfu commented on a change in pull request #14844:
URL: https://github.com/apache/flink/pull/14844#discussion_r570093195



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
##########
@@ -126,4 +124,23 @@ public void finishCurrentBatch() throws Exception {
         arrowStreamWriter.writeBatch();
         arrowWriter.reset();
     }
+
+    public void reinitializeReader(InputStream bais) throws IOException {
+        arrowReader = null;
+        arrowStreamReader.close();
+        initializeReader(bais);
+    }
+
+    public void reInitializeWriter(OutputStream baos) throws IOException {
+        initializeWriter(baos);
+    }
+
+    private void initializeReader(InputStream bais) {
+        arrowStreamReader = new ArrowStreamReader(bais, allocator);

Review comment:
       What about changing it as follows:
   ```
   arrowReader = null;
   if (arrowStreamReader != null) {
     arrowStreamReader.close();
   }
   arrowStreamReader = new ArrowStreamReader(bais, allocator);
   ```
   
   Then we can avoid the `reinitializeReader` method altogether.
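
To make the null-guarded pattern above concrete, here is a minimal sketch. It assumes a stand-in class, `StubStreamReader`, in place of `ArrowStreamReader`, and a hypothetical holder class in place of `ArrowSerializer`, so that it compiles without Arrow on the classpath. It only illustrates that one `initializeReader` method can serve both the first initialization and every later re-initialization.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for ArrowStreamReader; not the real Arrow class.
class StubStreamReader implements Closeable {
    final InputStream in;
    boolean closed = false;

    StubStreamReader(InputStream in) {
        this.in = in;
    }

    @Override
    public void close() throws IOException {
        closed = true;
        in.close();
    }
}

// Hypothetical holder that mirrors the reader-related part of the serializer.
class ReaderHolder {
    // Stays null until the first call to initializeReader.
    StubStreamReader streamReader;

    // Safe on the first call (nothing to close) and on later calls
    // (the previous reader is closed before it is replaced).
    void initializeReader(InputStream bais) throws IOException {
        if (streamReader != null) {
            streamReader.close();
        }
        streamReader = new StubStreamReader(bais);
    }
}
```

With the guard in place, callers never need to know whether a reader already exists.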

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
##########
@@ -126,4 +124,23 @@ public void finishCurrentBatch() throws Exception {
         arrowStreamWriter.writeBatch();
         arrowWriter.reset();
     }
+
+    public void reinitializeReader(InputStream bais) throws IOException {
+        arrowReader = null;
+        arrowStreamReader.close();
+        initializeReader(bais);
+    }
+
+    public void reInitializeWriter(OutputStream baos) throws IOException {

Review comment:
       It seems that this method is not necessary.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
##########
@@ -175,6 +175,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
             windowAggResult.replace(key, arrowSerializer.read(i));
             rowDataWrapper.collect(reuseJoinedRow.replace(windowAggResult, windowProperty));
         }
+        arrowSerializer.reinitializeReader(bais);

Review comment:
       Is it possible to move `reinitializeReader(bais)` inside 
`ArrowSerializer.load()`? Then we could avoid this change.
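
A rough sketch of that idea, under stated assumptions: `StubBatchReader` is a hypothetical stand-in for `ArrowStreamReader` (it treats one byte as the row count of a batch), and `LoadingSerializer` is a hypothetical class that keeps the input stream as a field. The real `load()` may need a different ordering; this only shows the reset happening inside `load()` so that the operator does not have to trigger it.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for ArrowStreamReader: one byte encodes a batch's row count.
class StubBatchReader {
    private final InputStream in;
    boolean closed = false;

    StubBatchReader(InputStream in) {
        this.in = in;
    }

    int loadNextBatch() throws IOException {
        return in.read();
    }

    void close() {
        closed = true;
    }
}

// Hypothetical serializer that owns the input stream, so load() needs no arguments.
class LoadingSerializer {
    private final InputStream bais;
    private StubBatchReader streamReader;
    int resets = 0; // only here to make the behavior observable

    LoadingSerializer(InputStream bais) {
        this.bais = bais;
    }

    // Resets the reader before loading, so callers never reset it themselves.
    int load() throws IOException {
        if (streamReader != null) {
            streamReader.close();
        }
        streamReader = new StubBatchReader(bais);
        resets++;
        return streamReader.loadNextBatch();
    }
}
```

The call site in the operator would then stay as it was before this PR.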

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperator.java
##########
@@ -71,6 +71,7 @@ protected void invokeCurrentBatch() throws Exception {
             elementCount += currentBatchCount;
             checkInvokeFinishBundleByCount();
             currentBatchCount = 0;
+            arrowSerializer.reInitializeWriter(baos);

Review comment:
       Is it possible to call `arrowSerializer.reInitializeWriter(baos);` 
inside `finishCurrentBatch`? Then we could avoid this change. 
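
Something along these lines, as a sketch only. It assumes that the serializer keeps a reference to the output stream it was opened with and that every batch payload has to start with its own header. `StubStreamWriter` is a hypothetical stand-in for `ArrowStreamWriter`, and the `rows` argument exists only to keep the example self-contained.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for ArrowStreamWriter: writes a one-byte header before its first batch.
class StubStreamWriter {
    static final int HEADER = 0x7F;
    private final OutputStream out;
    private boolean started = false;

    StubStreamWriter(OutputStream out) {
        this.out = out;
    }

    void writeBatch(byte[] rows) throws IOException {
        if (!started) {
            out.write(HEADER);
            started = true;
        }
        out.write(rows);
    }
}

// Hypothetical serializer that owns the output stream, so no reset parameter is needed.
class WritingSerializer {
    private final OutputStream baos;
    private StubStreamWriter streamWriter;

    WritingSerializer(OutputStream baos) {
        this.baos = baos;
        this.streamWriter = new StubStreamWriter(baos);
    }

    // Writes the batch, then recreates the writer so the next batch starts with a header.
    void finishCurrentBatch(byte[] rows) throws IOException {
        streamWriter.writeBatch(rows);
        streamWriter = new StubStreamWriter(baos);
    }
}
```

The operator then only calls `finishCurrentBatch` and the writer reset stays an implementation detail of the serializer.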

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
##########
@@ -126,4 +126,15 @@ public void finishCurrentBatch() throws Exception {
         arrowStreamWriter.writeBatch();
         arrowWriter.reset();
     }
+
+    public void resetReader(InputStream bais) throws IOException {
+        arrowReader = null;
+        arrowStreamReader.close();
+        arrowStreamReader = new ArrowStreamReader(bais, allocator);
+    }
+
+    public void resetWriter(OutputStream baos) throws IOException {

Review comment:
       The parameter `baos` could be removed.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
##########
@@ -126,4 +126,15 @@ public void finishCurrentBatch() throws Exception {
         arrowStreamWriter.writeBatch();
         arrowWriter.reset();
     }
+
+    public void resetReader(InputStream bais) throws IOException {

Review comment:
       The parameter `bais` could be removed.



