davidradl commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-http/pull/5#discussion_r2581407254


##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java:
##########
@@ -114,7 +118,7 @@ private SinkHttpClientResponse 
prepareSinkHttpClientResponse(
         for (var response : responses) {
             var sinkRequestEntry = response.getHttpRequest();
             var optResponse = response.getResponse();
-
+            
HttpLogger.getHttpLogger(properties).logResponse(response.getResponse().get());

Review Comment:
   It sounded reasonable to make these changes - but if I do this I get 13 unit 
test failures, it seems that making this change introduces a serialization 
issue. Part of the test failure output is: 
   
   org.apache.flink.table.api.ValidationException: Function class 'class 
org.apache.flink.connector.http.table.lookup.AsyncHttpTableLookupFunction' is 
not serializable. Make sure that the class is self-contained (i.e. no 
references to outer classes) and all inner fields are serializable as well.
        at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:595)
        at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:258)
        at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:262)
        at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:197)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
        at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
        at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1133)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at 
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceITCaseTest.testLookupJoinOnRowType(HttpLookupTableSourceITCaseTest.java:597)
        at java.base/java.lang.reflect.Method.invoke(Method.java:572)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
   Caused by: org.apache.flink.api.common.InvalidProgramException: 
org.apache.flink.connector.http.HttpLogger@ced37eef is not serializable. The 
object probably contains or references non serializable fields.
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
        at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:592)
        ... 30 more
   Caused by: java.io.NotSerializableException: 
org.apache.flink.connector.http.HttpLogger
        at 
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at 
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
        at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:547)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:149)
        ... 36 more
   
   [ERROR] 
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceITCaseTest.testLookupJoinOnRowTypeAndRootColumn
  Time elapsed: 0.675 s  <<< ERROR!
   org.apache.flink.table.api.ValidationException: Function class 'class 
org.apache.flink.connector.http.table.lookup.AsyncHttpTableLookupFunction' is 
not serializable. Make sure that the class is self-contained (i.e. no 
references to outer classes) and all inner fields are serializable as well.
        at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:595)
        at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:258)
        at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:262)
        at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:197)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
        at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
        at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1133)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at 
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceITCaseTest.testLookupJoinOnRowTypeAndRootColumn(HttpLookupTableSourceITCaseTest.java:668)
        at java.base/java.lang.reflect.Method.invoke(Method.java:572)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
   Caused by: org.apache.flink.api.common.InvalidProgramException: 
org.apache.flink.connector.http.HttpLogger@f6eb7b8d is not serializable. The 
object probably contains or references non serializable fields.
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
        at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:592)
        ... 30 more
   Caused by: java.io.NotSerializableException: 
org.apache.flink.connector.http.HttpLogger
        at 
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at 
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
        at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:547)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:149)
        ... 36 more
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to